Gareth Edwards [https://community.jboss.org/people/garethed] created the discussion
"Re: Problem with multiple sessions"
To view the discussion, visit: https://community.jboss.org/message/806960#806960
--------------------------------------------------------------
Cheers,
Here are the main classes. I have removed some irrelevant methods and renamed certain variables.
I changed to this:
StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
s.execute(dc);
but still see the same behaviour.
package com.x.api.service.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import javax.persistence.EntityManagerFactory;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.event.process.DefaultProcessEventListener;
import org.drools.event.process.ProcessCompletedEvent;
import org.drools.persistence.jpa.JPAKnowledgeService;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.audit.JPAWorkingMemoryDbLogger;
import org.jbpm.process.workitem.wsht.AsyncHornetQHTWorkItemHandler;
import org.jbpm.task.service.hornetq.AsyncHornetQTaskClient;
import org.jbpm.task.utils.OnErrorAction;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.x.api.jbpm.handler.BiometricVerificationHandler;
import com.x.api.jbpm.handler.WorkflowCurrentTaskHandler;
import com.x.api.jbpm.listener.WorkflowEventListener;
import com.x.api.model.WorkflowSession;
import com.x.api.model.WorkflowSettings;
import com.x.api.model.user.SessionUser;
import com.x.api.model.user.User;
import com.x.api.service.PackageService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.service.WorkflowSessionService;
import com.x.api.util.CMTDisposeCommand;
import com.x.api.util.xAPIException;
public class WorkflowSessionServiceImpl implements InitializingBean, WorkflowSessionService {
         private static Logger log = Logger.getLogger(WorkflowSessionServiceImpl.class);
Â
         @Autowired
         private PackageService packageService;
Â
         @Autowired
         private WorkflowSessionService WorkflowSessionService;
Â
         private StatefulKnowledgeSession ksession = null;
Â
         private ArrayList<WorkflowSessionTask> workFlowSessionTasks;
Â
         private Queue<WorkflowSettings> workflowQueue = new LinkedList<WorkflowSettings>();
Â
         @Value("${HumanTaskServiceIp}")
         private String ipAddress;
Â
         @Override
         public void afterPropertiesSet() throws Exception {
                   workFlowSessionTasks = new ArrayList<WorkflowSessionServiceImpl.WorkflowSessionTask>();
         }
Â
         @Override
         public List<WorkflowSession> resumeWorkflowSessions(User user)throws xAPIException{
                   List<WorkflowSession> workflowSessions = WorkflowSessionService.getActiveSessions();
                   List<WorkflowSession> resumedSessions = new ArrayList<WorkflowSession>();
Â
                   for (WorkflowSession WorkflowSession : workflowSessions) {
                             if (!isSessionLoaded(WorkflowSession.getId())){
                                       if (this.resumeWorkflowSession(WorkflowSession, user))
                                                 resumedSessions.add(WorkflowSession);
                             }
                   }
                   return resumedSessions;
         }
Â
         private boolean resumeWorkflowSession(WorkflowSession pws, User user)throws xAPIException{
                   boolean success = false;
                   EntityManagerFactory emf = packageService.getEntityManagerFactory();
                   Environment env = KnowledgeBaseFactory.newEnvironment();
                   KnowledgeBase kbase;
                   try {
                             kbase = packageService.getKnowledgeBase(pws.getPackageRef(), pws.getPackageVersion());
                      env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
                      StatefulKnowledgeSession sks = JPAKnowledgeService.loadStatefulKnowledgeSession(pws.getId(), kbase, null, env);
                      WorkflowSettings wfs = new WorkflowSettings();
                      wfs.setActive(true);
                      wfs.setCompleted(false);
                      wfs.setId(sks.getId());
                      wfs.setPackageName(pws.getPackageRef());
                      wfs.setPackageVersion(pws.getPackageVersion());
                      WorkflowSessionTask wst = new WorkflowSessionTask(WorkflowSessionService, sks, wfs);
                      // Make a copy of the user to enable updates of workflowSession table
                      final User u = new SessionUser(user.getId());
                      wst.startWorkFlow(false, u);
                      workFlowSessionTasks.add(wst);
                      success = true;
                   } catch (Exception e) {
                             success = false;
                             //throw new xAPIException("Error getting knowledge base:" + e.getLocalizedMessage());
                   }
                   return success;
         }
Â
         private boolean isSessionLoaded(int sessionId){
Â
                   for (WorkflowSessionTask workflowSessionTask : workFlowSessionTasks) {
                             if (workflowSessionTask.ksession.getId() == sessionId && !workflowSessionTask.completed){
                                       return true;
                             }
                   }
                   return false;
         }
Â
         @Override
         public synchronized int startNewSession(WorkflowSettings wfs) throws xAPIException{
                   purgeDeadSessions();
Â
                   KnowledgeBase kbase = packageService.getKnowledgeBase(wfs.getPackageName(), wfs.getPackageVersion());
Â
                   //StatefulKnowledgeSession ksession = getSession(kbase);
                   //if (ksession == null)
                             ksession = getSession(kbase);
                   log.debug("Starting new session:" + ksession.getId());
                   wfs.setSessionId(ksession.getId());
                   WorkflowSessionTask wst = new WorkflowSessionTask(WorkflowSessionService, ksession, wfs);
                   // Make a copy of the user to enable updates of workflowSession table
            final User u = new SessionUser(wfs.getUser().getId());
                   wst.startWorkFlow(true, u);
Â
                   this.workFlowSessionTasks.add(wst);
Â
                   log.info("WorkflowSettings:" + wfs.toString());
                   return ksession.getId();
         }
Â
           private StatefulKnowledgeSession getSession(KnowledgeBase kb){
                     Environment env = KnowledgeBaseFactory.newEnvironment();
               env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, packageService.getEntityManagerFactory());
                     StatefulKnowledgeSession ksession = JPAKnowledgeService.newStatefulKnowledgeSession(kb, null, env);
                     return ksession;
           }
          Â
           protected class WorkflowSessionTask{
                    Â
                     private StatefulKnowledgeSession ksession;
                     private WorkflowSettings workflowSettings;
                     private boolean completed;
                     private WorkflowSessionService pws;
                    Â
                     public WorkflowSessionTask(WorkflowSessionService pws, StatefulKnowledgeSession session, WorkflowSettings wfs){
                               this.ksession = session;
                               this.workflowSettings = wfs;
                               this.pws = pws;
                     }
                   public void startWorkFlow(boolean startProcess, final User user) {
                             this.completed = false;
                             JPAWorkingMemoryDbLogger logger = new JPAWorkingMemoryDbLogger(ksession);
                String connectorName = "Hornet" + UUID.randomUUID().toString();
                final AsyncHornetQHTWorkItemHandler humanTaskHandler = new AsyncHornetQHTWorkItemHandler(new AsyncHornetQTaskClient(connectorName), ksession, OnErrorAction.LOG);
                //final HumanTaskHandler humanTaskHandler = new HumanTaskHandler(new AsyncHornetQTaskClient(connectorName), ksession, OnErrorAction.LOG);
                humanTaskHandler.setIpAddress(ipAddress);
                humanTaskHandler.setOwningSessionOnly(true);
                             final CMTDisposeCommand dc = new CMTDisposeCommand();
                             dc.setWorkitemhandler(humanTaskHandler);
Â
                             ksession.getWorkItemManager().registerWorkItemHandler("Human Task", humanTaskHandler);
                             ksession.getWorkItemManager().registerWorkItemHandler("UpdateWorkflowCurrentTask", new WorkflowCurrentTaskHandler(null, ksession.getId(), null));
                             ksession.addEventListener(new WorkflowEventListener(null, ksession.getId(), null));
                             ksession.addEventListener(new DefaultProcessEventListener(){
                                       @Override
                                       public void afterProcessCompleted(ProcessCompletedEvent event){
                                                 log.info("~~~~~~~~~Workflow Session:" + ksession.getId() + " Completed~~~~~~~~~");
                                                 log.info("Disposing of " + event.getProcessInstance().getProcessName() + "!");
                                                 StatefulKnowledgeSession s = (StatefulKnowledgeSession) event.getKnowledgeRuntime();
                                                 s.execute(dc);
                                                 //ksession.execute(dc);
                                                 completed = true;
                                                 workflowSettings.setCompleted(true);
                                       }
                             });
Â
                             if (startProcess)
                                       ksession.startProcess(workflowSettings.getWorkflowName(),workflowSettings.getWorkFlowData());
                   }
Â
                   public boolean getCompleted(){
                             return this.completed;
                   }
Â
                   public WorkflowSettings getWorkFlowSettings(){
                             return this.workflowSettings;
Â
                   }
Â
           }
          Â
           private void purgeDeadSessions(){
                     for (Iterator<WorkflowSessionTask> iterator = workFlowSessionTasks.iterator(); iterator.hasNext();) {
                             WorkflowSessionTask task = iterator.next();
                             if (task.getCompleted())
                                       iterator.remove();
                   }
           }
          Â
         @Override
         public synchronized ArrayList<WorkflowSettings> getWorkflowSettings(){
                     ArrayList<WorkflowSettings> sessions = new ArrayList<WorkflowSettings>();
                     for (WorkflowSessionTask task : workFlowSessionTasks ) {
                             sessions.add(task.getWorkFlowSettings());
                   }
                     return sessions;
         }
Â
         private boolean queueWorkflow(WorkflowSettings wfs){
                   log.info("Queueing workflow: " + wfs.getWorkflowName());
                   return this.workflowQueue.add(wfs);
         }
Â
Â
         public void dequeueAndStartWorkflows(){
                   log.info("Polling workflowQueue");
                   WorkflowSettings wfs = null;
                   do{
                             wfs = workflowQueue.poll();
                             if (wfs != null)
                                       try {
                                                 this.startNewSession(wfs);
                                       } catch (xAPIException e) {
                                                 // TODO Auto-generated catch block
                                                 e.printStackTrace();
                                       }
                   }
                   while(wfs!=null);
         }
}
CMTDisposeCommand
package com.x.api.util;
import javax.naming.InitialContext;
import javax.transaction.Synchronization;
import javax.transaction.TransactionManager;
import org.apache.log4j.Logger;
import org.drools.command.Context;
import org.drools.command.impl.GenericCommand;
import org.drools.command.impl.KnowledgeCommandContext;
import org.drools.runtime.StatefulKnowledgeSession;
import org.jbpm.process.workitem.wsht.AsyncGenericHTWorkItemHandler;
public class CMTDisposeCommand implements GenericCommand<Void> {
   private static final long serialVersionUID = 1L;
   private static Logger log = Logger.getLogger(CMTDisposeCommand.class);
   private String tmLookupName = "java:jboss/TransactionManager";
  Â
   public CMTDisposeCommand() {  Â
   }
   private AsyncGenericHTWorkItemHandler workitemhandler;
  Â
   public CMTDisposeCommand(String tmLookup) {
       this.tmLookupName = tmLookup;
   }
  Â
   public AsyncGenericHTWorkItemHandler getWorkitemhandler() {
                   return workitemhandler;
         }
         public void setWorkitemhandler(AsyncGenericHTWorkItemHandler workitemhandler) {
                   this.workitemhandler = workitemhandler;
         }
         @Override
   public Void execute(Context context) {
      Â
       final StatefulKnowledgeSession ksession = ((KnowledgeCommandContext) context).getStatefulKnowledgesession();
       try {
           TransactionManager tm = (TransactionManager) new InitialContext().lookup(tmLookupName);
           tm.getTransaction().registerSynchronization(new Synchronization() {
              Â
               @Override
               public void beforeCompletion() {
                   // not used here
                  Â
               }
              Â
               @Override
               public void afterCompletion(int arg0) {
                         if (workitemhandler != null)
                                                           try {
                                                                     log.info("Disposing workitemHandler for session:" + ksession.getId());
                                                                     workitemhandler.dispose();
                                                           } catch (Exception e) {
                                                                     e.printStackTrace();
                                                           }
                   ksession.dispose();             Â
               }
           });
       } catch (Exception e) {
           e.printStackTrace();
       }        Â
       return null;
         }
}
PackageServiceImpl
package com.x.api.service.impl;
import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Set;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.io.impl.UrlResource;
import org.drools.runtime.Environment;
import org.drools.runtime.EnvironmentName;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.x.api.model.JbpmPackage;
import com.x.api.model.JbpmWorkflow;
import com.x.api.service.PackageService;
import com.x.api.util.APIException;
@Service("packageService")
public class PackageServiceImpl implements InitializingBean, PackageService {
         private static Logger log = Logger.getLogger(PackageService.class);
Â
         @Value( "${DroolsGuvnorUrl}" )
         private String url;
Â
         private EntityManagerFactory emf = null;
         private HashMap<String, KnowledgeBase> knowLedgeBases = new HashMap<String, KnowledgeBase>();
Â
         private List<JbpmPackage> packageCache;
Â
         // keys by package
         private Hashtable<String, List<JbpmWorkflow>> workflowCache;
Â
         @Override
         public void afterPropertiesSet() throws Exception {
                   // TODO load required knowledgeBases
                   this.emf = getEntityManagerFactory();
                   this.workflowCache = new Hashtable<String,List<JbpmWorkflow>>();
         }
Â
         @Override
         public void clearCaches() {
                   this.packageCache = null;
                   this.workflowCache = new Hashtable<String,List<JbpmWorkflow>>();
         }
Â
         @Override
         public List<JbpmPackage> getJbpmPackages() throws APIException {
                   // caching to speed up the requests
                   if (packageCache==null) {
                             List<JbpmPackage> packages = new ArrayList<JbpmPackage>();
                             String authorizationHeader = "Basic " + org.apache.cxf.common.util.Base64Utility.encode("admin:admin".getBytes());
                             HttpClient httpclient = new DefaultHttpClient();
                             try {
                                       URIBuilder builder = new URIBuilder(url +"/rest/packages");
Â
                                       URI uri = builder.build();
                                       HttpGet httpget = new HttpGet(uri);
                                       httpget.addHeader("Authorization", authorizationHeader);
                                       httpget.addHeader("Accept", "application/json");
                                       HttpResponse response = httpclient.execute(httpget);
                                       if (response.getStatusLine().getStatusCode()!=HttpStatus.SC_OK)
                                                 throw new PIException("Failed to retrieve list of packages from drools REST service",response.getStatusLine().getStatusCode());
                                       else {
                                                 String json = IOUtils.toString(response.getEntity().getContent(),"UTF-8");
                                                 JSONArray jsonArray = new JSONArray(json);
                                                 for (int i=0;i<jsonArray.length();i++) {
                                                           JSONObject jo = (JSONObject)jsonArray.get(i);
                                                           String title = jo.getString("title");
                                                           if (title.startsWith("x.")) {
                                                                     JbpmPackage wp = new JbpmPackage();
                                                                     wp.setId(title);
                                                                     wp.setDisplayTitle(title.substring(title.indexOf("x.")+8).replace("_", " "));
                                                                     wp.setDescription(jo.getString("description"));
                                                                     wp.setPublishedDate(new Date(jo.getLong("published")));
                                                                     JSONObject meta = jo.getJSONObject("metadata");
                                                                     wp.setVersion(meta.getInt("versionNumber"));
                                                                     wp.setArchived(meta.getBoolean("archived"));
                                                                     wp.setCreatedDate(new Timestamp(meta.getLong("created")));
                                                                     log.debug("WorkflowPackage: "+wp.toString());
                                                                     packages.add(wp);
                                                           }
                                                 }
                                       }
                             }
                             catch (Exception ue) {
                                       throw new APIException(ue);
                             }
                             this.packageCache = packages;
                   }
                   return this.packageCache;
         }
         @Override
         public KnowledgeBase getKnowledgeBase(String packageName, String version) throws PIException{
Â
                   if (packageName == null)
                             throw new PIException("Package Name must not be null.");
Â
                   if (version == null)
                             version = "LATEST";
Â
                   log.debug("Package requested: " + packageName + "/" + version);
Â
                   String key = packageName + "|" + version;
Â
                   if (!knowLedgeBases.containsKey(key)){
                             log.info("Package " + key + " not cached, attempting to get from repository");
                      UrlResource resource = (UrlResource) ResourceFactory.newUrlResource(url + "/org.drools.guvnor.Guvnor/package/" + packageName + "/" + version);
                      resource.setBasicAuthentication("enabled");
                      resource.setUsername("guest");
                      resource.setPassword("guest");
                      KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
                      kbuilder.add(resource, ResourceType.PKG);
                      knowLedgeBases.put(key, kbuilder.newKnowledgeBase());              Â
                   }
Â
                   log.debug(knowLedgeBases.toString());
Â
                   return knowLedgeBases.get(key);
         }
Â
         @Override
         public EntityManagerFactory getEntityManagerFactory(){
                   if (emf == null){
                             emf = Persistence.createEntityManagerFactory( "org.jbpm.persistence.jpa" );
                             Environment env = KnowledgeBaseFactory.newEnvironment();
                             env.set( EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
                   }
                   return emf;
         }
Â
         @Override
         public synchronized Set<String> getLoadedKnowledgeBases(){
                   return knowLedgeBases.keySet();
         }
}
Cheers,
Gareth.
--------------------------------------------------------------
Reply to this message by going to Community
[https://community.jboss.org/message/806960#806960]
Start a new discussion in jBPM at Community
[https://community.jboss.org/choose-container!input.jspa?contentType=1&containerType=14&container=2034]