Low Latency Java using CAS
How to use Compare and Swap (CAS) to code low latency Java
Writing efficient, low latency Java code means squeezing every microsecond out of the system, and to do that you need to look at ways to improve performance across the whole system. CAS is one way of doing that, but not the only way.
One of the traditional problems in low latency code is thread contention. By this I mean threads fighting for locks: when several threads try to take the same lock, one thread wins and all the others have to wait until the lock is released and then try again. This cycle of checking whether the lock is free, waiting if it is not and checking again is managed by the JVM. Non-blocking algorithms try to avoid making a thread wait when the lock is already taken and, in the process, can significantly improve the speed of the code.
Let us take a simple counter class, where we call a method on the class to increase the counter by 1. Given that the class is used by many threads at the same time, it is important that we avoid race conditions and that updates from different threads do not interfere with each other, so that you get correct results. This implies some form of thread contention, which will inevitably slow things down, but we want to keep that disruption to a minimum. One way of doing this is to use optimistic locking.
Let me explain what I mean by optimistic locking. Most multithreaded code uses synchronized sections, which explicitly block other threads and act as a guard on a particular object. The traditional way of writing the counter would be a private int counter with a synchronized increment method. This is called pessimistic locking, as we are always on guard for the worst case scenario. Optimistic locking does not guard the variable. Instead we allow all threads to attempt the action, but a thread only updates the variable if the value is still in the state that thread last saw.
Let's look at the code below:
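For contrast, the pessimistic version described above might look like the following minimal sketch. The class name SynchronizedCounter is made up for illustration and is not part of the original example.

```java
// Pessimistic locking: every caller must take the object's lock
// before it can read or update the counter.
class SynchronizedCounter {
    private int counter = 0;

    // Only one thread at a time can be inside a synchronized method of
    // the same instance; the others block until the lock is released.
    public synchronized int increment() {
        counter++;
        return counter;
    }

    public synchronized int getValue() {
        return counter;
    }

    public static void main(String[] args) {
        SynchronizedCounter c = new SynchronizedCounter();
        c.increment();
        c.increment();
        System.out.println(c.getValue()); // prints 2
    }
}
```

The result is always correct, but every thread that arrives while another thread holds the lock has to wait, which is exactly the cost the optimistic approach tries to avoid.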
Compare and swap Example
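import java.util.concurrent.atomic.AtomicInteger;

public class CAS {
    static class NonblockingCounter {
        //Initialise the counter, otherwise value is null on first use
        private final AtomicInteger value = new AtomicInteger(0);

        //Plain read: getting the value must not change it
        public int getValue() {
            return value.get();
        }

        public int increment() {
            int v;
            do {
                v = value.get();
            }
            while (!value.compareAndSet(v, v + 1));
            return v + 1;
        }
    }
}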
You will see the increment method has no guard, so multiple threads can run through the method at the same time. Note that v is a local variable on the stack, so it is private to each thread, and the get is atomic, so we are still safe. Now we get to the while, where we run the atomic compareAndSet with a not: if the value is still in a consistent state, i.e. it has not changed since we read it into v, it is updated to v + 1 and the loop ends. However, if the value has been changed by another thread, compareAndSet fails and we have to go round the loop again. So we spin until we have a clear run at changing the variable, which keeps the value variable in a consistent state.
This does mean that threads can spin until they have the opportunity to make a clear run. The value always remains in a consistent state without a thread going into a lock waiting state. A lock waiting state is generally much slower than a short spin. Spinning makes things faster, but repeated retries under contention are still a concern, and one that Java 8 tries to address.
You may have asked why not just use AtomicInteger.incrementAndGet? Well, you could, as it is functionally the same as the increment method above. The compareAndSet method is ultimately carried out by a hardware primitive called compare and swap (CAS), a single atomic machine instruction. Now let's look under the hood at what incrementAndGet looked like in Java 7, where we see the following code:
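To make the spinning visible, here is a small sketch that counts how often compareAndSet fails and the loop has to go round again. The class SpinDemo, the retries counter and the thread and iteration counts are assumptions made for illustration; the number of retries will differ from run to run and from machine to machine.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class SpinDemo {
    private final AtomicInteger value = new AtomicInteger(0);
    // Counts failed CAS attempts; only here for the demonstration.
    private final AtomicLong retries = new AtomicLong(0);

    int increment() {
        for (;;) {
            int v = value.get();
            if (value.compareAndSet(v, v + 1)) {
                return v + 1;          // nobody changed value between get and CAS
            }
            retries.incrementAndGet(); // another thread won, so go round again
        }
    }

    int getValue() { return value.get(); }
    long getRetries() { return retries.get(); }

    // Starts `threads` threads that each call increment() `perThread` times.
    static SpinDemo run(int threads, int perThread) {
        SpinDemo demo = new SpinDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) demo.increment();
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return demo;
    }

    public static void main(String[] args) {
        SpinDemo demo = run(8, 100_000);
        System.out.println("final value = " + demo.getValue());           // always 800000
        System.out.println("failed CAS attempts = " + demo.getRetries()); // varies
    }
}
```

The final value is always correct because an increment only lands when the CAS succeeds. The retry count is the price paid for that under contention.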
public final int incrementAndGet_Java7() {
    for (;;) {
        int current = value.get();
        int next = current + 1;
        if (value.compareAndSet(current, next))
            return next;
    }
}
And the Java 8 implementation of the closely related getAndIncrement looks like this (incrementAndGet makes the same call and adds 1 to the result):
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
Why is the Java 8 version so different? Because on x86 it can be compiled down to a single LOCK XADD assembly instruction, which is faster than a compare and swap (compareAndSet) loop in high thread contention environments.
The issue is that under high thread contention a lot of threads will be failing the CAS and going round the loop again, so the JIT (Just in Time) compiler will err on the side of failure and future code paths will veer in the direction of failure. The new construct in Java 8 has no failure path at all, so the JIT-compiled code can be slightly faster, and in tests this has proven to be the case. For more information on LOCK XADD, also known as the fetch-and-add paradigm, see https://en.wikipedia.org/wiki/Fetch-and-add
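The two styles can be put side by side in a small sketch: a hand-written CAS retry loop and the single-call getAndAdd from AtomicInteger. The class and method names (FetchAndAddDemo, casGetAndAdd, fetchAndAdd) are made up for illustration. Whether getAndAdd really becomes one LOCK XADD depends on the JVM and the hardware.

```java
import java.util.concurrent.atomic.AtomicInteger;

class FetchAndAddDemo {
    // CAS style: read, compute, try to publish, retry on failure.
    static int casGetAndAdd(AtomicInteger a, int delta) {
        int current;
        do {
            current = a.get();
        } while (!a.compareAndSet(current, current + delta));
        return current; // value before the addition
    }

    // Fetch-and-add style: one call with no failure path in the Java code.
    static int fetchAndAdd(AtomicInteger a, int delta) {
        return a.getAndAdd(delta); // also returns the value before the addition
    }

    public static void main(String[] args) {
        AtomicInteger a = new AtomicInteger(10);
        System.out.println(casGetAndAdd(a, 5));  // prints 10
        System.out.println(fetchAndAdd(a, 5));   // prints 15
        System.out.println(a.get());             // prints 20
        System.out.println(a.incrementAndGet()); // prints 21
    }
}
```

Both methods give the same answer. The difference is only in how many attempts it may take to get there when many threads hit the same variable.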
How to use LongAdder and why it is faster than Atomic counters
Another counter you could use is a LongAdder, which lives in the same java.util.concurrent.atomic package as the atomic classes. It does the same job as the counters above. So the question is: why would you want to use a LongAdder as opposed to a straight AtomicLong?
To answer this we need to look at the performance of the adder as opposed to an AtomicLong and why the adders are better under contention. Under the hood the adders record the adds separately rather than updating a single value directly, so you only get the fully added value once you call sum() or longValue(). The downside is that you use a bit more memory in order to gain better performance.
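The idea of keeping the adds apart and only combining them on request can be sketched roughly as follows. This is a deliberately simplified illustration and not the JDK's LongAdder implementation: the class StripedCounter, the fixed number of cells and the choice of cell from the thread's hash code are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.IntStream;

class StripedCounter {
    // Several independent slots instead of one hot variable.
    private final AtomicLongArray cells;

    StripedCounter(int cellCount) {
        cells = new AtomicLongArray(cellCount);
    }

    // Each thread adds to the cell picked from its own hash code, so
    // threads are less likely to compete for the same memory location.
    void increment() {
        int index = Math.floorMod(Thread.currentThread().hashCode(), cells.length());
        cells.getAndIncrement(index);
    }

    // The total is only assembled when it is asked for. It is not an
    // atomic snapshot if other threads are still adding.
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    public static void main(String[] args) {
        StripedCounter counter = new StripedCounter(8);
        // 1000 increments spread across the common fork-join pool
        IntStream.range(0, 1000).parallel().forEach(i -> counter.increment());
        System.out.println(counter.sum()); // prints 1000
    }
}
```

The extra cells are the memory cost mentioned above, and the loop in sum() is why reading the total is a separate step.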
So the adders trade a bit more memory for speed, which is why you need to call sum() or longValue() when you are done to get the result.
Below is a thread-safe example. It creates a frequency count of some random values, sorts the Map entries by frequency count and then computes the mean.
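Before the larger example, here is a minimal sketch of the basic LongAdder calls next to an AtomicLong. The class AdderVsAtomic and its method names are made up for illustration; increment(), sum() and longValue() are the real LongAdder methods.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class AdderVsAtomic {
    // Counts n increments with a LongAdder and reads the total at the end.
    static long countWithAdder(int n) {
        LongAdder adder = new LongAdder();
        // increment() returns nothing: it just records the add
        IntStream.range(0, n).parallel().forEach(i -> adder.increment());
        return adder.sum(); // longValue() returns the same total
    }

    // The same count with an AtomicLong, which updates one value directly.
    static long countWithAtomic(int n) {
        AtomicLong atomic = new AtomicLong();
        // incrementAndGet() returns the new value on every call
        IntStream.range(0, n).parallel().forEach(i -> atomic.incrementAndGet());
        return atomic.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAdder(1_000_000));  // prints 1000000
        System.out.println(countWithAtomic(1_000_000)); // prints 1000000
    }
}
```

Both give the same total. The LongAdder is the better fit when many threads update often and the total is read rarely, as with the frequency counts that follow.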
Example using LongAdder
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;
public class Statistics {
    ConcurrentHashMap<Integer, LongAdder> freqs = new ConcurrentHashMap<>();

    public void add(int key){
        //If no LongAdder exists for the key, create one, then increment it
        freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    public void stats(Map<Integer, LongAdder> map){
        //Copy the entries into a CopyOnWriteArrayList as the Map may still be in use
        //Best to only run this method after the Map has all its entries.
        List<Entry<Integer, LongAdder>> list = new CopyOnWriteArrayList<>(map.entrySet());
        //We want to sort on the value, which is the LongAdder, so we need our own Comparator
        Collections.sort(list, new Comparator<Entry<Integer, LongAdder>>() {
            @Override
            public int compare(Entry<Integer, LongAdder> o1, Entry<Integer, LongAdder> o2) {
                //Descending by count; Long.compare returns 0 for equal counts
                return Long.compare(o2.getValue().longValue(), o1.getValue().longValue());
            }
        });
        //Write the Map sorted by value
        for (Entry<Integer, LongAdder> e : list){
            System.out.println(String.format("%d:%d ", e.getKey(), e.getValue().longValue()));
        }
        //Compute the mean
        long total = list.stream().mapToLong(s -> s.getValue().longValue()).reduce(0, (a, b) -> a + b);
        System.out.println(String.format("total freq counts avg %d total Value Count across all Keys %d ",
                total / list.size(), total));
    }

    public static void main(String[] args){
        Statistics statistics = new Statistics();
        Random rand = new Random();
        //Create thread pool of 100
        ExecutorService producer = Executors.newFixedThreadPool(100);
        //Submit 1000 tasks of 100 LongAdder adds each - should cause thread contention on the pool of 100 threads
        IntStream.range(0, 1000).forEach(x -> {
            producer.submit(() -> {
                for (int i = 0; i < 100; i++)
                    statistics.add(rand.nextInt(10));
            });
        });
        //Stop accepting new tasks, then wait for the submitted ones to finish
        producer.shutdown();
        try {
            if (!producer.awaitTermination(5, TimeUnit.SECONDS)) {
                //Timed out so force shutdown
                producer.shutdownNow();
            }
        } catch (InterruptedException e) {
            //Interrupted so force shutdown
            producer.shutdownNow();
            //Preserve interrupt status so others are aware and can act
            Thread.currentThread().interrupt();
        }
        //Run stats function for the mean - by now the thread pool should have had a chance to finish
        statistics.stats(statistics.freqs);
        //Now look at the Map in its own iteration order (a ConcurrentHashMap makes no ordering guarantee)
        for (Integer i : statistics.freqs.keySet()){
            System.out.println(String.format(" %d:%d", i, statistics.freqs.get(i).longValue()));
        }
    }
}
