January 8, 2010

Revealing FutureTask's hidden property

As you may know, java.util.concurrent.FutureTask is used to encapsulate a deferred computation and the result of that computation.
We are accustomed to using it together with an Executor: we submit it to a thread pool and retrieve the result in a blocking fashion with the future.get() method. Nothing new so far.

You know, for me it's a lot of fun to find a standard utility like FutureTask being used in an alternative way! What if you don't have an Executor but still want to use FutureTask's core principle: one thread does the job, the others wait? So you have just the current thread, and you want the work done right there in the current thread, while any other workers that might arrive at the same execution point wait for the computation result.

Easily:
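import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

public class NiceFutureFeature<V> {
 private final AtomicReference<FutureTask<V>> f = new AtomicReference<FutureTask<V>>();

 public V work(Callable<V> c) throws InterruptedException, ExecutionException {
  if (f.compareAndSet(null, new FutureTask<V>(c))) {
   // Only the one thread that wins the compareAndSet gets here and runs the task itself
   f.get().run();
  }
  // The winner finds the result already computed; everyone else blocks here until it is
  return f.get().get();
 }
}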
In this class the atomic reference ensures that only one thread will proceed with f.get().run(); the others skip the if block and wait in FutureTask.get(). The core point is to atomically restrict execution of the task to a single worker.
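To see the "one runs, the others wait" behaviour without any Executor, here is a minimal sketch that starts a few plain threads against the same pattern and counts how many times the Callable body actually executes. The names SingleRunDemo, OneShot and countRuns are made up for this illustration, and the lambdas assume Java 8 or newer.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class SingleRunDemo {
    // Hypothetical stand-in for the class above, redefined so the demo is self-contained.
    static final class OneShot<V> {
        private final AtomicReference<FutureTask<V>> ref = new AtomicReference<>();

        V work(Callable<V> c) throws Exception {
            FutureTask<V> task = new FutureTask<>(c);
            if (ref.compareAndSet(null, task)) {
                task.run(); // the winning thread computes in place
            }
            return ref.get().get(); // winner and waiters all read the shared result
        }
    }

    // Starts `threads` plain threads that call work() at the same moment and
    // returns how many times the Callable body was actually executed.
    static int countRuns(int threads) throws InterruptedException {
        final OneShot<String> oneShot = new OneShot<>();
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line everybody up at the same execution point
                    oneShot.work(() -> {
                        runs.incrementAndGet(); // the "expensive" job
                        return "result";
                    });
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread t : workers) {
            t.join();
        }
        return runs.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callable ran " + countRuns(8) + " time(s)"); // prints "callable ran 1 time(s)"
    }
}
```

Whichever thread wins the compareAndSet does the work; the Callable instances passed in by the losing threads are simply never executed.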

I believe this approach is useful when the same idempotent operation is executed by several threads at the same time. For example, take an idempotent business routine that includes an SQL query and is being executed by n threads at a given moment: if you could stop time, you would see n threads about to hit the db with the same SQL (I checked that on Toplink)! There is no problem if you execute the method sequentially, i.e. hit the db, put the result in the cache, and get the result from the cache on the next call. The issue appears when you perform the operations concurrently, because every thread misses the cache before any of them has filled it. With this in mind, let me extend the code above:
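import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class NiceFutureFeature<V> {
 /**
  * Key of type List; could be in the form:
  * ["select id from Emp e where e.id=:id and e.salary=:salary", 100, 2000]
  */
 private final ConcurrentMap<List<?>, FutureTask<V>> cm = new ConcurrentHashMap<List<?>, FutureTask<V>>();

 public V work(List<?> key, Callable<V> c) throws InterruptedException, ExecutionException {
  FutureTask<V> f = new FutureTask<V>(c);
  if (cm.putIfAbsent(key, f) == null) {
   // Only the thread that registered its task for this key gets here and does the job
   f.run();
  }
  // The winner finds the result already computed; everyone else blocks here until it is
  return cm.get(key).get();
 }
}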
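The same principle applies in this code: the atomic putIfAbsent() method guarantees that, for each key, only one thread registers its task and runs it, while the others wait for that task's result.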
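Here is a minimal sketch of the keyed version under load, assuming two distinct query keys and several threads per key. It uses the value returned by putIfAbsent() directly instead of a second map lookup, which is a small variation and not what the code above does. KeyedRunDemo, KeyedCache and countRuns are names invented for the illustration, the counter stands in for the real database hit, and the lambdas assume Java 8 or newer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class KeyedRunDemo {
    // Hypothetical variant of the class above, redefined so the demo is self-contained.
    static final class KeyedCache {
        private final ConcurrentMap<List<Object>, FutureTask<Object>> cm = new ConcurrentHashMap<>();

        Object work(List<Object> key, Callable<Object> c) throws Exception {
            FutureTask<Object> mine = new FutureTask<>(c);
            FutureTask<Object> existing = cm.putIfAbsent(key, mine);
            if (existing == null) {
                mine.run(); // first thread for this key does the job
                return mine.get();
            }
            return existing.get(); // later threads wait for the first one's result
        }
    }

    // Runs `threadsPerKey` threads for each of two keys and returns how many
    // times the Callable body (the pretend database hit) was executed in total.
    static int countRuns(int threadsPerKey) throws InterruptedException {
        final KeyedCache cache = new KeyedCache();
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);
        final String sql = "select id from Emp e where e.id=:id and e.salary=:salary";
        Thread[] workers = new Thread[2 * threadsPerKey];
        for (int i = 0; i < workers.length; i++) {
            final int id = 100 + (i % 2); // two distinct parameter sets
            workers[i] = new Thread(() -> {
                try {
                    // Each thread builds its own key; equal lists map to the same entry.
                    List<Object> key = Arrays.<Object>asList(sql, id, 2000);
                    start.await();
                    cache.work(key, () -> {
                        runs.incrementAndGet(); // stands in for the database hit
                        return id;
                    });
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread t : workers) {
            t.join();
        }
        return runs.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("db hits: " + countRuns(5)); // prints "db hits: 2"
    }
}
```

The key works because List.equals() and List.hashCode() are value-based, so two threads that build the same SQL-plus-parameters list independently still land on the same FutureTask.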

Thanks for reading!
