Sunday, August 23, 2015

ExecutorService Sample


For thread management, one can use a simple ExecutorService. We had lots of small but time-consuming jobs that
could be executed later. Below is the sample code.

If you want the jobs to be executed in the order you submit them, use the sequential executor.
Example: you want to fire a register event and then an email notification,
and you want these to be executed in the order you submitted them to the queue.

If you are not interested in the order, just use the pooled executor.





/** Holds one ReentrantLock per thread; read and populated through getLock( boolean ). */
protected static ThreadLocal<ReentrantLock> requestLock = new ThreadLocal<ReentrantLock>( ) ;
   /** Holds one Condition per thread; read and populated through getCondition( boolean ). */
   protected static ThreadLocal<Condition> requestCondition = new ThreadLocal<Condition>( ) ;
   public static ReentrantLock getLock( boolean isInit )  
   {  
     ReentrantLock lock = requestLock.get( ) ;  
     if( lock == null &amp;&amp; isInit )  
     {  
       lock = new ReentrantLock( true ) ;  
       requestLock.set( lock ) ;  
     }  
     return lock ;  
   }  
   public static Condition getCondition( boolean isInit )  
   {  
     Condition con = requestCondition.get( ) ;  
     if( con == null &amp;&amp; isInit )  
     {  
       con = getLock( true ).newCondition( ) ;  
       requestCondition.set( con ) ;  
     }  
     return con ;  
   }  
 /** Executor used by the sequentialExec methods; assigned in init( boolean ). */
 protected static ExecutorService _sequentialExecutor ;
   /** Executor used by pooledExec; assigned in init( boolean ). */
   protected static ExecutorService _pooledExecutor ;
   // Create both executors when the class is loaded.
   static
   {
     init( true ) ;
   }
   public static void init( boolean isForce )  
   {  
     if( _sequentialExecutor != null &amp;&amp; _pooledExecutor != null &amp;&amp; !isForce )  
     {  
       return ;  
     }  
     _sequentialExecutor = Executors.newSingleThreadExecutor( ) ;  
     _pooledExecutor = Executors.newFixedThreadPool( 4 ) ;  
   }  
   /**
    * Submits a task to the sequential executor.
    *
    * When isWaitForRequest is true, the calling thread's lock (created on demand)
    * is handed to the task and acquired by the calling thread if it does not
    * already hold it. This method does not release the lock; since YTask.call( )
    * acquires the same lock before running, the task waits until the submitting
    * thread unlocks it.
    *
    * @param task             the task to run
    * @param isWaitForRequest true to make the task wait on the calling thread's lock
    * @return the Future returned by the executor for this task
    */
   public static <T> Future<T> sequentialExec( YTask<T> task, boolean isWaitForRequest )
   {
     if( isWaitForRequest )
     {
       ReentrantLock lock = getLock( true ) ;
       // initialize the condition as well
       getCondition( true ) ;
       task.setLock( lock ) ;
       if( !lock.isHeldByCurrentThread( ) )
       {
         lock.lock( ) ;
       }
     }
     // YTask is both Callable and Runnable; the typed cast picks submit( Callable ) without a raw type
     return _sequentialExecutor.submit( ( Callable<T> )task ) ;
   }
   /**
    * Submits a scheduled task to the sequential executor as a Callable.
    *
    * NOTE(review): YScheduledTask is not shown here; the cast assumes it
    * implements Callable - confirm against its declaration.
    *
    * @param task the scheduled task to run
    * @return the Future returned by the executor for this task
    */
   public static <t> Future<t> sequentialExec( YScheduledTask task )
   {
     Callable job = ( Callable )task ;
     return _sequentialExecutor.submit( job ) ;
   }
   /**
    * Submits a plain Callable to the sequential executor.
    *
    * @param callable the work to run
    * @return the Future returned by the executor for this work
    */
   public static <t> Future<t> sequentialExec( Callable<t> callable )
   {
     final Future<t> pending = _sequentialExecutor.submit( callable ) ;
     return pending ;
   }
   /**
    * Submits a task to the pooled executor.
    *
    * When isWaitForRequest is true, the calling thread's lock (created on demand)
    * is handed to the task and acquired by the calling thread if it does not
    * already hold it. This method does not release the lock; since YTask.call( )
    * acquires the same lock before running, the task waits until the submitting
    * thread unlocks it.
    *
    * @param task             the task to run
    * @param isWaitForRequest true to make the task wait on the calling thread's lock
    * @return the Future returned by the executor for this task
    */
   public static <T> Future<T> pooledExec( YTask<T> task, boolean isWaitForRequest )
   {
     if( isWaitForRequest )
     {
       ReentrantLock lock = getLock( true ) ;
       // initialize the condition as well
       getCondition( true ) ;
       task.setLock( lock ) ;
       if( !lock.isHeldByCurrentThread( ) )
       {
         lock.lock( ) ;
       }
     }
     // YTask is both Callable and Runnable; the typed cast picks submit( Callable ) without a raw type
     return _pooledExecutor.submit( ( Callable<T> )task ) ;
   }




