February 17, 2011

Doing smart stuff with List and Iterator

Often the objects in a list must be converted into other objects, i.e. we need a new list containing the converted objects. At first glance it's trivial:
List toJmsMessageList(List<MyMessage> originalMyMessageList) {
 List resultList = new ArrayList(originalMyMessageList.size());
 for(MyMessage msg : originalMyMessageList) {
  resultList.add(Converter.convert(msg));
 }
 return resultList;
}
Nothing wrong with the above code, except that:
1. it's boring and verbose
2. it creates a new list
3. it eagerly iterates over the original list
4. problems may occur when the original list is an ORM proxy list obtained as the result
of an SQL query: operations such as size() on it can pull the whole result set's content into memory

So, here's a solution. What I actually want is to convert a list of one kind of object into a list of another kind; I would like to have something like:
List resultList = ListFactory.createList(originalList, callback);
Whenever resultList is iterated, I want already-converted objects to come out of it! So, here's what I came up with:
 <T> List<T> createList(final Iterator<?> originalIterator, final Callable<T> c) {
 // invocation handler backing the custom List implementation
 InvocationHandler invocationHandler = new InvocationHandler() {
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) {
   if (method.getName().equals("iterator")) {
    return new Iterator<T>() {
     @Override
     public boolean hasNext() {
      return originalIterator.hasNext();
     }

     @Override
     public T next() {
      try {
       // call 'custom behavior'
       return c.call();
      } catch (Exception e) {
       log.error("Error during user defined operation on originalIterator", e);
       throw new RuntimeException(e);
      }
     }

     @Override
     public void remove() {
      // 'remove' isn't supported
      throw new UnsupportedOperationException();
     }
    };
   }
   // everything other than iteration isn't supported
   throw new UnsupportedOperationException();
  }
 };
 @SuppressWarnings("unchecked")
 List<T> proxyList = (List<T>) Proxy.newProxyInstance(
     Thread.currentThread().getContextClassLoader(),
     new Class[] {List.class},
     invocationHandler);
 return proxyList;
}
Here are possible use cases:
return ListFactory.createList(originalListIterator, new Callable<MessageEntity>() {
 private int fetchedNum = 0; // the number of fetched objects
 @Override
 public MessageEntity call() {
  if (++fetchedNum % fetchSize == 0) {
   // clear fetched MessageEntity entities from JPA's internal identity map
   // every time another 'fetchSize' objects have been fetched
   getEntityManager().clear();
  }
  // don't forget to call iterator.next()
  return originalListIterator.next();
 }
});
or
ListFactory.createList(messagesIterator, new Callable<Entry>() {
 @Override
 public Entry call() {
  try {
   return toEntry(messagesIterator.next());
  } catch (Exception e) {
   log.error("Error occurred while converting list of jms messages", e);
   throw new RuntimeException(e);
  }
 }
});
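A quick caller-side sketch (Entry, messagesIterator and callback are the names used above; process() is just a hypothetical placeholder): nothing is converted until the proxy list is iterated, and iterator() is the only operation the proxy supports - size(), get(int) and the rest will throw UnsupportedOperationException.
List<Entry> entries = ListFactory.createList(messagesIterator, callback);
for (Entry e : entries) { // the for-each calls iterator(); each step pulls one message
 process(e);              // from the original iterator and converts it on the fly
}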

Thanks for reading.

November 3, 2010

Language that matters

Preface
The purpose of this article is to help my friends better understand why they should spend their time learning Erlang/Scala, and functional programming in general, instead of Java.
This is a "time-friendly" explanation of "actor-based" concurrency; it doesn't require you to have an IQ >= 150.
The author found himself in pretty bad shape when he started trying to understand "actor-based" concurrency from Wikipedia or from the original article. So, if you're smart enough to understand everything from the original sources and think this article is a naive piece of shit, then get the fuck out of here.


Begin
Last year I was in Chernigov, visiting a good friend of mine, and we had a kind of dispute about concurrency.
That was the time when I had just finished learning the utilities from java.util.concurrent, researching concurrent collections and the rest of the concurrency toolbox, and I was amazed by what Java offers! (still am)
Then my friend said one thing that brought me back to reality: there are tasks which can't be implemented concurrently, and you can't do anything about it, it's simply impossible; take factorial: f(x) = x * f(x-1).
From school you know that you have to wait until the next call returns its result, and that call has to wait for the next one... etc. Even if you transform f(x) into a more "parallel-friendly" form, x*(x-1)*f(x-2), you still can't calculate x*(x-1) in thread1 and f(x-2) in thread2. Seems like an obvious thing - yet it's undoable! Why? Because Java's architecture doesn't allow it, and C/C++ doesn't, and Python and Pascal don't, and whatever imperative language you take will fail at such a primitive task.


Last weekend I spent some time on an Erlang presentation. Why Erlang? Because it's a buzzword nowadays - "concurrency-oriented functional language", "next mainstream language", "facebook and twitter use it", etc. etc. etc. So, like a regular code monkey who is easily picked up by marketing noise, I decided to look into it.
But, you know, unfortunately I have an average IQ, so I didn't get, on the first try (nor the second, nor the third), several principal topics:
  • why is it a functional language?!
  • what does "message-oriented language" actually mean, in detail?!
  • what are lightweight processes, and how is it possible that Erlang maintains thousands of them?! (OK, I know a CPU has cores and num_of_cores = num_of_threads (aka OS threads), I know what OS processes are, but WTF does "lightweight process" mean?!)
  • the language's creators assert that an Erlang program will run x times faster on an x-core machine! WTF?! How?! What can they do with factorial?! Does it matter whether there are 10 cores or 2 - for f(8) you still have to wait for f(6), for f(6) you have to wait for f(4), etc.
After three hours I found the answer to the last question, and then to the other three. No math equations, no theorems - just logical thinking.


It's simple
Say we want to calculate f(5) concurrently, having 2 CPUs (cores); for now, don't bother with how. See the picture:

[figure: f(5) decomposed into lightweight processes P1..P5 spread across the two cores]

You can see that a "lightweight process" is just a function. But how do they connect to each other (*), how do they return results to each other in a way that makes the program run very fast?! In Java, for example, functions "connect" to each other through the execution stack - and the same in C/C++, in Python, in Pascal...




Let me walk you through that last question (*). Write f(5) down this way:


P1#cpu2 = P2#cpu1 * (P3#cpu1 = P4#cpu1 * P5#cpu2)


Pi#CPUj     - i-th lightweight process on CPUj
i           - in [1, many]
j           - core index


OK, but still, how is this better than the imperative style?! Why should f(5) run faster if we add one more core?!


The answer: Pi executes its work and doesn't wait for the completion of Pi+1 - each Pi executes independently! It's done through non-blocking IO, non-blocking queues and message passing! Each process has a non-blocking queue from which it reads messages; it's roughly the same as:
class process {
    ConcurrentLinkedQueue<Object> queue;
    void receive() {
        for (;;) {
            Object message = queue.poll(); // non-blocking read from the mailbox
            if (message != null) {
                // hey! what's up?!
            }
        }
    }
}
The OS scheduler allocates a thread and CPU time for each Pi to execute, and it's OK if a Pi can't do anything useful! For example, if P3#cpu1 gets time to execute (read: 'is assigned to an OS thread') and finds that there's nothing to do because P4#cpu1 and P5#cpu2 haven't returned results yet - so what?! - it yields the OS thread and lets the other processes get a chance to work!
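To make this a bit more concrete for Java folks, here is a minimal sketch - plain Java, not Erlang, and only an illustration of the message-passing idea using the names above: f(5) is split into two independent workers, one computes the partial product 5*4, the other computes f(3), and the "parent" process just multiplies whatever messages land in its mailbox.
import java.util.concurrent.*;

class FactorialProcesses {
    // the parent's "mailbox": workers drop their partial results here as messages
    static final BlockingQueue<Long> mailbox = new LinkedBlockingQueue<Long>();

    static long f(long x) {
        return x <= 1 ? 1 : x * f(x - 1);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService cpus = Executors.newFixedThreadPool(2); // "two cores"
        cpus.submit(new Runnable() {
            public void run() { mailbox.offer(5L * 4L); } // P2: partial product
        });
        cpus.submit(new Runnable() {
            public void run() { mailbox.offer(f(3)); }    // P3: the rest of the factorial
        });
        // P1: combine the messages in whatever order they arrive
        System.out.println(mailbox.take() * mailbox.take()); // prints 120
        cpus.shutdown();
    }
}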

The most attractive part of all this: you get grid/cloud/network computing almost out of the box:
P1#(127.0.0.1) = P2#(243.88.33.11) * (P3#(10.10.0.6) = P4#(77.55.99.33) * P5#(14.133.15.16))


Hope this article helps you understand the state of things when you decide to start learning Erlang or Scala, and encourages you to read more on the topic.


Thanks for your time!


References
http://en.wikipedia.org/wiki/Lambda_calculus
http://en.wikipedia.org/wiki/Actor_(computer_science)
http://en.wikipedia.org/wiki/Erlang_(programming_language)

March 22, 2010

Using thread-bound HttpServletRequest exposed by Spring to bring DI on view tier

I'm talking about how to obtain the request object at an arbitrary point of your web-tier code. Sometimes this is very useful, especially when that object is hidden behind layers of abstraction.


Spring provides a couple of very useful classes for doing this. But first let me explain the problem.
See, sometimes you have a legacy application that wasn't designed for dependency injection, testability, being POJO-based, etc. For example, it means that somewhere in the view-tier code, in Struts actions, you have:
...
MyActionForm execute(Form inForm, ActionMapping mapping, Session s) {
    ...
    AccountManager accountManager
             = locator.getSessionEjb(AccountManagerHome.JNDI_NAME, s.getEjbProviderUrl());
    ...
}
...
In other words, you have ubiquitous EJB lookup code which works through the http session because the session holds some critical info, e.g. the providerUrl as in the example above. And this code makes your actions:
  • not unit-testable;
  • hard to switch from one business implementation to another, e.g. from EJB to POJO;
  • full of RemoteException handling code (even if RE handling is defined somewhere in a superclass, you still have to call that superclass handling method).
So, here are the solution's primary points:
Implementation of the lookup bean:
public class AccountManagerFactoryBean implements FactoryBean {
    // METHODS        

    public Object getObject() throws Exception {
        return Proxy.newProxyInstance(getClass().getClassLoader(),
                new Class[]{AccountManager.class},
                new BaseInvocationHandler() {
                    protected Class getEJBHomeClass() {
                        return AccountManagerHome.class;
                    }
                });
    }

    public Class getObjectType() {
        return AccountManager.class;
    }

    public boolean isSingleton() {
        return true;
    }
}


And the invocation handler that does the actual job:

public abstract class BaseInvocationHandler implements InvocationHandler {
    // METHODS

    /**
     * Method interceptor which guarantees invocation on an EJB object obtained
     * through {@link SessionInfo#ejbProviderUrl}
     *
     * @param proxy  proxy object; not used;
     * @param method method; the invocation will be performed on this method;
     * @param args   method arguments;
     * @throws Throwable either RemoteException or an application logic exception;
     * here we don't care about it; application exceptions will be propagated
     * to the calling code, RemoteException handling is defined in the spring
     * context xml;
     */
    public Object invoke(Object proxy, Method method, Object[] args)
 throws Throwable {
        String interfaceMethodName = method.getName();
        Class[] parameterTypes = method.getParameterTypes();
        Object ejb =
           services.useSessionEjb(
               getEJBHomeClass(), getSessionInfo().getEjbProviderUrl());
        Method ejbDeclaredMethod =
           ejb.getClass().getDeclaredMethod(interfaceMethodName, parameterTypes);
        if (ejbDeclaredMethod != null) {
            return ejbDeclaredMethod.invoke(ejb, args);
        }
        else {
            // MUST BE UNREACHABLE;
            throw new AssertionError("Not all methods implemented from the EJB");
        }
    }

    /**
     * Obtains session info object;
     *
     * @return session info object;
     */
    protected final SessionInfo getSessionInfo() {
        // getting request;
        // since we use RequestContextFilter in web.xml and we're
        // in servlet container it's legal to call RequestContextHolder;
        RequestAttributes ra = RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = ((ServletRequestAttributes) ra).getRequest();
        // getting http session;
        return SessionInfo.getInstance(request);
    }

    /**
     * @return EJB Home class;
     */
    protected abstract Class getEJBHomeClass();
}
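For reference, the RequestContextFilter mentioned in the comment above is registered in web.xml roughly like this (the filter name is arbitrary):
<filter>
    <filter-name>requestContextFilter</filter-name>
    <filter-class>org.springframework.web.filter.RequestContextFilter</filter-class>
</filter>
<filter-mapping>
    <filter-name>requestContextFilter</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>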


The benefits of having this object defined as a bean in the Spring context (I'm talking about the lookup code moved into the Spring config XML in the form of FactoryBean(s)) hardly need explaining. One significant benefit: the ability to attach a remote exception handler (on the object created by ProxyFactoryBean) and thereby get rid of hundreds of MyBaseAction.handleRemoteException() calls in the web-tier code, making it unaware of the actual business logic implementation (be it EJB or POJO), which is a great thing!


Thanks for your time!

January 24, 2010

Effects produced by different thread-safe coding styles

I was a bit surprised that different thread-safe coding approaches can have an impact on application logic. Suppose we have a trivial auth service like this one:
public interface Service {
void configChanged() throws Exception;
void login(Session session) throws Exception;
}
A possible implementation:
public class ThreadSafeServiceImpl implements Service {
 /**
  * Environmental properties;
  */
 private volatile Properties env;

 /**
  * Properties could have been changed;
  */
 public void configChanged() throws Exception {
  Properties env__ = new Properties();
  env__.put("login", property("app.user.login"));
  env__.put("passwrd", property("app.user.passwrd"));
  env__.put("prefs", property("app.user.prefs"));

  env = env__;
 }

 /**
  * Store current user credentials in session;
  */
 public void login(Session session) throws Exception {
  session.getAttribute("creds").apply(env);
 }
}
The main idea behind the service: log the user in by storing his info in the session. Subsequent business logic retrieves the "creds" object, checks it, and either accepts it or throws some kind of application exception, thereby forcing the currently logged-in user to re-login or preventing unauthorized access.

Let's look at ThreadSafeServiceImpl from the point of view of several threads: first of all, it's safe for them to access the service methods, there is no inconsistency: login() sees the very latest version of env, configChanged() can safely update env, and a subsequent login() may see either the new or the old reference to env, but in no case will it see a partly updated env object. A very elegant and simple approach, by the way - the same idea is behind CopyOnWriteArrayList.
So, what is not cool about it?! The answer depends on whether it's critical for your app to support real concurrent access in ThreadSafeServiceImpl: do you want to manage the situation when thread-1 enters configChanged() and thread-2 enters login() at the same time? If you do, keep reading.

The main issue with this service impl. is that it doesn't handle concurrent access. Yes, it's thread-safe, but it doesn't handle concurrency: thread-1 can enter configChanged() without a problem while, at the same time, thread-2 enters login(), also without a problem. By introducing real concurrent access (not just thread-safety) we would get a situation where thread-1 has to wait if someone is in login() at the same moment; and vice versa - if thread-2, accessing login(), finds configChanged() busy, then thread-2 has to wait as well!

To get this done, the first thing that comes to mind is to use RRWL:
public class ConcurrentServiceImpl extends ThreadSafeServiceImpl {
 private final ReadWriteLock rw_lock = new ReentrantReadWriteLock();

 @Override
 public void configChanged() throws Exception {
  rw_lock.writeLock().lock();
  try {
   super.configChanged();
  } finally {
   rw_lock.writeLock().unlock();
  }
 }

 @Override
 public void login(Session session) throws Exception {
  rw_lock.readLock().lock();
  try {
   super.login(session);
  } finally {
   rw_lock.readLock().unlock();
  }
 }
}
Obviously this impl. is better than the previous one, because it supports real concurrent access.



Thanks for reading! 


P.S.
For those who are curious why ThreadSafeServiceImpl.env has been declared volatile,
read these: What does volatile do? and New guarantees for volatile.

January 10, 2010

Fixing "check-then-act" sequence with ReentrantReadWriteLock(RRWL)

Consider a multithreaded component with two methods: the first executes highly concurrent operations, e.g. puts chat messages into a shared concurrent queue; the second plays the role of an immediate stopper. See the code:
public class CheckThenAct {
 /**
  * Unbound shared message queue;
  */
 private final ConcurrentLinkedQueue<String> q;

 /**
  * Flag indicating whether operation should proceed;
  */
 private final AtomicBoolean proceed = new AtomicBoolean(true);

 public CheckThenAct(ConcurrentLinkedQueue<String> q) {
  this.q = q;
 }

 public boolean putMessage(String msg) {
  return proceed.get() ? q.offer(msg) : false;
 }

 public boolean stop() {
  return proceed.compareAndSet(true, false);
 }
}
In the putMessage() method we have a "check-then-act" concurrency problem. I.e. thread1, thread2, ..., thread20 pass proceed.get(), but right after that thread23 calls stop() - no more messages, seriously - yet in fact twenty messages still get added to the q. Since putMessage() must be highly concurrent, I can't make it (and stop()) synchronized. The solution is to use RRWL:
...

private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

...

public boolean putMessage(String msg) {
 if (!rw.readLock().tryLock())
  return false;
 try {
  return proceed.get() ? q.offer(msg) : false;
 } finally {
  rw.readLock().unlock();
 }
}

public boolean stop() {
 rw.writeLock().lock();
 try {
  return proceed.compareAndSet(true, false);
 } finally {
  rw.writeLock().unlock();
 }
}
Let me explain why:
- the read lock in putMessage() doesn't serialize the method, it is still highly concurrent;
- the read lock can be acquired only when no write lock is held;
- the write lock can be acquired only when no read locks are held;
- the write lock is exclusive.

So, the main requirement is satisfied: truly shared access in putMessage(), and the "check-then-act" behaviour is remedied!

Thanks for reading!

January 9, 2010

Alternative look at ReentrantReadWriteLock(RRWL)

I adore hunting for non-standard/undocumented properties/usages/features of good old (and standard) utilities.
Let's take ReentrantReadWriteLock (RRWL) - a more than five-year-old (in the Java world, of course) concurrency utility aimed chiefly at efficient separation of shared read operations from exclusive write operations. Definitely look in the javadoc for more. What is interesting about RRWL is that examples of its usage usually concentrate on the fact that readLock() guards 'read' operations and writeLock() guards 'write' operations. Let me show:
public class RRWLockRegularCase {
 private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private final ReadLock r = rw.readLock();
 private final WriteLock w = rw.writeLock();

 private final char[] arr = new char[] { 'c', 'o', 'o', 'l', '!' };

 public void iterate() {
  r.lock();
   for (char c : arr) {
   }
  r.unlock();
 }

 public void modify() {
  w.lock();
   arr[4] = '?';
  w.unlock();
 }
}
And this one from javadoc:
class RWDictionary {
    private final Map<String, Data>  m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock(); try { return m.get(key); } finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock(); try { return m.keySet().toArray(new String[0]); } finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock(); try { return m.put(key, value); } finally { w.unlock(); }
    }
    public void clear() {
        w.lock(); try { m.clear(); } finally { w.unlock(); }
    }
}
From the examples it's obvious that readLock() guards the read (R) operations: get(String), allKeys(), iterate(); and writeLock() guards the write (W) operations: clear(), put(String, Data), modify().

But do you know that RRWL could be used in an alternative way?!

Suppose we have an application component that tracks user activity: when a user came to the site, what page he visited and so on. It would be a Map with customerId as the key and a list of log messages as the value. To simplify concurrent map access there's a rule: a customerId isn't shared among threads. It's also known that the component has two methods: saveOrUpdate() for saving/updating activity log messages and clearThemAll() for removing the logs from memory. Like this:
public class RRWLockUnusualCase {
 /**
  * Pairs like [customerId, list of log messgs] e.g.: [100,
  * {"opened session at 10/10/2010 00:14:40",
  * "entered on main_page at 00:14:45", "entered help_page at 00:16:21"}];
  * customerId is strictly thread bound i.e. it's impossible seeing two
  * threads sharing same customerId(think of it like of business logic
  * restriction);
  */
 private final ConcurrentMap<Long, List<String>> customerActionLog;

 public RRWLockUnusualCase(ConcurrentMap<Long, List<String>> customerActionLog) {
  this.customerActionLog = customerActionLog;
 }

 public void saveOrUpdate(Long customerId, String messg) {
  if (customerActionLog.get(customerId) != null) {
   customerActionLog.get(customerId).add(messg);
  } else {
   customerActionLog.put(customerId, new ArrayList<String>(Arrays.asList(messg))); // modifiable, unlike plain Arrays.asList()
  }
 }
 
 public void clearThemAll() {
  for (Long k : customerActionLog.keySet()) {
   customerActionLog.remove(k);
  }
 }
}
The main drawback: it's not thread-safe (in spite of the ConcurrentMap), because it suffers from the "check-then-act" problem:
...
if (customerActionLog.get(customerId) != null) {
customerActionLog.get(customerId).add(messg);
...
On the second line we run into trouble if, right after the first one, clearThemAll() (which may be executing concurrently) removes the target entry. To fix this, the first thing that occurs to me is synchronization on the customerActionLog object:
public void saveOrUpdate(Long customerId, String messg) {
 synchronized (customerActionLog) {
  ...
 }  
}

public void clearThemAll() {
 synchronized (customerActionLog) {
  ...
 }
}
Which is a bad idea, since serializing saveOrUpdate() is inadmissible. There is an alternative to synchronization. What we have: tons of threads executing saveOrUpdate(), writing activity logs in a non-blocking manner; the issue arises when we need to clearThemAll(). In the latter method it would be logical to wait until all saveOrUpdate() operations have finished and only after that proceed (exclusively) with clearing. This transforms into:
public class RRWLockUnusualCase {
 private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private final ReadLock r = rw.readLock();
 private final WriteLock w = rw.writeLock();

 // Same as above ...
 
 public void saveOrUpdate(Long customerId, String messg) {
  r.lock();
  try {
   // Same as above ...
  } finally {
   r.unlock();
  }
 }

 public void clearThemAll() {
  w.lock();
  try {
   // Same as above ...
  } finally {
   w.unlock();
  }
 }
}
So that was the 'alternative look': readLock() doesn't protect a 'read' operation - it protects a shared operation.

And finally the pattern, I believe:
public class RRWLockPattern {
 private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private final ReadLock r = rw.readLock();
 private final WriteLock w = rw.writeLock();

 // ...

 /**
  * Can be executed concurrently if there is no exclusive operation
  * currently being executed;
  */
 public void sharedOperation() {
  r.lock();
  try {
   // PUT SHARED OPERATION LOGIC HERE
  } finally {
   r.unlock();
  }
 }

 /**
  *  Can be executed only exclusively i.e. by single thread;
  *  In case of shared operations currently being executed - blocks
  *  until all_ shared operations have completed; 
  */
 public void exclusiveOperation() {
  w.lock();
  try {
   // PUT EXCLUSIVE OPERATION LOGIC HERE
  } finally {
   w.unlock();
  }
 }
}


Thanks for reading!

January 8, 2010

Revealing FutureTask's hidden property

As you may know, java.util.concurrent.FutureTask is used to encapsulate a deferred computation and the result of that computation.
We are accustomed to using it jointly with an Executor: submit it to a thread pool and retrieve the result in a blocking fashion with the future.get() method. Nothing new so far.

You know, for me it's a lot of fun to find a standard utility, like FutureTask, being used in an alternative way! What if you don't have an Executor but still want FutureTask's core principle - one thread does the job, the others wait. So you have just the current thread, you want your work done in that current thread, and at the same time you want other workers, who might arrive at the same execution point, to wait for the computation result.

Easily:
public class NiceFutureFeature {
 private final AtomicReference<FutureTask> f = new AtomicReference<FutureTask>();

 public Object work(Callable c) throws InterruptedException, ExecutionException {
  if (f.compareAndSet(null, new FutureTask(c))) {
   // Only one thread will pass and will do the job
   f.get().run();
  }
  // Since we didn't pass we should wait for result
  return f.get().get();
 }
}
In this class the atomic reference ensures that only one thread will proceed with f.run(); the others will wait. The core point: atomically restrict access to the task execution to a single worker.

I believe this approach is useful when the same idempotent operation is executed by many threads at the same time. E.g.:
an idempotent business routine that includes a SQL query and is being executed by n threads at a given moment, i.e. if you could stop time you would see n threads about to hit the db with the same SQL (I observed this with TopLink)! No problem if you execute the methods sequentially - hit the db, put the result in the cache, next time get the result from the cache - the issue appears when you perform the operations concurrently. In this regard let me extend the code above:
public class NiceFutureFeature {
 /**
  * Key of type List; Could be in the form:
  * ["select id from Emp e where e.id=:id and e.salary=:salary", 100, 2000]
  */
 private final ConcurrentMap<List, FutureTask> cm = new ConcurrentHashMap<List, FutureTask>();

 public Object work(List key, Callable c) throws InterruptedException, ExecutionException {
  FutureTask f = new FutureTask(c);
  if (cm.putIfAbsent(key, f) == null) {
   // Only one will pass and will do the job
   f.run();
  }
  // Since we didn't pass we should wait for result
  return cm.get(key).get();
 }
}
The same principle applies in this code: the atomic putIfAbsent() call guards the main idea.
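A hypothetical usage sketch: four threads ask for the same "query" at the same moment, the Callable body (the db hit) runs only once, and the other three block on the same FutureTask and share its result.
final NiceFutureFeature cache = new NiceFutureFeature();
final List key = Arrays.asList("select id from Emp e where e.id=:id", 100);
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
 pool.submit(new Callable<Object>() {
  public Object call() throws Exception {
   return cache.work(key, new Callable() {
    public Object call() {
     // reached by exactly one thread, no matter how many arrive concurrently
     return "result of the query";
    }
   });
  }
 });
}
pool.shutdown();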

Thanks for reading!

January 4, 2010

Limitations of percflow/percflowbelow instantiation modes in AspectJ5

Recently I enjoyed the percflow/percflowbelow advanced instantiation modes in AspectJ, but later found a considerable limitation (from my point of view): they are thread-bound. In these modes an aspect's TTL is limited to a flow of execution - OK, that's what they were designed for - but what if that flow of execution starts another thread underneath and submits an execution unit like a Callable/Runnable to it? Wouldn't it be nice if that submitted unit could be woven by the aspect as well?
Suppose we have code like: 
public void testConcurrentLogic() {
 ExecutorService exec = ...
 Future f = exec.submit(runnable);
 assert...(f.get());
}
The issue: I need an aspect with a lifecycle bound to testConcurrentLogic() and with the ability to match join points in runnable.run(). Currently the first part is doable with the percflow instantiation mode; the second one isn't. A sketch of the doable half follows.
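For illustration, here is a minimal code-style sketch of the doable half (the pointcuts are my assumptions matching the snippet above); calls made inside runnable.run() on a pool thread never reach this aspect instance, which is exactly the limitation:
public aspect PerTestFlowAspect percflow(execution(void *.testConcurrentLogic())) {
 // one aspect instance per control flow entered at testConcurrentLogic();
 // everything advised here runs on the test thread only
 before() : cflow(execution(void *.testConcurrentLogic()))
            && call(* *(..)) && !within(PerTestFlowAspect) {
  // per-test-run bookkeeping would go here; join points inside runnable.run()
  // are not matched, because the pool thread's flow never entered testConcurrentLogic()
 }
}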

December 29, 2009

Overcoming limitations of ExecutorService.shutdownNow(), transparently with AspectJ5

The "Chapter 7. Cancellation and Shutdown" of Java Concurrency in Practice book(as well as javadoc of ExecutorService.shutdownNow()) says that there is limitation applied on what is being returned by shutdownNow().
The book says: 
"When an ExecutorService is shut down abruptly with shutdownNow, it attempts to cancel the tasks currently in progress and returns a list of tasks that were submitted but never started so that they can be logged or saved for later processing. However, there is no general way to find out which tasks started but did not complete. This means that there is no way of knowing the state of the tasks in progress at shutdown time unless the tasks themselves perform some sort of checkpointing. To know which tasks have not completed, you need to know not only which tasks didn't start, but also which tasks were in progress when the executor was shut down."
The author also provides a work-around for this problem: extend AbstractExecutorService, override submit(Runnable) and define a method that returns the cancelled tasks. But all the same, in this case we are forced to use shutdownNow() in the following fashion:
...
//hint to running tasks to stop what they are doing!
exec.shutdownNow();
//wait until all running tasks stopped, and gathered in some "cancelled tasks collection" (but how long?!)
exec.awaitTermination(TIMEOUT, UNIT);
//here is our result!
exec.getCancelledTasks();
...
And yet the proposed solution has a requirement: tasks should be responsive to interruption. In this post I wrote why this requirement is difficult (I would say impossible) to achieve.

Being in no way a critic of what is suggested in Java Concurrency in Practice, let me share an alternative solution.

The goal is to put as few requirements as possible on the clients of ExecutorService.shutdownNow(): as a user of the executor service I want to obtain it through the Executors factory, then submit tasks to it (tasks either sensitive to interruption or not!), and when the time comes, call ExecutorService.shutdownNow() and get, in a guaranteed fashion, all cancelled tasks (both those queued but never started and those started but not completed). Also, I don't want to extend AbstractExecutorService and deal with a new type and its new method.

It's "doable" - with AspectJ5.

The solution primarily relies on: instant interruption handling, atomic variables, a synchronized collection and AOP magic.
Before looking into the code, let me describe in a few words how it works:

- when you submit a callable/runnable to the executor service, it's picked out of the work queue by an available thread and the execution flow reaches call()/run() - at this point I assert that the task is at the first step of its execution - this execution point is caught with AOP, which records the fact by atomically incrementing the number of running tasks.

- once a task (either callable or runnable) has started executing, its behaviour is handled as follows: if the task throws a checked/unchecked exception from its call()/run() method - that's OK, the task is considered completed and the number of running tasks is atomically decremented; but if during task execution it turns out that the thread has been requested to interrupt - instant interruption handling comes into play - a runtime exception is thrown, the task is added to the collection of cancelled tasks and the number of running tasks is atomically decremented.

- as you might guess, ExecutorService.shutdownNow() (dressed with AOP), after requesting interruption of all running tasks, merely loops until the number of running tasks becomes 0 - and thanks to instant interruption handling, the loop is finite!


Meet the ExecutorServiceShutdownNowAspect:
public aspect ExecutorServiceShutdownNowAspect {
 /////////////////////////////////////////////
 // ATTRIBUTES

 /**
  * Collection of interrupted workers;
  */
 private List<Runnable> interruptedWorkers = Collections
   .synchronizedList(new ArrayList<Runnable>());

 /**
  * Number of tasks currently being executed;
  */
 private final AtomicInteger activeTasksNum = new AtomicInteger();

 /////////////////////////////////////////////
 // POINTCUTS

 /**
  * Matches {@link Callable#call()}-s;
  */
 pointcut executionOfCallable_() : execution(* Callable.call());

 /**
  * Matches {@link Callable#call()}-s;
  */
 pointcut executionOfCallable(Callable task) : executionOfCallable_() && this(task);

 /**
  * Compilation hint;
  */
 pointcut notwithin() :
  !within(eg.ExecutorServiceShutdownNowLimitationTest) && !within(ExecutorServiceShutdownNowAspect);

 /////////////////////////////////////////////
 // ADVICES

 /**
  * Check for interruption before every call of a method that doesn't throw
  * {@link InterruptedException};
  */
 before(Callable task) :
  cflow(executionOfCallable(task)) && call(* *(..) throws !InterruptedException) && notwithin() {
  if (interrupted()) {
   handleInterruption(task, new InterruptedException());
  }
 }

 /**
  * Handles interruption exception in the flow of execution of
  * {@link Callable#call()};
  */
 after(Callable task) throwing (InterruptedException e) :
  cflow(executionOfCallable(task)) && notwithin() {
  handleInterruption(task, e);
 }

 Object around() : executionOfCallable_() && notwithin() {
  // increment number of tasks currently being executed;
  activeTasksNum.incrementAndGet();
  try {
   // let execution of callable.call() proceed;
   return proceed();
  } finally {
   // decrement number of running tasks depending on interruption
   // status;
   if (!interrupted()) {
    activeTasksNum.decrementAndGet();
   }
  }
 }

 /**
  * Decorates task collection returned by
  * {@link ExecutorService#shutdownNow()} by adding to it interrupted tasks;
  */
 List<Runnable> around() : call(List<Runnable> ExecutorService.shutdownNow()) {
  interruptedWorkers.addAll(proceed());

  // wait until all active tasks have interrupted;
  while (activeTasksNum.get() != 0) {
  }

  // workaround caused by aspect's instantiation mode issue; 
  // in reality we should simply return interruptedWorkers;
  List<Runnable> ret = interruptedWorkers;
  interruptedWorkers = Collections
    .synchronizedList(new ArrayList<Runnable>());

  return ret;
 }

 /////////////////////////////////////////////
 // METHODS

 private void handleInterruption(Callable task, InterruptedException e) {
  Thread.currentThread().interrupt();
  interruptedWorkers.add(new FutureTask(task));
  activeTasksNum.decrementAndGet();
  throw new RuntimeException(e);
 }

 private boolean interrupted() {
  return Thread.currentThread().isInterrupted();
 }
}
More words on instant interruption: it is implemented by dynamically adding a check of the current thread's interruption status by means of the 'before' advice. Before every method call in the execution flow of callable.call(), the if (interrupted()) code is executed. Actually, not before every method call, but only before methods that don't declare InterruptedException. As for that exception, it gets interesting - I would say very serious - treatment: once a thrown InterruptedException is detected, the aspect doesn't play with it - it sets the interruption status of the current thread back to 'true', saves the current callable, atomically decrements the number of running tasks and finally throws a RuntimeException:
private void handleInterruption(Callable task, InterruptedException e) {
 Thread.currentThread().interrupt();
 interruptedWorkers.add(new FutureTask(task));
 activeTasksNum.decrementAndGet();
 throw new RuntimeException(e);
}
Instant interruption handling isn't the only interesting thing. Look into the code, read the comments and you will find more.
What is great about aspectj - readability 8).

A few words about testing. 
Several cases/situations have been considered - all of them succeeded. Let me describe them briefly:

Wrong interruption handling:
void method() throws InterruptedException {
 ...
 synchronized (lock) {
   try {
      // let "emulate" blocking;
      lock.wait();
   } catch (InterruptedException ignore) {
      // case: ignorance of interruption;
   }
 }
 Thread.sleep(10000);
 ...
}
Re-throwing interrupted exception:
void method() throws InterruptedException {
 ...
 // case: interrupted exception throwing;
 Thread.sleep(10000);
 ...
}
Throwing checked exception:
void method() throws Exception {
 ...
 // case: application exception;
 throw new CheckedException();
}
Regular code insensitive to the standard interruption mechanism:
void method() {
 // let's "emulate" a task insensitive to the standard interruption mechanism;
 for (;;) {
  anotherRegularMethod();
 }
}

Those are all the cases/situations I have devised so far. I will update the post once I come up with a new one.

I used the TestNG framework to write the test code, since it gives an overwhelming benefit over JUnit4 when you deal with a single test body and several cases/situations the test is aimed at. Here is the test code itself:
@Test
public void abruptPoolTermination(AbstractCallableFactory f,
                                  int expectingTaskNum)
  throws InterruptedException {
 ExecutorService exec = Executors.newFixedThreadPool(THREAD_NUM);
 exec.submit(f.callable());
 exec.submit(f.callable());
 exec.submit(f.callable());
 Thread.sleep(200);
 Assert.assertEquals(exec.shutdownNow().size(), expectingTaskNum);
}


In summary.
I should say it's very enjoyable to overcome existing Java limits with aspectj: it's not hard, it doesn't require the ability to mentally multiply hex digits; and even more - once you know Java's concurrency rules, the same rules can be legally applied in aspectj!

Thanks for your time!

December 15, 2009

Proof-of-concept: true interruption handling, transparently with AspectJ5

Perhaps you know what InterruptedException means in Java. In either case, let me describe it a little. The javadoc will not allow me to deceive you.
So, a method is interruptible if it: (a) throws InterruptedException; (b) has logic that actually throws this kind of exception. It's simple: each thread, i.e. Thread object, has a flag field that holds the interruption status. So, for a method to be cancellable (this word is better) it must poll the interruption status with Thread.isInterrupted() and, if it is "true", throw InterruptedException, clearing the status first.
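A tiny sketch of such a cancellable method (doChunkOfWork() is just a hypothetical placeholder):
void pollingOperation() throws InterruptedException {
 for (int i = 0; i < 1000; i++) {
  if (Thread.interrupted()) {          // polls and clears the interruption status
   throw new InterruptedException();   // abort and report cancellation to the caller
  }
  doChunkOfWork(i);                    // placeholder for the real work
 }
}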
There are a lot of methods in Java that throw InterruptedException: Object.wait(), Future.get(), Thread.sleep(), CountDownLatch.await() and many others. Applying what I have already told you, we can assert that these methods check the interruption status of the thread in which they are being executed and throw the exception if it is set. In other words, regardless of when the thread was interrupted - before calling a method that throws InterruptedException or during the execution of that method - the method will catch the interruption status and throw the exception.
Pretty simple so far: just poll the interruption status, throw the exception, and let the code at a higher level treat that damn exception as it wants. So, where is the issue with all of this?

For me there is a big issue: it's impossible to make all the code executed by a thread explicitly interruptible, because you can't make all of your code sensitive to interruption. Got it?
Let's put this into code (try to imagine that the code is meaningful):


public abstract class Service {

 public Object doSomeDb(long id) {
  return findItemInDb(id);
 }

 abstract Object findItemInDb(long id);

 public void doSomeOperation(InputStream in, OutputStream out) throws IOException {
  if (doSomeDb(100L) != null) {
   executeIOStuff(in, out);
  }
 }

 void executeIOStuff(InputStream in, OutputStream out) throws IOException {
  byte[] buff = new byte[4096];
  int n;
  while ((n = in.read(buff)) != -1) {
   out.write(buff, 0, n);
  }
 }
}
If the thread's execution flow is currently inside the doSomeOperation() method and our thread isn't interrupted yet - fine, we successfully proceed with doSomeDb()
and return from it. But it turns out that while we were inside doSomeDb(), someone who has access to our thread's reference requested interruption on it; at this point, logically, we should instantly abort the execution of our thread; unfortunately not - Java will proceed with executeIOStuff(), which in turn will do old IO operations, which aren't interruptible. And even if they were interruptible (like NIO operations), it's wrong to execute anything after a thread interruption.
So, now I hope you've caught the core point: no operations after thread interruption! How to do this? While you think, let me introduce an aspectj solution that effectively fixes this.
First, how will we fix it?! I believe dynamically checking the thread's interruption status before every method call and throwing a runtime exception if it is set would be sufficient (especially in the context of a proof-of-concept solution).
Let me introduce the aspect that does exactly what I have just said:


@Aspect
public class InterruptionAspect {
 @Pointcut("cflow(call(void majorFlow())&&target(b))&&call(* *(..))&&!within(InterruptionAspect)")
 public void everyMethodCallInTheControlFlowOfMajorFlowMethod(B b) {
 }

 @Before("everyMethodCallInTheControlFlowOfMajorFlowMethod(b)")
 public void adviceBefore(B b) {
  checkInterruption(b);
 }

 void checkInterruption(B b) {
  if (Thread.interrupted()) {
   b.interrupedBy = A.STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY;
   throw new CleverInterruptedException();
  }
 }
}
Take a look at checkInterruption(): if the current thread has been interrupted, then stop any actions and abort execution (in Java this is possible only through the exception mechanism).


Unit test them all.

public class A {
 static final int STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY = 1;
 static final int STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY = 2;

 /**
  * @see #STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY
  * @see #STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY
  */
 volatile int interrupedBy;

 /**
  * Flag that indicates that the computation in {@link #methodUnsensitiveToInterruption} has occurred;
  */
 volatile boolean we_were_in_methodUnsensitiveToInterruption;

 /**
  * Entry point to class functionality;
  */
 public void majorFlow() {
  methodUnsensitiveToInterruption();
  try {
   blockingOperation();
  } catch (InterruptedException e) {
   // setting back the status of current thread to interrupted, so higher level code that
   // polls for thread's status could catch it.
   Thread.currentThread().interrupt();
   interrupedBy = STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY;
  }
 }

 private void blockingOperation() throws InterruptedException {
  Thread.sleep(1000000L);
 }

 private void methodUnsensitiveToInterruption() {
   // just a method that is insensitive to interruption, i.e. when we come here and the current
   // thread's status is interrupted, we won't know it and, instead of stopping, we will
   // proceed
  we_were_in_methodUnsensitiveToInterruption = true;
 }
}

public class B extends A {
}

public class CleverInterruptedException extends RuntimeException {
}
Class A: the entry point is the majorFlow() method. Under regular conditions (without applying the aspect logic), interruption aborts majorFlow()'s execution only in blockingOperation(). I.e. if the thread is interrupted before methodUnsensitiveToInterruption(), under "normal Java" this will only be revealed in the operation that throws InterruptedException, in our case - in blockingOperation(). So, under regular conditions the flag we_were_in_methodUnsensitiveToInterruption will be "true" and interrupedBy will be ...INTERRUPTED_NORMALY.

The actual unit tests:
public class TestAdvancedInterruption {
 private final ExecutorService executor = Executors.newSingleThreadExecutor();

 private static class MyTask implements Callable<A> {
  final A target;

  public MyTask(A a) {
   this.target = a;
  }

  public A call() {
   // Logically, at start thread isn't interrupted
   // Lets "emulate" interruption;
   Thread.currentThread().interrupt();
   // watch out for CleverInterruptedException!
   try {
    target.majorFlow();
   } 
   catch (CleverInterruptedException ignore) {
    // It's ok to ignore it here, but don't forget to set the status;
    Thread.currentThread().interrupt();
   }
   return target;
  }
 };

 @Test
 public void handleInterruption_Normaly() throws Exception {
  A a_state = executor.submit(new MyTask(new A())).get();
  assertEquals(STATUS_MAJOR_FLOW_INTERRUPTED_NORMALY, a_state.interrupedBy);
  assertTrue(a_state.we_were_in_methodUnsensitiveToInterruption);
 }

 @Test
 public void handleInterruption_ASAP() throws Exception {
  B b_state = (B) executor.submit(new MyTask(new B())).get();
  assertEquals(STATUS_MAJOR_FLOW_INTERRUPTED_ASAPLY, b_state.interrupedBy);
  assertFalse(b_state.we_were_in_methodUnsensitiveToInterruption);
 }
}
After applying the aspect the picture becomes totally different - handleInterruption_ASAP() tests the aspect's "presence": I assert that the interrupedBy status is now equal to ...INTERRUPTED_ASAPLY and that it's false that we were in methodUnsensitiveToInterruption! Stop here. You see - we fixed the issue - the thread was interrupted before the call to methodUnsensitiveToInterruption() and the code, thanks to aspectj, handled that immediately!


Thanks for reading and for your time!