January 24, 2010

Effects produced by different thread-safe coding styles

I was a bit surprised that different thread-safe coding approaches can impact application logic. Suppose we have a trivial auth service like this one:
public interface Service {
void configChanged() throws Exception;
void login(Session session) throws Exception;
}
A possible implementation:
public class ThreadSafeServiceImpl implements Service {
 /**
  * Environmental properties;
  */
 private volatile Properties env;

 /**
  * Properties could have been changed;
  */
 public void configChanged() throws Exception {
  Properties env__ = new Properties();
  env__.put("login", property("app.user.login"));
  env__.put("passwrd", property("app.user.passwrd"));
  env__.put("prefs", property("app.user.prefs"));

  env = env__;
 }

 /**
  * Store current user credentials in session;
  */
 public void login(Session session) throws Exception {
  session.getAttribute("creds").apply(env);
 }
}
The main idea behind the service is to log the user in by storing his info in the session. Subsequent business logic would retrieve the "creds" object, check it and either accept it or throw some kind of application exception, thereby forcing the currently logged-in user to re-login or preventing unauthorized access.

Let's take a look at ThreadSafeServiceImpl from the point of view of several threads. At first glance it's safe for them to access the service methods, with no inconsistency: login() always sees the latest published version of env, configChanged() can safely update env, and a subsequent login() may see either the new or the old reference to env, but in no case will it see a partly updated env object. A very elegant and simple approach, by the way - the same idea is behind CopyOnWriteArrayList (a toy sketch of it follows).
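To make the CopyOnWriteArrayList parallel concrete, here is a toy copy-on-write sketch of the same publish-by-a-single-volatile-write idea (CopyOnWriteBox is a made-up name for illustration, not the real CopyOnWriteArrayList implementation):
public class CopyOnWriteBox {
 /**
  * Readers see either the complete old array or the complete new one;
  */
 private volatile String[] elements = new String[0];

 public synchronized void add(String item) {
  String[] fresh = java.util.Arrays.copyOf(elements, elements.length + 1); // mutate a private copy
  fresh[fresh.length - 1] = item;
  elements = fresh; // publish the whole snapshot with one volatile write
 }

 public String get(int index) {
  return elements[index]; // plain volatile read, never blocks, never sees a half-built array
 }
}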
So, what is not cool about ThreadSafeServiceImpl?! The answer depends on whether it's critical for your application to support real concurrent access: do you want to manage the situation when thread-1 enters configChanged() and thread-2 enters login() at the same time? If you do, keep reading.

The main issue with this service implementation is that it doesn't handle concurrent access. Yes, it's thread-safe, but it doesn't coordinate concurrency: thread-1 can enter configChanged() without a problem while, at the same time, thread-2 enters login(), also without a problem. By introducing real concurrent access (not just thread safety) we would get a situation where thread-1 has to wait if someone is inside login() at the same time; and vice versa - if thread-2, accessing login(), finds configChanged() busy, then thread-2 has to wait as well!

To get this done, the first thing that comes to mind is to use an RRWL (ReentrantReadWriteLock):
public class ConcurrentServiceImpl extends ThreadSafeServiceImpl {
 private final ReadWriteLock rw_lock = new ReentrantReadWriteLock();

 @Override
 public void configChanged() throws Exception {
  rw_lock.writeLock().lock();
  try {
   super.configChanged();
  } finally {
   rw_lock.writeLock().unlock();
  }
 }

 @Override
 public void login(Session session) throws Exception {
  rw_lock.readLock().lock();
  try {
   super.login(session);
  } finally {
   rw_lock.readLock().unlock();
  }
 }
}
Obviously this implementation is better than the previous one, because it supports real concurrent access: configChanged() and login() can no longer run at the same time, while multiple login() calls still proceed in parallel.



Thanks for reading! 


P.S.
For those who are curious why ThreadSafeServiceImpl.env has been declared volatile,
read these next: What does volatile do? and New guarantees for volatile.

January 10, 2010

Fixing "check-then-act" sequence with ReentrantReadWriteLock(RRWL)

Consider a multithreaded component with two methods: the first executes highly concurrent operations, e.g. puts chat messages into a shared concurrent queue; the second plays the role of an immediate stopper. See the code:
public class CheckThenAct {
 /**
  * Unbounded shared message queue;
  */
 private final ConcurrentLinkedQueue<String> q;

 /**
  * Flag indicating whether operation should proceed;
  */
 private final AtomicBoolean proceed = new AtomicBoolean(true);

 public CheckThenAct(ConcurrentLinkedQueue<String> q) {
  this.q = q;
 }

 public boolean putMessage(String msg) {
  return proceed.get() ? q.offer(msg) : false;
 }

 public boolean stop() {
  return proceed.compareAndSet(true, false);
 }
}
In the putMessage() method we have a "check-then-act" concurrency problem. I.e. thread1, thread2, ..., thread20 pass proceed.get(), but right after that thread23 calls stop() - i.e. no more messages, seriously - yet in fact twenty messages still get added to the queue. Since putMessage() must be highly concurrent I can't make it (and stop()) synchronized. The solution is to use an RRWL:
...

private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

...

public boolean putMessage(String msg) {
 if (!rw.readLock().tryLock())
  return false;
 try {
  return proceed.get() ? q.offer(msg) : false;
 } finally {
  rw.readLock().unlock();
 }
}

public boolean stop() {
 rw.writeLock().lock();
 try {
  return proceed.compareAndSet(true, false);
 } finally {
  rw.writeLock().unlock();
 }
}
Let me explain why:
- The read lock in putMessage() doesn't serialize the method; it stays highly concurrent;
- A read lock can be acquired only when the write lock isn't held;
- The write lock can be acquired only when no read locks are held;
- The write lock is exclusive;

So the main demand is satisfied: truly shared access in putMessage(), and the "check-then-act" behaviour is remedied!
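A quick usage sketch (assuming the fragments above are folded back into the CheckThenAct class; the message texts, thread count and sleeps are made up): once stop() wins, every later putMessage() returns false, while before that both producers keep offering messages concurrently.
public class CheckThenActDemo {
 public static void main(String[] args) throws InterruptedException {
  final CheckThenAct component = new CheckThenAct(new java.util.concurrent.ConcurrentLinkedQueue<String>());

  Runnable producer = new Runnable() {
   public void run() {
    for (int i = 0; i < 1000; i++) {
     component.putMessage("msg-" + i); // returns false once stop() has won
    }
   }
  };

  Thread t1 = new Thread(producer);
  Thread t2 = new Thread(producer);
  t1.start();
  t2.start();

  Thread.sleep(5);
  System.out.println("stopped: " + component.stop()); // true exactly once

  t1.join();
  t2.join();
  System.out.println("accepted after stop: " + component.putMessage("late")); // always false
 }
}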

Thanks for reading!

January 9, 2010

Alternative look at ReentrantReadWriteLock(RRWL)

I love hunting for non-standard/undocumented properties/usages/features of good old (and standard) utilities.
Let's take ReentrantReadWriteLock (RRWL) - a concurrency utility that is more than five years old (in the Java world, of course), aimed chiefly at efficiently separating shared read operations from exclusive write operations. Definitely look in the javadoc for more. What is interesting about RRWL is that the examples showing its usage usually concentrate on the fact that readLock() guards 'read' operations and writeLock() guards 'write' operations. Let me show:
public class RRWLockRegularCase {
 private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private final ReadLock r = rw.readLock();
 private final WriteLock w = rw.writeLock();

 private final char[] arr = new char[] { 'c', 'o', 'o', 'l', '!' };

 public void iterate() {
  r.lock();
  try {
   for (char c : arr) {
    // read-only pass over the shared array
   }
  } finally {
   r.unlock();
  }
 }

 public void modify() {
  w.lock();
  try {
   arr[4] = '?';
  } finally {
   w.unlock();
  }
 }
}
And this one from javadoc:
class RWDictionary {
    private final Map<String, Data>  m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock(); try { return m.get(key); } finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock(); try { return m.keySet().toArray(new String[0]); } finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock(); try { return m.put(key, value); } finally { w.unlock(); }
    }
    public void clear() {
        w.lock(); try { m.clear(); } finally { w.unlock(); }
    }
}
From these examples it's obvious that readLock() guards the read (R) operations - get(String), allKeys(), iterate() - and writeLock() guards the write (W) operations - clear(), put(String, Data), modify().

But did you know that RRWL can be used in an alternative way?!

Suppose we have an application component that tracks user activity: when the user came to the site, which page he entered and so on. It would be a Map keyed by customerId, with a list of log messages as the value. To simplify concurrent map access there's a rule: a customerId isn't shared among threads. It's also known that the component has two methods: saveOrUpdate(), for saving/updating activity log messages, and clearThemAll(), for removing the logs from memory. Like this:
public class RRWLockUnusualCase {
 /**
  * Pairs like [customerId, list of log messgs] e.g.: [100,
  * {"opened session at 10/10/2010 00:14:40",
  * "entered on main_page at 00:14:45", "entered help_page at 00:16:21"}];
  * customerId is strictly thread bound i.e. it's impossible seeing two
  * threads sharing same customerId(think of it like of business logic
  * restriction);
  */
 private final ConcurrentMap<Long, List<String>> customerActionLog;

 public RRWLockUnusualCase(ConcurrentMap<Long, List<String>> customerActionLog) {
  this.customerActionLog = customerActionLog;
 }

 public void saveOrUpdate(Long customerId, String messg) {
  if (customerActionLog.get(customerId) != null) {
   customerActionLog.get(customerId).add(messg);
  } else {
   // Arrays.asList() alone gives a fixed-size list, so wrap it to keep add() working
   customerActionLog.put(customerId, new ArrayList<String>(Arrays.asList(messg)));
  }
 }
 
 public void clearThemAll() {
  for (Long k : customerActionLog.keySet()) {
   customerActionLog.remove(k);
  }
 }
}
The main drawback: it's not thread-safe (in spite of the ConcurrentMap), because it suffers from the "check-then-act" problem:
...
if (customerActionLog.get(customerId) != null) {
 customerActionLog.get(customerId).add(messg);
...
On the second line we run into trouble if, right after the first one, the clearThemAll() method (which may be executing concurrently) removes the target entry. To fix it, the first thing that occurs to me is synchronization on the customerActionLog object:
public void saveOrUpdate(Long customerId, String messg) {
 synchronized (customerActionLog) {
  ...
 }  
}

public void clearThemAll() {
 synchronized (customerActionLog) {
  ...
 }
}
Which is a bad idea, since serializing saveOrUpdate() is inadmissible. There is an alternative to synchronization. What we have: tons of threads executing saveOrUpdate(), writing activity logs in a non-blocking manner; the issue only comes when we need to clearThemAll(). In the latter method it would be logical to wait until all saveOrUpdate() operations have finished, and only after that proceed (exclusively) with the clearing. This transforms into:
public class RRWLockUnusualCase {
 private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private final ReadLock r = rw.readLock();
 private final WriteLock w = rw.writeLock();

 // Same as above ...
 
 public void saveOrUpdate(Long customerId, String messg) {
  r.lock();
  try {
   // Same as above ...
  } finally {
   r.unlock();
  }
 }

 public void clearThemAll() {
  w.lock();
  try {
   // Same as above ...
  } finally {
   w.unlock();
  }
 }
}
So that was the 'alternative look': readLock() doesn't protect a 'read' operation - it protects a shared operation.

And finally the pattern, I believe:
public class RRWLockPattern {
 private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private final ReadLock r = rw.readLock();
 private final WriteLock w = rw.writeLock();

 // ...

 /**
  * Can be executed concurrently if there is no exclusive operation
  * currently being executed;
  */
 public void sharedOperation() {
  r.lock();
  try {
   // PUT SHARED OPERATION LOGIC HERE
  } finally {
   r.unlock();
  }
 }

 /**
  *  Can be executed only exclusively i.e. by single thread;
  *  In case of shared operations currently being executed - blocks
  *  until all_ shared operations have completed; 
  */
 public void exclusiveOperation() {
  w.lock();
  try {
   // PUT EXCLUSIVE OPERATION LOGIC HERE
  } finally {
   w.unlock();
  }
 }
}
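A minimal timing sketch of the pattern (thread names and sleeps are made up): the exclusive operation acquires the write lock only after every in-flight shared operation has released its read lock.
public class RRWLockPatternDemo {
 private static final java.util.concurrent.locks.ReentrantReadWriteLock rw =
   new java.util.concurrent.locks.ReentrantReadWriteLock();

 public static void main(String[] args) throws InterruptedException {
  Runnable shared = new Runnable() {
   public void run() {
    rw.readLock().lock();
    try {
     System.out.println(Thread.currentThread().getName() + " doing shared work");
     Thread.sleep(200); // several readers overlap here
    } catch (InterruptedException ignored) {
    } finally {
     rw.readLock().unlock();
    }
   }
  };

  for (int i = 0; i < 3; i++) {
   new Thread(shared, "shared-" + i).start();
  }

  Thread.sleep(50); // let the shared operations grab their read locks first
  rw.writeLock().lock(); // blocks until all three readers are done
  try {
   System.out.println("exclusive work starts only after all shared work has finished");
  } finally {
   rw.writeLock().unlock();
  }
 }
}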


Thanks for reading!

January 8, 2010

Revealing FutureTask's hidden property

As you may know, java.util.concurrent.FutureTask is used to encapsulate a deferred computation and the result of that computation.
We are accustomed to using it jointly with an Executor: submit it to a thread pool and retrieve the result, in a blocking fashion, with the future.get() method. Nothing new so far.

You know, for me it's a lot of fun to find a standard utility, like FutureTask, that can be used in an alternative way! What if you don't have an Executor but still want FutureTask's core principle - one thread does the job, the others wait? So you have just the current thread, want the work done in that current thread, and at the same time want other workers, who might arrive at the same execution point, to wait for the computation result.

Easily:
public class NiceFutureFeature {
 private final AtomicReference<FutureTask> f = new AtomicReference<FutureTask>();

 public Object work(Callable c) throws InterruptedException, ExecutionException {
  if (f.compareAndSet(null, new FutureTask(c))) {
   // Only one thread will pass and will do the job
   f.get().run();
  }
  // Since we didn't pass we should wait for result
  return f.get().get();
 }
}
In this class the atomic reference ensures that only one thread proceeds with f.run(); the others wait. The core point: atomically restrict task execution to a single worker.

I believe this approach is useful when idempotent operations are executed at the same time. E.g.:
an idempotent business routine that includes a SQL query and is being executed by n threads at a given moment, i.e. if you stopped time you would see n threads about to hit the db with the same SQL (I checked that with TopLink)! There is no problem if you execute the methods sequentially - hit the db, put the result in a cache, next time execute the method and take the result from the cache - the issue appears when you perform the operations concurrently. In this regard let me extend the code above:
public class NiceFutureFeature {
 /**
  * Key of type List; Could be in the form:
  * ["select id from Emp e where e.id=:id and e.salary=:salary", 100, 2000]
  */
 private final ConcurrentMap<List, FutureTask> cm = new ConcurrentHashMap<List, FutureTask>();

 public Object work(List key, Callable c) throws InterruptedException, ExecutionException {
  FutureTask f = new FutureTask(c);
  if (cm.putIfAbsent(key, f) == null) {
   // Only one will pass and will do the job
   f.run();
  }
  // Since we didn't pass we should wait for result
  return cm.get(key).get();
 }
}
The same principle is applied in this code: the atomic putIfAbsent() method guards the main idea.
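A usage sketch of the extended version (the SQL text, parameters and sleep are made up): five threads race on the same key, but "hitting db" should be printed only once, and every thread gets the same result.
public class NiceFutureFeatureDemo {
 public static void main(String[] args) {
  final NiceFutureFeature cache = new NiceFutureFeature();
  // Hypothetical key: query text plus its bound parameters
  final java.util.List key = java.util.Arrays.asList(
    "select id from Emp e where e.id=:id and e.salary=:salary", 100, 2000);

  Runnable worker = new Runnable() {
   public void run() {
    try {
     Object result = cache.work(key, new java.util.concurrent.Callable() {
      public Object call() throws Exception {
       System.out.println("hitting db in " + Thread.currentThread().getName());
       Thread.sleep(500); // simulate an expensive idempotent query
       return "rows";
      }
     });
     System.out.println(Thread.currentThread().getName() + " got " + result);
    } catch (Exception e) {
     e.printStackTrace();
    }
   }
  };

  // Several threads race on the same key; the query body runs only once
  for (int i = 0; i < 5; i++) {
   new Thread(worker).start();
  }
 }
}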

Thanks for reading!

January 4, 2010

Limitations of percflow/percflowbelow instantiation modes in AspectJ 5

Recently I enjoyed the percflow/percflowbelow advanced instantiation modes in AspectJ, but later found a considerable limitation (from my point of view): they are thread bound. In these modes an aspect's lifetime is limited to a flow of execution - OK, that's what they were designed for - but what if that flow of execution starts another thread underneath and submits an execution unit, like a Callable/Runnable, to it? Wouldn't it be nice if that submitted unit could be woven by the aspect as well?
Suppose we have code like: 
public void testConcurrentLogic() {
 ExecutorService exec = ...
 Future f = exec.submit(runnable);
 assert...(f.get());
}
The issue: I need an aspect whose lifecycle is bound to testConcurrentLogic() and which can also match join points inside runnable.run(). Currently the first part is doable with the percflow instantiation mode; the second one isn't.
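For completeness, a minimal annotation-style sketch of the doable half (the class and pointcut names are hypothetical): one aspect instance per flow of control entering testConcurrentLogic(), which by design never sees the join points executed by the pool thread inside runnable.run().
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

@Aspect("percflow(execution(* *..ConcurrencyTest.testConcurrentLogic(..)))")
public class PerTestFlowAspect {

 @Before("execution(* *..ConcurrencyTest.*(..))")
 public void track(JoinPoint jp) {
  // Matched only while we are inside testConcurrentLogic()'s own flow of control.
  // The Runnable submitted to the executor runs on a pool thread, i.e. outside
  // this cflow, so this aspect instance never sees its join points - which is
  // exactly the limitation described above.
  System.out.println("in test flow: " + jp);
 }
}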