[Carbon-commits] [Carbon] svn commit r67528 - branches/carbon/3.0.0/dependencies/ode/engine/src/main/java/org/apache/ode/bpel/engine

waruna at wso2.com waruna at wso2.com
Thu Jun 3 04:34:03 PDT 2010


Author: waruna
Date: Thu Jun  3 04:34:03 2010
New Revision: 67528
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=67528

Log:
fix for https://wso2.org/jira/browse/CARBON-7496

Modified:
   branches/carbon/3.0.0/dependencies/ode/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java

Modified: branches/carbon/3.0.0/dependencies/ode/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://wso2.org/svn/browse/wso2/branches/carbon/3.0.0/dependencies/ode/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=67528&r1=67527&r2=67528&view=diff
==============================================================================
--- branches/carbon/3.0.0/dependencies/ode/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java	(original)
+++ branches/carbon/3.0.0/dependencies/ode/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java	Thu Jun  3 04:34:03 2010
@@ -25,14 +25,7 @@
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -107,7 +100,7 @@
      * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by
      * _mngmtLock.writeLock().
      */
-    private final HashMap<QName, ODEProcess> _registeredProcesses = new HashMap<QName, ODEProcess>();
+    private final ConcurrentHashMap<QName, ODEProcess> _registeredProcesses = new ConcurrentHashMap<QName, ODEProcess>();
 
     /** Mapping from myrole service name to active process. */
     private final HashMap<QName, List<ODEProcess>> _serviceMap = new HashMap<QName, List<ODEProcess>>();
@@ -128,12 +121,6 @@
     BpelDatabase _db;
 
     /**
-     * Management lock for synchronizing management operations and preventing processing (transactions) from occuring while
-     * management operations are in progress.
-     */
-    private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
-
-    /**
      * The last time we started a {@link ServerCallable}. Useful for keeping track of idleness.
      */
     private final AtomicLong _lastTimeOfServerCallable = new AtomicLong(System.currentTimeMillis());
