December 15, 2009

Proof-of-concept: true interruption handling, transparently with AspectJ5

You probably know what InterruptedException means in Java. In any case, let me describe it briefly; the Javadoc will keep me honest.
So, a method is interruptible if it (a) declares that it throws InterruptedException and (b) has logic that actually throws this exception. It's simple: each thread, i.e. each Thread object, has a flag that holds its interruption status. So, for a method to be cancellable (this word is better), it must poll the interruption status and, if it is set, clear the status and throw InterruptedException. The static Thread.interrupted() does the check and the clearing in one call, while Thread.isInterrupted() only reads the flag.
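To make the polling idea concrete, here is a minimal sketch of a cancellable method. The class and method names (CancellableSum, sumUpTo) are made up for illustration and are not part of any library.

```java
// Hypothetical example: a CPU-bound method made cancellable by polling.
final class CancellableSum {

    // Sums 1..n, checking the interruption flag before each step.
    static long sumUpTo(long n) throws InterruptedException {
        long sum = 0;
        for (long i = 1; i <= n; i++) {
            // Thread.interrupted() returns the flag and clears it,
            // which is the usual convention before throwing InterruptedException.
            if (Thread.interrupted()) {
                throw new InterruptedException("interrupted at step " + i);
            }
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumUpTo(10));    // not interrupted: prints 55
        Thread.currentThread().interrupt(); // emulate an interruption request
        try {
            sumUpTo(10);
        } catch (InterruptedException e) {
            // The flag was cleared before the exception was thrown.
            System.out.println(Thread.currentThread().isInterrupted()); // prints false
        }
    }
}
```

The check sits inside the loop on purpose: a single check at the top of the method would miss an interruption that arrives halfway through the work.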
Many methods in Java throw InterruptedException: Object.wait(), Future.get(), Thread.sleep(), CountDownLatch.await() and many others. Applying what I have just said, we can assert that these methods check the interruption status of the thread in which they are executing, and throw the exception if it is set. In other words, regardless of when the thread was interrupted, i.e. before calling a method that throws InterruptedException or during the execution of that method, the method will in either case notice the interruption status and throw the exception.
Pretty simple so far: just poll for the interruption status and throw the exception, and let the code at a higher level treat that damn exception as it wants. So, where is the issue with all of this?
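The "before" case is easy to check with a small sketch. The class name PreInterruptedSleep is hypothetical; the only library call that matters is Thread.sleep().

```java
// Hypothetical demo: the interruption request arrives BEFORE the blocking call.
final class PreInterruptedSleep {

    // Returns true if Thread.sleep() threw InterruptedException although
    // the thread had been interrupted before sleep() was even called.
    static boolean sleepThrowsWhenAlreadyInterrupted() {
        Thread.currentThread().interrupt(); // the request comes first
        try {
            Thread.sleep(10_000L);          // does not actually wait 10 seconds
            return false;
        } catch (InterruptedException e) {
            return true;                    // sleep() noticed the pending flag
        }
    }

    public static void main(String[] args) {
        System.out.println(sleepThrowsWhenAlreadyInterrupted()); // prints true
    }
}
```

Note that sleep() clears the interruption status when it throws, so after the catch block the thread no longer looks interrupted.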

As I see it, there is a big issue: it's impossible to make all the code executed by a thread explicitly interruptible, because you can't make all your code sensitive to interruption. Got it?
Let's put this into code (try to imagine that the code is meaningful):


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public abstract class Service {

 public Object doSomeDb(long id) {
  return findItemInDb(id);
 }

 abstract Object findItemInDb(long id);

 public void doSomeOperation(InputStream in, OutputStream out) throws IOException {
  if (doSomeDb(100L) != null) {
   executeIOStuff(in, out);
  }
 }

 void executeIOStuff(InputStream in, OutputStream out) throws IOException {
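  byte[] buff = new byte[4096];
  int read;
  // write only the bytes actually read; the last chunk is usually shorter than the buffer
  while ((read = in.read(buff)) != -1) {
   out.write(buff, 0, read);
  }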
 }
}
Suppose the thread is currently executing doSomeOperation() and has not been interrupted yet. Fine: we proceed with doSomeDb() and return from it. Now suppose that while we were inside doSomeDb(), someone holding a reference to our thread requested its interruption. Logically, at this point we should abort execution immediately. Unfortunately, that's not what happens: Java will proceed with executeIOStuff(), which in turn performs old (java.io) IO operations that aren't interruptible. But even if they were interruptible (like NIO channel operations), it's wrong to execute anything after a thread has been interrupted.
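Here is a small sketch of that behaviour. It is only an approximation: in-memory streams stand in for real IO, and the class name UninterruptibleCopy is made up. The copy loop never looks at the interruption flag, so it runs to completion on an already interrupted thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical demo: plain stream copying ignores a pending interruption.
final class UninterruptibleCopy {

    static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buff = new byte[4096];
        int read;
        while ((read = in.read(buff)) != -1) {
            out.write(buff, 0, read); // no interruption check anywhere in the loop
        }
    }

    // Interrupts the current thread first, then copies; returns the bytes copied.
    static int copyWhileInterrupted(byte[] data) throws IOException {
        Thread.currentThread().interrupt();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copy(new ByteArrayInputStream(data), out);
        } finally {
            Thread.interrupted(); // clear the flag so the demo does not leak it
        }
        return out.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(copyWhileInterrupted(new byte[10_000])); // prints 10000
    }
}
```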
So, I hope you have caught the core point by now: no operations after thread interruption! How to do this? While you think, let me introduce an AspectJ solution that effectively fixes this.
First, how will we fix it? I believe that dynamically checking the thread's interruption status before and after every method call, and throwing a runtime exception if it is set, would be sufficient (especially in the context of a proof-of-concept solution).
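Written by hand, without any AspectJ, the idea could look roughly like the sketch below. All names here (ManualInterruptionChecks, AbortedByInterruption, the step labels) are hypothetical and only illustrate the pattern of checking between steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: explicit interruption checks around every step.
final class ManualInterruptionChecks {

    // Unchecked, so it can cross methods that do not declare InterruptedException.
    static final class AbortedByInterruption extends RuntimeException { }

    static void checkInterruption() {
        if (Thread.interrupted()) {        // reads and clears the flag
            throw new AbortedByInterruption();
        }
    }

    // Runs two fake steps with a check before and after each one and
    // returns the labels of the steps that were actually executed.
    static List<String> run(boolean interruptAfterFirstStep) {
        List<String> executed = new ArrayList<>();
        try {
            checkInterruption();
            executed.add("doSomeDb");
            if (interruptAfterFirstStep) {
                Thread.currentThread().interrupt(); // emulate an external request
            }
            checkInterruption();
            executed.add("executeIOStuff");
            checkInterruption();
        } catch (AbortedByInterruption e) {
            // Aborted: nothing after the failed check has run. Real code would
            // usually restore the flag here with Thread.currentThread().interrupt().
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [doSomeDb, executeIOStuff]
        System.out.println(run(true));  // prints [doSomeDb]
    }
}
```

Sprinkling such calls through real code by hand is tedious and easy to forget, which is the motivation for generating them with an aspect.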
Here is an aspect that does exactly what I have just described:


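import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

// Annotation-style (@AspectJ) aspects are declared as plain classes carrying @Aspect
@Aspect
public class InterruptionAspect {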
 @Pointcut("cflow(call(void majorFlow())&&target(b))&&call(* *(..))&&!within(InterruptionAspect)")
 public void everyMethodCallInTheControlFlowOfMajorFlowMethod(B b) {
 }

 @Before("everyMethodCallInTheControlFlowOfMajorFlowMethod(b)")
 public void adviceBefore(B b) {
  checkInterruption(b);
 }

 void checkInterruption(B b) {
  if (Thread.interrupted()) {
   b.interrupedBy = A.STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY;
   throw new CleverInterruptedException();
  }
 }
} 
Take a look at checkInterruption(): if the current thread has been interrupted, it stops any further actions and aborts execution (in Java this is possible only through the exception mechanism).


Unit test them all.

public class A {
 static final int STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY = 1;
 static final int STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY = 2;

 /**
  * @see #STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY
  * @see #STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY
  */
 volatile int interrupedBy;

 /**
  * Flag that indicates that computation in {@link #methodUnsensitiveToInterruption} has occurred.
  */
 volatile boolean we_were_in_methodUnsensitiveToInterruption;

 /**
  * Entry point to class functionality;
  */
 public void majorFlow() {
  methodUnsensitiveToInterruption();
  try {
   blockingOperation();
  } catch (InterruptedException e) {
   // setting back the status of current thread to interrupted, so higher level code that
   // polls for thread's status could catch it.
   Thread.currentThread().interrupt();
   interrupedBy = STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY;
  }
 }

 private void blockingOperation() throws InterruptedException {
  Thread.sleep(1000000L);
 }

 private void methodUnsensitiveToInterruption() {
  // just method...that is unsensitive to interruption, i.e. when we come here and current
  // thread's status is interrupted, we wouldn't know that and instead of stopping we would
  // proceed
  we_were_in_methodUnsensitiveToInterruption = true;
 }
}

public class B extends A {
}

public class CleverInterruptedException extends RuntimeException {
}
Class A: the entry point is the majorFlow() method. Under regular interruption (without applying the aspect logic), majorFlow() will abort its execution only in blockingOperation(). I.e. if the thread is interrupted before methodUnsensitiveToInterruption(), under "normal Java" this is revealed only in an operation that throws InterruptedException, in our case in blockingOperation(). So, under regular conditions the flag we_were_in_methodUnsensitiveToInterruption will be true and interrupedBy will be STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY.
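The catch block in majorFlow() also restores the interruption status, because the blocking call clears it when it throws. A minimal sketch of that idiom on its own, with hypothetical names (RestoreInterruptFlag, sleepQuietly):

```java
// Hypothetical demo of restoring the flag after catching InterruptedException.
final class RestoreInterruptFlag {

    // Swallows the exception but puts the flag back, so that callers
    // further up the stack can still see that an interruption was requested.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the flag is still visible after sleepQuietly() returned.
    static boolean flagSurvives() {
        Thread.currentThread().interrupt(); // emulate an interruption request
        sleepQuietly(10_000L);              // returns at once, flag restored
        return Thread.interrupted();        // read the flag and clear it again
    }

    public static void main(String[] args) {
        System.out.println(flagSurvives()); // prints true
    }
}
```

Without the interrupt() call in the catch block, flagSurvives() would return false and the higher-level code would never learn about the interruption.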

The actual unit tests:
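// JUnit 4 is assumed here
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

public class TestAdvancedInterruption {
 private final ExecutorService executor = Executors.newSingleThreadExecutor();

 // Callable<A>, so that Future.get() returns an A without a cast
 private static class MyTask implements Callable<A> {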
  final A target;

  public MyTask(A a) {
   this.target = a;
  }

  public A call() {
   // Logically, at start thread isn't interrupted
   // Lets "emulate" interruption;
   Thread.currentThread().interrupt();
   // watch out for CleverInterruptedException!
   try {
    target.majorFlow();
   } 
   catch (CleverInterruptedException ignore) {
    // It's ok to ignore it here, but don't forget to set the status;
    Thread.currentThread().interrupt();
   }
   return target;
  }
 }

 @Test
 public void handleInterruption_Normaly() throws Exception {
  A a_state = executor.submit(new MyTask(new A())).get();
  assertEquals(A.STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY, a_state.interrupedBy);
  assertTrue(a_state.we_were_in_methodUnsensitiveToInterruption);
 }

 @Test
 public void handleInterruption_ASAP() throws Exception {
  B b_state = (B) executor.submit(new MyTask(new B())).get();
  assertEquals(A.STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY, b_state.interrupedBy);
  assertFalse(b_state.we_were_in_methodUnsensitiveToInterruption);
 }
}
After applying the aspect, the picture becomes totally different. handleInterruption_ASAP() tests the aspect's presence: I assert that interrupedBy is now equal to STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY and that we never entered methodUnsensitiveToInterruption()! Stop here. You see, we fixed the issue: the thread was interrupted before the call to methodUnsensitiveToInterruption() and the code, thanks to AspectJ, handled that immediately!


Thanks for reading and for your time!
