December 29, 2009

Overcoming limitations of ExecutorService.shutdownNow(), transparently with AspectJ5

Chapter 7, "Cancellation and Shutdown", of the book Java Concurrency in Practice (as well as the javadoc of ExecutorService.shutdownNow()) points out a limitation of what shutdownNow() returns.
The book says: 
"When an ExecutorService is shut down abruptly with shutdownNow, it attempts to cancel the tasks currently in progress and returns a list of tasks that were submitted but never started so that they can be logged or saved for later processing. However, there is no general way to find out which tasks started but did not complete. This means that there is no way of knowing the state of the tasks in progress at shutdown time unless the tasks themselves perform some sort of checkpointing. To know which tasks have not completed, you need to know not only which tasks didn't start, but also which tasks were in progress when the executor was shut down."
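To see the limitation concretely, here is a small plain-Java sketch with no AspectJ involved. The class and method names are hypothetical. One task occupies the only worker of a single-thread pool while two more wait in the queue. shutdownNow() hands back only the two queued tasks; the task that was interrupted mid-flight is simply not reported.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows what plain shutdownNow() does and does not report.
class ShutdownNowLimitationDemo {

    /** Returns the number of tasks reported by shutdownNow(). */
    static int reportedByShutdownNow() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            // Occupies the only worker thread until it is interrupted.
            exec.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stopped mid-flight
                }
            });
            exec.submit(() -> { }); // queued, never started
            exec.submit(() -> { }); // queued, never started
            started.await();        // make sure the first task is really running
            List<Runnable> neverStarted = exec.shutdownNow();
            exec.awaitTermination(5, TimeUnit.SECONDS);
            // The interrupted first task is not part of this list.
            return neverStarted.size();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The returned count is 2: the started-but-unfinished task is invisible to the caller.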
The author also provides a workaround for this problem: extend AbstractExecutorService, override execute(Runnable) and define a method that returns the cancelled tasks. Even so, we are forced to use shutdownNow() in the following fashion:
...
//hint to running tasks to stop what they are doing!
exec.shutdownNow();
//wait until all running tasks have stopped and been gathered in some "cancelled tasks" collection (but for how long?!)
exec.awaitTermination(TIMEOUT, UNIT);
//here is our result!
exec.getCancelledTasks();
...
Moreover, the proposed solution carries a requirement: tasks must be responsive to interruption. In this post I explained why this requirement is difficult (I would say impossible) to meet.
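A minimal sketch of the book's idea makes that requirement visible. It is modeled loosely on the book's TrackingExecutor, but simplified to wrap an ExecutorService instead of extending AbstractExecutorService; TrackingExecutorSketch and demo() are hypothetical names. A task is recorded as cancelled only if it leaves the interrupt flag set when it finishes, so a task that swallows InterruptedException escapes the tracking.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class: a simplified take on the book's tracking-executor idea.
class TrackingExecutorSketch {
    private final ExecutorService exec;
    private final Set<Runnable> cancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<>());

    TrackingExecutorSketch(ExecutorService exec) {
        this.exec = exec;
    }

    void execute(Runnable task) {
        exec.execute(() -> {
            try {
                task.run();
            } finally {
                // Recorded only if the task left the interrupt flag set.
                if (exec.isShutdown() && Thread.currentThread().isInterrupted()) {
                    cancelledAtShutdown.add(task);
                }
            }
        });
    }

    List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("executor has not terminated yet");
        }
        synchronized (cancelledAtShutdown) {
            return new ArrayList<>(cancelledAtShutdown);
        }
    }

    /** Runs one sleeping task; returns how many tasks were reported as cancelled. */
    static int demo(boolean taskRestoresInterrupt) {
        TrackingExecutorSketch tracker =
                new TrackingExecutorSketch(Executors.newSingleThreadExecutor());
        CountDownLatch started = new CountDownLatch(1);
        tracker.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                if (taskRestoresInterrupt) {
                    Thread.currentThread().interrupt(); // cooperative task
                } // otherwise the interrupt is swallowed
            }
        });
        try {
            started.await();
            tracker.shutdownNow();                         // hint running tasks to stop
            tracker.awaitTermination(5, TimeUnit.SECONDS); // how long is long enough?
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return tracker.getCancelledTasks().size();
    }
}
```

With a cooperative task, demo(true) reports one cancelled task; with a task that swallows the interrupt, demo(false) reports none, even though that task was stopped mid-flight.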

Without any intention of criticizing what is suggested in Java Concurrency in Practice, let me share an alternative solution.

The goal is to put as few requirements as possible on the clients of ExecutorService.shutdownNow(). As a user of an executor service I want to obtain it through the Executors factory, submit tasks to it (whether they are sensitive to interruption or not!), and, when the time comes, call ExecutorService.shutdownNow() and get back, in a guaranteed fashion, all cancelled tasks: both the queued ones that never started and the started ones that did not complete. I also don't want to extend an abstract executor service and deal with a new type and its new method.

It's "doable" - with AspectJ5.

The solution relies primarily on instant interruption handling, atomic variables, a synchronized collection and AOP magic.
Before looking at the code, let me briefly describe how it works:

- When you submit a callable/runnable to the executor service, an available thread picks it from the work queue and its execution flow reaches call()/run(). At this point the task is at its first step of execution; the aspect catches this execution point and records it by atomically incrementing the number of running tasks.

- Once a task (callable or runnable) has started executing, its behaviour is handled as follows. If the task throws a checked or unchecked exception from its call()/run() method, that's fine: the task is considered completed and the number of running tasks is atomically decremented. But if during execution it turns out that the thread has been asked to interrupt, instant interruption handling comes into play: a runtime exception is thrown, the task is added to the collection of cancelled tasks and the number of running tasks is atomically decremented.

- As you might guess, ExecutorService.shutdownNow() (dressed with AOP), after requesting interruption of all running tasks, merely loops until the number of running tasks drops to 0; thanks to instant interruption handling, the loop is finite!
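The three steps above can be sketched by hand, without AspectJ, to show the bookkeeping the aspect weaves in. This is a hypothetical illustration, not the aspect itself: ManualShutdownTracker, track(), checkpoint() and TaskInterrupted are invented names, and the interruption checks are explicit checkpoint() calls where the aspect would inject them automatically before method calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-written equivalent of the bookkeeping the aspect weaves in.
class ManualShutdownTracker {
    /** Unchecked signal thrown by checkpoint(); stands in for the aspect's RuntimeException. */
    static final class TaskInterrupted extends RuntimeException { }

    private final AtomicInteger activeTasks = new AtomicInteger();
    private final List<Callable<?>> interruptedTasks =
            Collections.synchronizedList(new ArrayList<>());

    /** Stand-in for the 'before' advice: check the flag, bail out if it is set. */
    static void checkpoint() {
        if (Thread.currentThread().isInterrupted()) {
            throw new TaskInterrupted();
        }
    }

    /** Stand-in for the 'around' advice on call(): count the task while it runs. */
    <T> Callable<T> track(Callable<T> task) {
        return () -> {
            activeTasks.incrementAndGet();
            try {
                return task.call();
            } catch (InterruptedException | TaskInterrupted e) {
                Thread.currentThread().interrupt(); // keep the flag set, as handleInterruption() does
                interruptedTasks.add(task);         // started but not completed
                throw new RuntimeException(e);
            } finally {
                activeTasks.decrementAndGet();      // exactly once per started task
            }
        };
    }

    /** Stand-in for the 'around' advice on shutdownNow(). */
    List<Object> shutdownNow(ExecutorService exec) {
        List<Object> cancelled = new ArrayList<>(exec.shutdownNow()); // queued, never started
        while (activeTasks.get() != 0) {
            Thread.yield(); // wait until every started task has finished or been interrupted
        }
        cancelled.addAll(interruptedTasks); // started, not completed
        return cancelled;
    }

    static int demo() {
        ManualShutdownTracker tracker = new ManualShutdownTracker();
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Callable<Integer> spinner = () -> {
            started.countDown();
            for (;;) {
                checkpoint(); // a loop that never calls a blocking method
            }
        };
        exec.submit(tracker.track(spinner));
        exec.submit(tracker.track(() -> 42)); // stays in the queue
        try {
            started.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return tracker.shutdownNow(exec).size();
    }
}
```

demo() reports both the queued task and the interrupted spinner. One deliberate difference from the aspect: the counter is decremented in exactly one place, a finally block, so the shutdownNow() loop cannot be left waiting on a task that finished with its interrupt flag set without passing through the interruption handler.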


Meet the ExecutorServiceShutdownNowAspect:
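import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public aspect ExecutorServiceShutdownNowAspect {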
 /////////////////////////////////////////////
 // ATTRIBUTES

 /**
  * Collection of interrupted workers;
  */
 private volatile List<Runnable> interruptedWorkers = Collections
   .synchronizedList(new ArrayList<Runnable>());

 /**
  * Number of tasks currently being executed;
  */
 private final AtomicInteger activeTasksNum = new AtomicInteger();

 /////////////////////////////////////////////
 // POINTCUTS

 /**
  * Matches {@link Callable#call()}-s;
  */
 pointcut executionOfCallable_() : execution(* Callable.call());

 /**
  * Matches {@link Callable#call()}-s and binds the executing callable;
  */
 pointcut executionOfCallable(Callable task) : executionOfCallable_() && this(task);

 /**
  * Keeps the test class and the aspect itself from being advised;
  */
 pointcut notwithin() :
  !within(eg.ExecutorServiceShutdownNowLimitationTest) && !within(ExecutorServiceShutdownNowAspect);

 /////////////////////////////////////////////
 // ADVICES

 /**
  * Check for interruption before every call of a method that doesn't throw
  * {@link InterruptedException};
  */
 before(Callable task) :
  cflow(executionOfCallable(task)) && call(* *(..) throws !InterruptedException) && notwithin() {
  if (interrupted()) {
   handleInterruption(task, new InterruptedException());
  }
 }

 /**
  * Handles interruption exception in the flow of execution of
  * {@link Callable#call()};
  */
 after(Callable task) throwing (InterruptedException e) :
  cflow(executionOfCallable(task)) && notwithin() {
  handleInterruption(task, e);
 }

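 /**
  * Counts a callable as active while its call() runs; interrupted
  * callables are uncounted in handleInterruption() instead;
  */
 Object around() : executionOfCallable_() && notwithin() {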
  // increment number of tasks currently being executed;
  activeTasksNum.incrementAndGet();
  try {
   // let execution of callable.call() proceed;
   return proceed();
  } finally {
   // an interrupted task was already uncounted in handleInterruption();
   // decrement here only for tasks that completed normally or exceptionally;
   if (!interrupted()) {
    activeTasksNum.decrementAndGet();
   }
  }
 }

 /**
  * Decorates task collection returned by
  * {@link ExecutorService#shutdownNow()} by adding to it interrupted tasks;
  */
 List<Runnable> around() : call(List<Runnable> ExecutorService.shutdownNow()) {
  interruptedWorkers.addAll(proceed());

  // wait until all active tasks have been interrupted;
  while (activeTasksNum.get() != 0) {
   Thread.yield(); // let worker threads run while we busy-wait
  }

  // workaround for the aspect's (singleton) instantiation model: hand out
  // the current list and start a fresh one for the next shutdownNow();
  // ideally we would simply return interruptedWorkers;
  List<Runnable> ret = interruptedWorkers;
  interruptedWorkers = Collections
    .synchronizedList(new ArrayList<Runnable>());

  return ret;
 }

 /////////////////////////////////////////////
 // METHODS

 private void handleInterruption(Callable task, InterruptedException e) {
  Thread.currentThread().interrupt();
  interruptedWorkers.add(new FutureTask(task));
  activeTasksNum.decrementAndGet();
  throw new RuntimeException(e);
 }

 private boolean interrupted() {
  return Thread.currentThread().isInterrupted();
 }
}
More on instant interruption: it is implemented by dynamically adding a check of the current thread's interruption status by means of a 'before' advice. The check if (interrupted()) runs before every method call in the execution flow of callable.call(); more precisely, not before every method call, but only before calls to methods that don't declare InterruptedException. That exception gets special treatment, I would say very serious treatment: once a thrown InterruptedException is detected, the aspect doesn't play with it. It sets the current thread's interruption status back to true, saves the current callable (wrapped in a FutureTask), atomically decrements the number of running tasks and finally throws a RuntimeException:
private void handleInterruption(Callable task, InterruptedException e) {
 Thread.currentThread().interrupt();
 interruptedWorkers.add(new FutureTask(task));
 activeTasksNum.decrementAndGet();
 throw new RuntimeException(e);
}
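Why does handleInterruption() start with Thread.currentThread().interrupt()? A blocking method such as Thread.sleep() clears the thread's interrupt status when it throws InterruptedException. Re-asserting the flag keeps the around advice's finally block, which decrements only when the thread is not interrupted, from decrementing the counter a second time. A tiny plain-Java check of that JDK behaviour (the class name is hypothetical):

```java
// Hypothetical demo class: shows why the handler re-asserts the interrupt.
class InterruptFlagDemo {

    /** Returns {flag right after catching, flag after restoring}. */
    static boolean[] run() {
        Thread.currentThread().interrupt(); // simulate shutdownNow() interrupting this thread
        boolean flagAfterCatch = true;
        try {
            Thread.sleep(1_000);            // throws at once: the flag is already set
        } catch (InterruptedException e) {
            flagAfterCatch = Thread.currentThread().isInterrupted(); // cleared by the throw
            Thread.currentThread().interrupt();                       // restore it
        }
        // Thread.interrupted() reports the flag and clears it, leaving this thread clean.
        boolean flagAfterRestore = Thread.interrupted();
        return new boolean[] { flagAfterCatch, flagAfterRestore };
    }
}
```

run() returns {false, true}: the flag is gone right after the catch and is back only because it was explicitly restored.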
Instant interruption handling isn't the only interesting thing; look through the code and its comments and you will find more.
What is great about AspectJ is its readability 8).

A few words about testing.
I considered several cases, and the test passed for all of them. Let me describe them briefly:

Wrong interruption handling:
void method() throws InterruptedException {
 ...
 synchronized (lock) {
   try {
      // emulate blocking;
      lock.wait();
   } catch (InterruptedException ignore) {
      // case: the interruption is ignored;
   }
 }
 Thread.sleep(10000);
 ...
}
Propagating InterruptedException:
void method() throws InterruptedException {
 ...
 // case: InterruptedException is thrown and propagated;
 Thread.sleep(10000);
 ...
}
Throwing a checked exception:
void method() throws Exception {
 ...
 // case: application exception;
 throw new CheckedException();
}
Regular code insensitive to the standard interruption mechanism:
void method() {
 // emulate a task that is insensitive to the standard interruption mechanism;
 for (;;) {
  anotherRegularMethod();
 }
}

For now, these are all the cases I have come up with. I will update the post once I think of new ones.

I used the TestNG framework for the test code, since it has an overwhelming advantage over JUnit 4 when a single test body has to be run against several cases. Here is the test itself:
@Test
public void abruptPoolTermination(AbstractCallableFactory f,
                                  int expectingTaskNum)
  throws InterruptedException {
 ExecutorService exec = Executors.newFixedThreadPool(THREAD_NUM);
 exec.submit(f.callable());
 exec.submit(f.callable());
 exec.submit(f.callable());
 Thread.sleep(200);
 Assert.assertEquals(exec.shutdownNow().size(), expectingTaskNum);
}


In summary.
I must say it is very enjoyable to overcome existing Java limits with AspectJ: it's not hard, it doesn't require the ability to multiply hex digits in your head, and even better, once you know Java's concurrency rules, the same rules apply in AspectJ!

Thanks for your time!