@@ -167,16 +154,12 @@
 
     protected void waitForQuiessence() {
         do{
-        _mngmtLock.writeLock().lock();
-        _mngmtLock.writeLock().unlock();
         long ltime = _lastTimeOfServerCallable.get();
         try {
             Thread.sleep(150);
         } catch (InterruptedException e) {
             ;
         }
-        _mngmtLock.writeLock().lock();
-        _mngmtLock.writeLock().unlock();
         try {
             Thread.sleep(150);
         } catch (InterruptedException ie) {
@@ -190,8 +173,6 @@
     }
     
     public void start() {
-        _mngmtLock.writeLock().lock();
-        try {
             if (!checkState(State.INIT, State.RUNNING)) {
                 __log.debug("start() ignored -- already started");
                 return;
@@ -229,9 +210,6 @@
             __log.info(__msgs.msgServerStarted());
             if (_dehydrationPolicy != null)
                 new Thread(new ProcessDefReaper()).start();
-        } finally {
-            _mngmtLock.writeLock().unlock();
-        }
     }
 
     public BpelDatabase getBpelDb(){
@@ -288,8 +266,6 @@
     }
 
     public void stop() {
-        _mngmtLock.writeLock().lock();
-        try {
             if (!checkState(State.RUNNING, State.INIT)) {
                 __log.debug("stop() ignored -- already stopped");
                 return;
@@ -300,14 +276,9 @@
             _contexts.scheduler.stop();
             _state = State.INIT;
             __log.info(__msgs.msgServerStopped());
-        } finally {
-            _mngmtLock.writeLock().unlock();
-        }
     }
 
     public void init() throws BpelEngineException {
-        _mngmtLock.writeLock().lock();
-        try {
             if (!checkState(State.SHUTDOWN, State.INIT))
                 return;
 
@@ -317,25 +288,15 @@
             _state = State.INIT;
             _sharedEps = new SharedEndpoints();
             _sharedEps.init();
-
-        } finally {
-            _mngmtLock.writeLock().unlock();
-        }
     }
 
     public void shutdown() throws BpelEngineException {
-        _mngmtLock.writeLock().lock();
-        try {
             stop();
             unregisterBpelEventListeners();
 
             _sharedEps = null;
             _db = null;
             _state = State.SHUTDOWN;
-        } finally {
-            _mngmtLock.writeLock().unlock();
-        }
-
     }
 
     public void register(ProcessConf conf) {
@@ -346,14 +307,7 @@
 
         // Ok, IO out of the way, we will mod the server state, so need to get a
         // lock.
-        try {
-            _mngmtLock.writeLock().lockInterruptibly();
-        } catch (InterruptedException ie) {
-            __log.debug("register(...) interrupted.", ie);
-            throw new BpelEngineException(__msgs.msgOperationInterrupted());
-        }
 
-        try {
             // If the process is already active, do nothing.
             if (_registeredProcesses.containsKey(conf.getProcessId())) {
                 __log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered");
@@ -390,9 +344,6 @@
             if (_dehydrationPolicy == null) process.hydrate();
 
             __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
-        } finally {
-            _mngmtLock.writeLock().unlock();
-        }
     }
 
     public void unregister(QName pid) throws BpelEngineException {
@@ -400,13 +351,6 @@
             __log.trace("unregister: " + pid);
 
         try {
-            _mngmtLock.writeLock().lockInterruptibly();
-        } catch (InterruptedException ie) {
-            __log.debug("unregister() interrupted.", ie);
-            throw new BpelEngineException(__msgs.msgOperationInterrupted());
-        }
-
-        try {
             ODEProcess p = _registeredProcesses.remove(pid);
             if (p == null)
                 return;
@@ -427,8 +371,6 @@
         } catch (Exception ex) {
             __log.error(__msgs.msgProcessUnregisterFailed(pid), ex);
             throw new BpelEngineException(ex);
-        } finally {
-            _mngmtLock.writeLock().unlock();
         }
     }
 
@@ -468,12 +410,7 @@
     List<ODEProcess> route(QName service, Message request) {
         // TODO: use the message to route to the correct service if more than
         // one service is listening on the same endpoint.
-        _mngmtLock.readLock().lock();
-        try {
             return _serviceMap.get(service);
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
     }
     
     /**
@@ -509,7 +446,6 @@
     }
 
     public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
-        _mngmtLock.readLock().lock();
         try {
             final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
             ODEProcess process = _registeredProcesses.get(we.getProcessId());
@@ -554,8 +490,6 @@
             process.handleWorkEvent(jobInfo);
         } catch (Exception ex) {
             throw new JobProcessorException(ex, jobInfo.jobDetail.get("inmem") == null);
-        } finally {
-            _mngmtLock.readLock().unlock();
         }
     }
 
@@ -612,8 +546,6 @@
     public MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName targetService,
             final String operation, final String clientKey) throws BpelEngineException {
 
-        _mngmtLock.readLock().lock();
-        try {
             // Obtain the list of processes that this service is potentially targeted at
             final List<ODEProcess> targets = route(targetService, null);
 
@@ -633,9 +565,6 @@
                 }
                 return createNewMyRoleMex(targets.get(0), meps, istyle);	
             }
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
     }
     
     /**
@@ -686,9 +615,7 @@
     
     public MessageExchange getMessageExchange(final String mexId) throws BpelEngineException {
 
-        _mngmtLock.readLock().lock();
-        try {
-            final MessageExchangeDAO inmemdao = getInMemMexDAO(mexId);
+     final MessageExchangeDAO inmemdao = getInMemMexDAO(mexId);
 
             Callable<MessageExchange> loadMex = new Callable<MessageExchange>() {
 
@@ -736,10 +663,6 @@
                 throw new BpelEngineException(e);
             }
 
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
-
     }
 
     public MessageExchange getMessageExchangeByForeignKey(String foreignKey) throws BpelEngineException {
@@ -756,8 +679,6 @@
      */
     public Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) {
 
-        _mngmtLock.readLock().lock();
-        try {
             List<ODEProcess> processes = route(serviceId, null);
             if (processes == null || processes.size() == 0)
                 throw new BpelEngineException("No such service: " + serviceId);
@@ -777,38 +698,24 @@
                 }
             }
             return istyles;
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
     }
 
     MessageExchangeDAO getInMemMexDAO(String mexId) {
-        _mngmtLock.readLock().lock();
-        try {
           for (ODEProcess p : _registeredProcesses.values()) {
               MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
               if (mexDao != null)
                   return mexDao;
           }
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
         
         return null;
     }
     
     public ProcessModel getProcessModel(QName processId) {
-        _mngmtLock.readLock().lock();
-        try {
             ODEProcess process = _registeredProcesses.get(processId);
 
             if (process == null) return null;
 
             return process.getProcessModel();
-
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
     }
 
 
@@ -886,8 +793,6 @@
             try {
                 while (true) {
                     Thread.sleep(pollingTime);
-                    _mngmtLock.writeLock().lockInterruptibly();
-                    try {
                         // Copying the runnning process list to avoid synchronizatMessageExchangeInterion
                         // problems and a potential mess if a policy modifies the list
                         List<ODEProcess> candidates = new ArrayList<ODEProcess>(_registeredProcesses.values());
@@ -904,9 +809,6 @@
                             __log.debug("Dehydrating process " + process.getPID());
                             process.dehydrate();
                         }
-                    } finally {
-                        _mngmtLock.writeLock().unlock();
-                    }
                 }
             } catch (InterruptedException e) {
                 __log.info(e);
@@ -915,12 +817,7 @@
     }
 
     public ODEProcess getBpelProcess(QName processId) {
-        _mngmtLock.readLock().lock();
-        try {
             return _registeredProcesses.get(processId);
-        } finally {
-            _mngmtLock.readLock().unlock();
-        }
     }
 
     private void ticktock() {
@@ -937,7 +834,6 @@
         
         public void run() {
             ticktock();
-            _mngmtLock.readLock().lock();
             try {
                 ticktock();
                 _work.run();
@@ -945,8 +841,6 @@
             } catch (Throwable ex) {
                 ticktock();
                 __log.fatal("Internal Error", ex);
-            } finally {
-                _mngmtLock.readLock().unlock();
             }
         }
         
@@ -962,7 +856,6 @@
         
         public T call () throws Exception {
             ticktock();
-            _mngmtLock.readLock().lock();
             try {
                 ticktock();
                 return _work.call();
@@ -971,7 +864,6 @@
                 __log.fatal("Internal Error", ex);
                 throw ex;
             } finally {
-                _mngmtLock.readLock().unlock();
                 ticktock();
             }
         }



More information about the Carbon-commits mailing list