December 29, 2009

Overcoming limitations of ExecutorService.shutdownNow(), transparently with AspectJ5

Chapter 7, "Cancellation and Shutdown", of the book Java Concurrency in Practice (as well as the javadoc of ExecutorService.shutdownNow()) points out a limitation on what shutdownNow() returns.
The book says: 
"When an ExecutorService is shut down abruptly with shutdownNow, it attempts to cancel the tasks currently in progress and returns a list of tasks that were submitted but never started so that they can be logged or saved for later processing. However, there is no general way to find out which tasks started but did not complete. This means that there is no way of knowing the state of the tasks in progress at shutdown time unless the tasks themselves perform some sort of checkpointing. To know which tasks have not completed, you need to know not only which tasks didn't start, but also which tasks were in progress when the executor was shut down."
The author also provides a workaround for this problem: extend AbstractExecutorService, override execute(Runnable), and define a method that returns the cancelled tasks. Still, in this case we are forced to use shutdownNow() in the following fashion:
...
// hint to running tasks to stop what they are doing!
exec.shutdownNow();
// wait until all running tasks have stopped and been gathered in some "cancelled tasks" collection (but for how long?!)
exec.awaitTermination(TIMEOUT, UNIT);
// here is our result!
exec.getCancelledTasks();
...
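For reference, here is a minimal sketch of that workaround, modeled on the book's idea of a tracking executor. The names (TrackingExecutor, cancelledAtShutdown, demo()) and details are my assumptions, not a verbatim copy of the book's listing. Note the check in the finally block: a task is recorded only if it finishes with the interrupt flag still set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch in the spirit of the book's workaround; names are illustrative assumptions.
class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> cancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    /** Only meaningful once the wrapped executor has terminated. */
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("executor has not terminated yet");
        }
        return new ArrayList<Runnable>(cancelledAtShutdown);
    }

    @Override
    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    // Recorded only if the task left with its interrupt flag still set:
                    // this is the "tasks must be responsive to interruption" requirement.
                    if (isShutdown() && Thread.currentThread().isInterrupted()) {
                        cancelledAtShutdown.add(runnable);
                    }
                }
            }
        });
    }

    // Lifecycle methods simply delegate to the wrapped executor.
    @Override public void shutdown() { exec.shutdown(); }
    @Override public List<Runnable> shutdownNow() { return exec.shutdownNow(); }
    @Override public boolean isShutdown() { return exec.isShutdown(); }
    @Override public boolean isTerminated() { return exec.isTerminated(); }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    /** Usage: one running task is interrupted by shutdownNow(); returns the cancelled count. */
    static int demo() {
        TrackingExecutor tracker = new TrackingExecutor(Executors.newSingleThreadExecutor());
        final CountDownLatch started = new CountDownLatch(1);
        tracker.execute(new Runnable() {
            public void run() {
                started.countDown();
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag, or the task is "lost"
                }
            }
        });
        try {
            started.await();
            tracker.shutdownNow();                          // hint running tasks to stop
            tracker.awaitTermination(5, TimeUnit.SECONDS);  // ...but for how long?!
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return tracker.getCancelledTasks().size();
    }
}
```

demo() reports one cancelled task only because the sleeping task restores its interrupt status; drop that line and the task is no longer recorded, which is exactly the fragility discussed next.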
And yet the proposed solution has a requirement: tasks must be responsive to interruption. In an earlier post I wrote why this requirement is difficult (I would say impossible) to achieve.

Without in any way criticizing what Java Concurrency in Practice suggests, let me share an alternative solution.

The goal is to put as few requirements as possible on the clients of ExecutorService.shutdownNow(). As a user of an executor service I want to obtain it through the Executors factory, submit tasks to it (whether or not they are sensitive to interruption!), and, when the time comes, call ExecutorService.shutdownNow() and get back, in a guaranteed fashion, all cancelled tasks: both those that were queued but never started and those that started but never completed. I also don't want to extend an abstract executor service and deal with a new type and its new method.

It's "doable", with AspectJ5.

The solution relies primarily on instant interruption handling, atomic variables, a synchronized collection and AOP magic.
Before looking into the code, let me describe in a nutshell how it works:

- When you submit a Callable/Runnable to the executor service, an available thread picks it from the work queue and its execution flow reaches call()/run(). At this point the task is at its first step of execution; AOP catches this execution point and records it by atomically incrementing the number of running tasks.

- Once a task (Callable or Runnable) has started executing, its behaviour is handled as follows. If the task throws a checked or unchecked exception from its call()/run() method, that's fine: the task is considered completed and the number of running tasks is atomically decremented. But if, during task execution, it turns out that the thread was requested to interrupt, instant interruption handling comes into play: a runtime exception is thrown, the task is added to the collection of cancelled tasks, and the number of running tasks is atomically decremented.

- As you might guess, ExecutorService.shutdownNow() (dressed with AOP), after requesting interruption of all running tasks, merely loops until the number of running tasks drops to 0, and thanks to instant interruption handling the loop is finite!
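To make these three steps concrete without AspectJ, here is a hand-woven sketch of the same bookkeeping: a counter of running tasks, a synchronized list of cancelled ones, and a shutdownNow() that waits for the counter to reach zero. It is an illustration under simplifying assumptions, not the aspect itself: the names (HandWovenBookkeeping, track(), demo()) are hypothetical, only an InterruptedException escaping the task is detected (the aspect also checks the flag before every call), and the counter is decremented in a single finally block.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-woven version of the aspect's bookkeeping; no AspectJ involved.
class HandWovenBookkeeping {
    private final AtomicInteger activeTasks = new AtomicInteger();
    private final List<Runnable> cancelled =
            Collections.synchronizedList(new ArrayList<Runnable>());

    /** Steps 1 and 2: count the task while it runs, record it if it gets interrupted. */
    <T> Callable<T> track(final Callable<T> task) {
        return new Callable<T>() {
            public T call() throws Exception {
                activeTasks.incrementAndGet();           // the task reached call()
                try {
                    return task.call();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // keep the status visible
                    cancelled.add(new FutureTask<T>(task));
                    throw new RuntimeException(e);
                } finally {
                    activeTasks.decrementAndGet();       // leaves the "running" set exactly once
                }
            }
        };
    }

    /** Step 3: queued-but-never-started tasks plus started-but-interrupted ones. */
    List<Runnable> shutdownNow(ExecutorService exec) {
        List<Runnable> result = new ArrayList<Runnable>(exec.shutdownNow());
        while (activeTasks.get() != 0) {
            Thread.yield();                              // wait for running tasks to unwind
        }
        result.addAll(cancelled);
        return result;
    }

    /** Usage: one thread, three tasks: one running (then interrupted), two still queued. */
    static int demo() {
        HandWovenBookkeeping book = new HandWovenBookkeeping();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        final CountDownLatch started = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            exec.submit(book.track(new Callable<Void>() {
                public Void call() throws InterruptedException {
                    started.countDown();
                    Thread.sleep(10000L);                // blocks until interrupted
                    return null;
                }
            }));
        }
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return book.shutdownNow(exec).size();            // 2 queued + 1 cancelled
    }
}
```

The point of the sketch is the counting protocol; what the aspect adds on top is that nobody has to wrap tasks by hand, and that uncooperative tasks are stopped by the checks woven before every call.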


Meet the ExecutorServiceShutdownNowAspect:
public aspect ExecutorServiceShutdownNowAspect {
 /////////////////////////////////////////////
 // ATTRIBUTES

 /**
  * Collection of interrupted workers; volatile because the shutdownNow() advice replaces it;
  */
 private volatile List<Runnable> interruptedWorkers = Collections
   .synchronizedList(new ArrayList<Runnable>());

 /**
  * Number of tasks currently being executed;
  */
 private final AtomicInteger activeTasksNum = new AtomicInteger();

 /////////////////////////////////////////////
 // POINTCUTS

 /**
  * Matches {@link Callable#call()}-s;
  */
 pointcut executionOfCallable_() : execution(* Callable.call());

 /**
  * Matches {@link Callable#call()}-s and binds the executing callable;
  */
 pointcut executionOfCallable(Callable task) : executionOfCallable_() && this(task);

 /**
  * Compilation hint: keeps the test class and the aspect itself out of the matched join points;
  */
 pointcut notwithin() :
  !within(eg.ExecutorServiceShutdownNowLimitationTest) && !within(ExecutorServiceShutdownNowAspect);

 /////////////////////////////////////////////
 // ADVICES

 /**
  * Check for interruption before every call of a method that doesn't throw
  * {@link InterruptedException};
  */
 before(Callable task) :
  cflow(executionOfCallable(task)) && call(* *(..) throws !InterruptedException) && notwithin() {
  if (interrupted()) {
   handleInterruption(task, new InterruptedException());
  }
 }

 /**
  * Handles interruption exception in the flow of execution of
  * {@link Callable#call()};
  */
 after(Callable task) throwing (InterruptedException e) :
  cflow(executionOfCallable(task)) && notwithin() {
  handleInterruption(task, e);
 }

 /**
  * Counts the callable as running for the duration of its call();
  */
 Object around() : executionOfCallable_() && notwithin() {
  // increment number of tasks currently being executed;
  activeTasksNum.incrementAndGet();
  try {
   // let execution of callable.call() proceed;
   return proceed();
  } finally {
   // decrement the number of running tasks unless the task was interrupted:
   // in that case handleInterruption() has already decremented it;
   if (!interrupted()) {
    activeTasksNum.decrementAndGet();
   }
  }
 }

 /**
  * Decorates task collection returned by
  * {@link ExecutorService#shutdownNow()} by adding to it interrupted tasks;
  */
 List<Runnable> around() : call(List<Runnable> ExecutorService.shutdownNow()) {
  interruptedWorkers.addAll(proceed());

  // wait (spinning) until all active tasks have been interrupted;
  while (activeTasksNum.get() != 0) {
   Thread.yield();
  }

  // workaround caused by aspect's instantiation mode issue; 
  // in reality we should simply return interruptedWorkers;
  List<Runnable> ret = interruptedWorkers;
  interruptedWorkers = Collections
    .synchronizedList(new ArrayList<Runnable>());

  return ret;
 }

 /////////////////////////////////////////////
 // METHODS

 private void handleInterruption(Callable task, InterruptedException e) {
  Thread.currentThread().interrupt();
  interruptedWorkers.add(new FutureTask(task));
  activeTasksNum.decrementAndGet();
  throw new RuntimeException(e);
 }

 private boolean interrupted() {
  return Thread.currentThread().isInterrupted();
 }
}
More on instant interruption: it is implemented by dynamically adding a check of the current thread's interruption status by means of a 'before' advice. Before every method call in the execution flow of callable.call(), the check if (interrupted()) is executed. Actually, not before every method call, but only before calls to methods that don't declare InterruptedException. That exception gets special, I would say very serious, treatment: once a thrown InterruptedException is detected, the aspect doesn't play with it. It sets the current thread's interruption status back to 'true', saves the current callable, atomically decrements the number of running tasks and finally throws a RuntimeException:
private void handleInterruption(Callable task, InterruptedException e) {
 Thread.currentThread().interrupt();
 interruptedWorkers.add(new FutureTask(task));
 activeTasksNum.decrementAndGet();
 throw new RuntimeException(e);
}
Instant interruption handling isn't the only interesting thing here. Look into the code, read the comments, and you will find more.
What is great about AspectJ: readability 8).

A few words about testing. 
I considered several cases, and the test passes for all of them. Let me describe them briefly:

Wrong interruption handling:
void method() throws InterruptedException {
 ...
 synchronized (lock) {
   try {
      // "emulate" blocking;
      lock.wait();
   } catch (InterruptedException ignore) {
      // case: the interruption is swallowed (ignored);
   }
 }
 Thread.sleep(10000);
 ...
}
Re-throwing interrupted exception:
void method() throws InterruptedException {
 ...
 // case: InterruptedException is thrown;
 Thread.sleep(10000);
 ...
}
Throwing checked exception:
void method() throws Exception {
 ...
 // case: application exception;
 throw new CheckedException();
}
Regular code insensitive to the standard interruption mechanism:
void method() {
 // "emulate" a task insensitive to the standard interruption mechanism;
 for (;;) {
  anotherRegularMethod();
 }
}

For now, those are all the cases I've come up with. I'll update the post once I invent a new one.

I used the TestNG framework for the test code, since its data providers give an overwhelming benefit over JUnit 4 when you have a single test body and several cases it targets. Here is the test code itself:
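// "cases" is an illustrative name for a @DataProvider supplying (factory, expected count) pairs
@Test(dataProvider = "cases")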
public void abruptPoolTermination(AbstractCallableFactory f,
                                  int expectingTaskNum)
  throws InterruptedException {
 ExecutorService exec = Executors.newFixedThreadPool(THREAD_NUM);
 exec.submit(f.callable());
 exec.submit(f.callable());
 exec.submit(f.callable());
 Thread.sleep(200);
 Assert.assertEquals(exec.shutdownNow().size(), expectingTaskNum);
}


In summary.
I should say it's very enjoyable to overcome existing Java limits with AspectJ: it's not hard, it doesn't require the ability to mentally multiply hex digits, and, even better, once you know Java's concurrency rules, the same rules apply legally in AspectJ!

Thanks for your time!

December 15, 2009

Proof-of-concept: true interruption handling, transparently with AspectJ5

Perhaps you know what InterruptedException means in Java. Either way, let me describe it a little; the Javadoc won't let me deceive you.
So, a method is interruptible if it (a) declares InterruptedException and (b) has logic that actually throws this kind of exception. It's simple: each thread, i.e. each Thread object, has a flag that holds its interruption status. For a method to be cancellable (this word is better), it must poll the interruption status and, if it is set, throw InterruptedException, clearing the status first (Thread.interrupted() both checks and clears the flag; Thread.isInterrupted() only checks it).
There are a lot of methods in Java that throw InterruptedException: Object.wait(), Future.get(), Thread.sleep(), CountDownLatch.await() and many others. Applying what I've just said, we can assert that these methods check the interruption status of the thread in which they are executing and throw the exception if it is set. In other words, regardless of when the thread was interrupted, before calling a method that throws InterruptedException or during that method's execution, the method will notice the interruption status in either case and throw the exception.
Pretty simple so far: just poll the interruption status, throw the exception, and let the code at a higher level treat that damn exception as it wants. So where is the issue in all of this?
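The rules above can be checked with a few lines of plain Java. In this sketch the class and method names (InterruptStatusDemo, cancellableStep(), run()) are hypothetical; only standard Thread methods are used. It shows a hand-written cancellable step, and that Thread.sleep() on an already-interrupted thread throws at once and clears the flag.

```java
// Hypothetical helper names; only JDK Thread methods are used.
class InterruptStatusDemo {
    /** A "cancellable" step as described above: poll, clear, throw. */
    static void cancellableStep() throws InterruptedException {
        if (Thread.interrupted()) {          // tests AND clears the current thread's flag
            throw new InterruptedException();
        }
    }

    /** Returns {stepThrew, flagAfterStep, sleepThrew, flagAfterSleep}. */
    static boolean[] run() {
        boolean stepThrew = false;
        boolean sleepThrew = false;

        Thread.currentThread().interrupt();  // interrupted *before* the call
        try {
            cancellableStep();
        } catch (InterruptedException e) {
            stepThrew = true;
        }
        boolean flagAfterStep = Thread.currentThread().isInterrupted(); // cleared by the step

        Thread.currentThread().interrupt();  // again, before a JDK blocking method
        try {
            Thread.sleep(10000L);            // throws immediately instead of sleeping
        } catch (InterruptedException e) {
            sleepThrew = true;
        }
        boolean flagAfterSleep = Thread.currentThread().isInterrupted(); // cleared by sleep
        return new boolean[] {stepThrew, flagAfterStep, sleepThrew, flagAfterSleep};
    }
}
```

Both calls throw and both leave the flag cleared, which is why code that catches InterruptedException without rethrowing should call Thread.currentThread().interrupt() to restore it.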

For me there is a big issue: it's impossible to make all the code running in a thread explicitly interruptible, because you can't make all your code sensitive to interruption. Got it?
Let's put this into code (try to imagine that the code is meaningful):


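import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public abstract class Service {

 public Object doSomeDb(long id) {
  return findItemInDb(id);
 }

 abstract Object findItemInDb(long id);

 public void doSomeOperation(InputStream in, OutputStream out) throws IOException {
  if (doSomeDb(100L) != null) {
   executeIOStuff(in, out);
  }
 }

 void executeIOStuff(InputStream in, OutputStream out) throws IOException {
  byte[] buff = new byte[4096];
  int n;
  // classic java.io copy loop: blocking reads here do not react to interruption
  while ((n = in.read(buff)) != -1) {
   // write only the bytes actually read in this iteration
   out.write(buff, 0, n);
  }
 }
}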
Suppose the thread's execution flow is currently in the doSomeOperation() method and the thread isn't interrupted yet: fine, we proceed successfully with doSomeDb() and return from it. Now suppose that while we were in doSomeDb(), someone who has a reference to our thread requested its interruption. Logically, at this point we must abort our thread's execution instantly; unfortunately, that's not what happens. Java proceeds with executeIOStuff(), which in turn performs old java.io operations that aren't interruptible. And even if they were interruptible (like NIO operations), it's wrong to execute anything after a thread has been interrupted.
So now I hope you've caught the core point: no operations after thread interruption! How do we achieve this? While you think, let me introduce an AspectJ solution that effectively fixes it.
First, how will we fix it? I believe that dynamically checking the thread's interruption status before every method call, and throwing a runtime exception if it is set, is sufficient (especially for a proof-of-concept solution).
Here is an aspect that does exactly that:


@Aspect
public class InterruptionAspect {
 // every method call made in the control flow of majorFlow() on a B instance,
 // excluding calls made by the aspect itself
 @Pointcut("cflow(call(void majorFlow()) && target(b)) && call(* *(..)) && !within(InterruptionAspect)")
 public void everyMethodCallInTheControlFlowOfMajorFlowMethod(B b) {
 }

 @Before("everyMethodCallInTheControlFlowOfMajorFlowMethod(b)")
 public void checkBeforeEveryCall(B b) {
  checkInterruption(b);
 }

 void checkInterruption(B b) {
  // Thread.interrupted() tests and clears the flag; the caller is expected to restore it
  if (Thread.interrupted()) {
   b.interrupedBy = A.STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY;
   throw new CleverInterruptedException();
  }
 }
}
Take a look at checkInterruption(): if the current thread has been interrupted, it stops any further action and aborts execution (in Java this is only possible through the exception mechanism).
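To see what the aspect saves us from writing, here is a hand-written equivalent of the woven check applied to the copy loop of Service.executeIOStuff(). It is a sketch: ManualCheckpoints, checkpoint() and AbortedByInterruptException are hypothetical names (the last one plays the role of CleverInterruptedException), and the checks are placed by hand before each call, which is exactly the chore the @Before advice automates.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Runtime exception that aborts the flow; plays the role of CleverInterruptedException.
class AbortedByInterruptException extends RuntimeException {}

// Hypothetical hand-written equivalent of the "check before every call" advice.
class ManualCheckpoints {
    static void checkpoint() {
        // like checkInterruption(): Thread.interrupted() tests AND clears the flag
        if (Thread.interrupted()) {
            throw new AbortedByInterruptException();
        }
    }

    static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buff = new byte[4096];
        while (true) {
            checkpoint();                 // what the aspect would do before in.read(...)
            int n = in.read(buff);
            if (n == -1) {
                break;
            }
            checkpoint();                 // ...and before out.write(...)
            out.write(buff, 0, n);
        }
    }
}
```

Without an interrupt the copy runs to completion; with the flag already set, it aborts before the first read, so nothing is written.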


Unit test them all.

public class A {
 static final int STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY = 1;
 static final int STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY = 2;

 /**
  * @see #STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY
  * @see #STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY
  */
 volatile int interrupedBy;

 /**
  * Flag that indicates that computation in {@link #methodUnsensitiveToInterruption} has occurred;
  */
 volatile boolean we_were_in_methodUnsensitiveToInterruption;

 /**
  * Entry point to class functionality;
  */
 public void majorFlow() {
  methodUnsensitiveToInterruption();
  try {
   blockingOperation();
  } catch (InterruptedException e) {
   // setting back the status of current thread to interrupted, so higher level code that
   // polls for thread's status could catch it.
   Thread.currentThread().interrupt();
   interrupedBy = STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY;
  }
 }

 private void blockingOperation() throws InterruptedException {
  Thread.sleep(1000000L);
 }

 private void methodUnsensitiveToInterruption() {
  // just a method that is insensitive to interruption: if we get here while the current
  // thread's status is already interrupted, we won't notice it, and instead of stopping
  // we proceed
  we_were_in_methodUnsensitiveToInterruption = true;
 }
}

public class B extends A {
}

public class CleverInterruptedException extends RuntimeException {
}
Class A: the entry point is the majorFlow() method. Under regular interruption (without the aspect's logic), majorFlow() aborts its execution only in blockingOperation(). That is, if the thread is interrupted before methodUnsensitiveToInterruption(), plain Java reveals this only in an operation that throws InterruptedException, in our case blockingOperation(). So, under regular conditions, the flag we_were_in_methodUnsensitiveToInterruption will be true and interrupedBy will be STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY.

And the unit tests themselves:
// assuming JUnit 4 and the same package as A, B and CleverInterruptedException
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

public class TestAdvancedInterruption {
 private final ExecutorService executor = Executors.newSingleThreadExecutor();

 private static class MyTask implements Callable<A> {
  final A target;

  public MyTask(A a) {
   this.target = a;
  }

  public A call() {
   // Logically, at the start the thread isn't interrupted;
   // let's "emulate" interruption;
   Thread.currentThread().interrupt();
   // watch out for CleverInterruptedException!
   try {
    target.majorFlow();
   } catch (CleverInterruptedException ignore) {
    // It's ok to ignore it here, but don't forget to restore the status;
    Thread.currentThread().interrupt();
   }
   return target;
  }
 }

 @Test
 public void handleInterruption_Normaly() throws Exception {
  A a_state = executor.submit(new MyTask(new A())).get();
  assertEquals(A.STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY, a_state.interrupedBy);
  assertTrue(a_state.we_were_in_methodUnsensitiveToInterruption);
 }

 @Test
 public void handleInterruption_ASAP() throws Exception {
  B b_state = (B) executor.submit(new MyTask(new B())).get();
  assertEquals(A.STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY, b_state.interrupedBy);
  assertFalse(b_state.we_were_in_methodUnsensitiveToInterruption);
 }
}
With the aspect applied, the picture is totally different. handleInterruption_ASAP() tests the aspect's "presence": I assert that interrupedBy now equals STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY and that we were never in methodUnsensitiveToInterruption! Stop here. You see, we fixed the issue: the thread was interrupted before the call to methodUnsensitiveToInterruption(), and the code, thanks to AspectJ, handled that immediately! (The pointcut binds target(b), so it only matches B instances; that is why handleInterruption_Normaly() still observes plain Java behaviour for A.)


Thanks for reading and for your time!

December 11, 2009

Proof-of-concept: log in separate thread, transparently with AspectJ5

Suppose logging tasks consume a lot of processing time or memory, or a lot of something else: e.g. tens of threads try to acquire a write lock on a resource (such as a file) at the same time, or the logging routine turns out to be costly because it touches network IO.
Either way, we've decided to log asynchronously. What we have: thousands of Logger.log() calls spread across the code. One obvious way is to dig into the third-party logger's source and change log() so that each call is wrapped in a "log task" and forwarded to a thread pool underneath. At first sight it's pretty simple, but on the other hand there are drawbacks:
- changing third-party code isn't a good idea;
- it's an awkward implementation, because now I must maintain logging_asynchronuosly_version1_my_impl_of_3rdparty.jar;
- it's hard to plug in and plug out;
- the biggest one: it's not granular, since it affects all calls to Logger.log(), while one might want to apply the target behaviour only to a manageable subset of the code, e.g. to integration test code.
Anyway, whether or not someone would argue with the points above, there is an alternative that doesn't have these drawbacks (but certainly has others, like any solution).
Let's summarize the requirements: the implementation should be flexible, pluggable, granular (no "all-or-nothing" approaches) and transparent (no code changes).


The loggers are as follows:
public class AnotherLogger extends Logger {
    @Override
    void log() {
        System.out.println("\t\tlogging in " + this.getClass().getCanonicalName());
    }
}

public class Logger {
    void log() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException ignore_here) {}
        System.out.println("\t\tlogging in " + this.getClass().getCanonicalName());
    }
}

Their "runtime":

public class LoggerTest {
    @Test
    public void test_LoggerLog() throws Exception {
        test(new Logger());
    }

    @Test
    public void test_LoggerOverridenLog() throws Exception {
        test(new AnotherLogger());
    }

    private void test(Logger logger) throws Exception {
        System.out.println("[" + Thread.currentThread().getName() + "]: about to log()");
        logger.log();
        System.out.println("[" + Thread.currentThread().getName() + "]: after log()");
        Thread.sleep(2000L);
        System.out.println("\ntest done: good bye synchronous logging!");
    }
}
Running these tests, we get the following output:



You can see that logging occurs in the main thread; everything proceeds synchronously.
But very soon this will be refactored with AspectJ5 and its AOP "magic".




Meet the "aspect":
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

@Aspect
public class WrapLoggingInSeparateThreadTransparently {
    private final ExecutorService executor = Executors.newFixedThreadPool(1);

    /**
     * Catches execution of Logger.log() in the control flow of any LoggerTest method;
     *
     * @param pjp is "reserved stuff" here: AspectJ knows this is an "around advice" and
     *            will provide the method with a {@link ProceedingJoinPoint} object at runtime.
     */
    @Around(value = "cflow(within(LoggerTest)) && execution(void Logger.log(..))")
    public void adviceMethod(final ProceedingJoinPoint pjp) throws Exception {
        executor.execute(new Runnable() {
            public void run() {
                try {
                    System.out.println("[" + Thread.currentThread().getName() + "]: doing log()");
                    // calling the "trapped" execution join point;
                    pjp.proceed();
                    System.out.println("[" + Thread.currentThread().getName() + "]: done log()");
                } catch (Throwable ignore) {
                    // a failed log attempt is deliberately swallowed in this proof of concept
                }
            }
        });
    }
}

After weaving the above logic into the code (at compile time), we get the following output:


You see what has happened?! Our sequential logging has transparently become asynchronous! Logging was done in pool-1-thread-1, while our major flow proceeded in main without waiting for the log operation. This is legal: log() is void and doesn't access any shared state; also, the actual logging happens in the same order as it would in the "normal" case (which holds as long as the thread pool size is 1 ;)).
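The ordering remark can be checked without AspectJ. The sketch below, with hypothetical names (OrderedAsyncLog, runInPoolOfOne()), hands "log tasks" to a one-thread pool the way the aspect does and records the order in which they run. Unlike the aspect, it shuts the pool down at the end; pool threads from Executors.newFixedThreadPool() are non-daemon, so a pool that is never shut down keeps a plain JVM from exiting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: with a one-thread pool, submitted "log tasks" run in submission order.
class OrderedAsyncLog {
    static List<String> runInPoolOfOne(int n) {
        final List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService executor = Executors.newFixedThreadPool(1); // same size as in the aspect
        for (int i = 0; i < n; i++) {
            final String msg = "log #" + i;
            executor.execute(new Runnable() {
                public void run() {
                    seen.add(msg);        // stands in for pjp.proceed()
                }
            });
        }
        executor.shutdown();              // let queued log tasks finish, then release the thread
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

With a larger pool the tasks could interleave, which is the caveat behind the winking remark above.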

Let's briefly examine the "aspect" and the corresponding AOP (aspect-oriented programming) terms.
First of all, an aspect, roughly speaking, maps logically to a class annotated with @Aspect (thanks to version 5 of AspectJ); by default there is a single aspect instance per class loader. The magic string "cflow(within..." is a "pointcut"; a pointcut is to Java what SQL is to a database. A pointcut in AOP selects (matches, catches, fetches) the program's "join points", as SQL selects rows in a database. There are several types of join points: instance initialization, static initialization of a class (along with class members and static blocks), execution or call of a method/constructor, and so on. So join points are recognizable "points" in the program, recognizable by ajc, the AspectJ compiler, which understands both Java and AspectJ code because ajc itself is built on a Java compiler (the Eclipse JDT compiler). Last thing: adviceMethod() annotated with @Around is a routine that is called on the aspect each time a join point is matched. Such a routine is called an "advice" (there are other types of advice too, such as @Before and @After). I chose @Around because in this case AspectJ provides me with a ProceedingJoinPoint object (think of it as a trapped execution flow), and I have a choice: proceed with the caught join point by calling ProceedingJoinPoint.proceed(), or just skip it.
 
There is a lot more to know about AOP and its Java implementation, AspectJ5.
In case you want to start criticizing this logging approach, I'd warn you against it, because the core point of the post is not the logging approach itself but the possibilities of AOP.
Thanks for your time!