YTask is a utility class that wraps the Callable jobs submitted through the class above.





import java.util.concurrent.Callable ;  
 import java.util.concurrent.TimeUnit ;  
 import java.util.concurrent.locks.Condition ;  
 import java.util.concurrent.locks.ReentrantLock ;  
 public class YTask<t> implements Callable<t>, Runnable  
 {  
   protected Callable<t> callable ;  
   protected long delayMs = 0 ;  
   protected ReentrantLock lock = null ;  
   protected Condition condition = null ;  
   public YTask( Callable<t> callable )  
   {  
     this.callable = callable ;  
   }  
   public Callable<t> getCallable( )  
   {  
     return this.callable ;  
   }  
   public long getDelayMs( )  
   {  
     return this.delayMs ;  
   }  
   public ReentrantLock getLock( )  
   {  
     return this.lock ;  
   }  
   public void setLock( ReentrantLock lock )  
   {  
     this.lock = lock ;  
   }  
   public Condition getCondition( )  
   {  
     return this.condition ;  
   }  
   public void setCondition( Condition condition )  
   {  
     this.condition = condition ;  
   }  
   @Override  
   public void run( )  
   {  
     try  
     {  
       call( ) ;  
     }  
     catch( Exception e )  
     {  
       e.printStackTrace( ) ;  
       throw new RuntimeException( "Error during task...", e ) ;  
     }  
   }  
   @Override  
   public T call( ) throws Exception  
   {  
     T res = null ;  
     if( this.delayMs > 0 )  
     {  
       try  
       {  
         Thread.sleep( this.delayMs ) ;  
       }  
       catch( Exception e )  
       {  
         e.printStackTrace( ) ;  
       }  
     }  
     if( this.lock != null )  
     {  
       if( !this.lock.isHeldByCurrentThread( ) )  
       {  
         this.lock.lock( ) ;  
       }  
       System.err.println( "LOCK Info ytask : isHeldByCurrentThread() : " + this.lock.isHeldByCurrentThread( ) + ", getHoldCount() : " + this.lock.getHoldCount( ) + " , " + this.lock.getQueueLength( ) ) ;  
     }  
     try  
     {  
       if( this.callable != null )  
       {  
         if( this.condition != null )  
         {  
           if( !this.condition.await( 180, TimeUnit.SECONDS ) )  
           {  
             sendErrorMessage( null, "Error during waiting for lock in YTask", "Error during waiting for lock in YTask" ) ;  
           }  
         }  
         res = this.callable.call( ) ;  
       }  
       return res ;  
     }  
     finally  
     {  
       if( this.lock != null )  
       {  
         this.lock.unlock( ) ;  
       }  
     }  
   }  
 }  



No comments:

Post a Comment