Blocking Queue Implementations in Java
If you are looking for how to do a Blocking Q in C++ please follow this link here.
A blocking queue is a class that encapsulates a queue (or a similar data structure) and exposes a put and a take method. It never puts onto the queue when the queue is full, and it never takes from the queue when the queue is empty. In summary, there are two invariants that must always hold.
Invariant 1. We must never put onto a full queue. If the queue is full, put must wait until a take removes an element.
Invariant 2. We must never take from an empty queue. If the queue is empty, take must wait until it is non-empty.
So the question becomes how to maintain these invariants and, more importantly, how to solve the two notification problems that arise from maintaining them.
Notification 1. When we do a take, how do we notify a waiting put that the queue is no longer full?
Notification 2. When we do a put, how do we notify a waiting take that the queue is no longer empty?
So we have two invariants to maintain and two notifications to deliver.
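Before adding any blocking, it may help to see the two invariants as plain checks. The sketch below is a single-threaded, non-blocking illustration only: the class name `BoundedIntBuffer` and the methods `tryPut` and `tryTake` are hypothetical names, not part of the article's code. Where this version refuses an operation, a blocking queue waits instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded illustration of the two invariants.
// It is NOT thread safe and it never blocks.
class BoundedIntBuffer {
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;

    BoundedIntBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Invariant 1: never put onto a full queue.
    // Returns false where a blocking queue would wait.
    boolean tryPut(int elem) {
        if (items.size() == capacity) {
            return false;
        }
        items.addLast(elem);
        return true;
    }

    // Invariant 2: never take from an empty queue.
    // Returns null where a blocking queue would wait.
    Integer tryTake() {
        if (items.isEmpty()) {
            return null;
        }
        return items.removeFirst();
    }

    public static void main(String[] args) {
        BoundedIntBuffer q = new BoundedIntBuffer(2);
        System.out.println(q.tryPut(1));  // true
        System.out.println(q.tryPut(2));  // true
        System.out.println(q.tryPut(3));  // false, the queue is full
        System.out.println(q.tryTake());  // 1
        System.out.println(q.tryTake());  // 2
        System.out.println(q.tryTake());  // null, the queue is empty
    }
}
```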
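In Java we can solve this in a few ways. The first and simplest is to use wait() and notifyAll(), which every object inherits from Object. Each method waits while its invariant would be violated and notifies once it has changed the queue. Recall that wait() must be called while holding the object's monitor (for example inside a synchronized method) and that it releases that monitor while waiting. It can also return spuriously, so the condition is always rechecked in a while loop. Note that notifyAll() wakes every thread waiting on this object, and each one rechecks its condition. With a single producer and a single consumer, as in the example below, notify(), which wakes only one thread, would suffice; with several producers and consumers sharing one monitor, notifyAll() is the safe choice because notify() might wake a thread of the wrong kind.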
Here is the code:
import java.util.LinkedList;
import java.util.List;

public class WaitNotifyList {
    // Guarded by this
    private final List<Integer> holder;
    // Maximum number of elements the queue may hold
    private final int MAX;

    public WaitNotifyList(int iSize) {
        holder = new LinkedList<>();
        MAX = iSize;
    }

    // Produce: waits while the queue is full
    public synchronized void put(int elem) throws InterruptedException {
        while (holder.size() == MAX) {
            this.wait();
        }
        holder.add(elem);
        this.notifyAll();
    }

    // Consume: waits while the queue is empty
    public synchronized int take() throws InterruptedException {
        while (holder.isEmpty()) {
            this.wait();
        }
        int res = holder.remove(0);
        this.notifyAll();
        return res;
    }

    public static void main(String[] args) {
        WaitNotifyList wn = new WaitNotifyList(5);
        Thread p1 = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    wn.put(i);
                    System.out.format("Produced %d \n", i);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
            }
        });
        p1.start();
        Thread c1 = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    int res = wn.take();
                    System.out.format("Consumed %d \n", res);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
            }
        });
        c1.start();
    }
}
In this example we use a List of type LinkedList rather than a Queue. Any data structure that holds the elements, lets you remove the oldest element (index 0 here), and lets you add at the other end will do: a List, a Queue, or an array.
If you want to improve performance you can use a plain Java array. This avoids the node allocation that LinkedList performs on every add, as well as the overhead of its add and remove methods. However, with an array we must manage the put and take indices ourselves, so that we know where the next put and the next take happen in the array. The reason we usually prefer built-in data structures such as ArrayList or LinkedList is that they handle this low-level work for us.
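To make the index management concrete, here is a small single-threaded sketch of the wrap-around logic on its own, with no locking at all. The class name `RingIndexDemo` and the helper `next` are hypothetical and exist only for this illustration.

```java
// Hypothetical single-threaded demo of circular put/take indices.
class RingIndexDemo {
    // Advance an index by one slot, wrapping to 0 at the end of the array.
    static int next(int index, int length) {
        return (index + 1 == length) ? 0 : index + 1;
    }

    public static void main(String[] args) {
        int[] arr = new int[3];
        int put = 0, take = 0, count = 0;

        // Fill the array: put moves 0 -> 1 -> 2 -> 0.
        for (int v = 10; v <= 30; v += 10) {
            arr[put] = v;
            put = next(put, arr.length);
            count++;
        }
        System.out.println("put=" + put + " count=" + count); // put=0 count=3

        // Take one element: the oldest value sits at the take index.
        int first = arr[take];
        take = next(take, arr.length);
        count--;
        System.out.println("took " + first + ", take=" + take); // took 10, take=1

        // The freed slot 0 is reused by the next put.
        arr[put] = 40;
        put = next(put, arr.length);
        count++;
        System.out.println("put=" + put + " count=" + count); // put=1 count=3
    }
}
```

The count variable is what distinguishes a full array from an empty one, since in both cases the put and take indices are equal.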
So let's convert this to use an array, like so:
// Prefer ArrayBlockingQueue
class WaitNotifyArray {
    // Guarded by this
    private final int[] arr;
    // Number of stored elements, next put index, next take index
    private int count, put, take;

    public WaitNotifyArray(int iSize) {
        arr = new int[iSize];
    }

    // Produce: waits while the array is full
    public synchronized void put(int elem) throws InterruptedException {
        while (count == arr.length) {
            this.wait();
        }
        arr[put] = elem;
        if (++put == arr.length) put = 0; // wrap around
        count++;
        this.notifyAll();
    }

    // Consume: waits while the array is empty
    public synchronized int take() throws InterruptedException {
        while (count == 0) {
            this.wait();
        }
        int res = arr[take];
        if (++take == arr.length) take = 0; // wrap around
        count--;
        this.notifyAll();
        return res;
    }

    public static void main(String[] args) {
        WaitNotifyArray wn = new WaitNotifyArray(5);
        Thread p1 = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    wn.put(i);
                    System.out.format("Produced %d \n", i);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
            }
        });
        p1.start();
        Thread c1 = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    int res = wn.take();
                    System.out.format("Consumed %d \n", res);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
            }
        });
        c1.start();
    }
}
This is only marginally faster, but it shows that you can swap the underlying data structure and still get the same behaviour.
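The examples so far use one producer and one consumer. As a rough check that the wait/notifyAll approach also holds with several of each, the sketch below runs two producers and two consumers against a compact copy of the array-based buffer and compares the sum of what was consumed with the sum of what was produced. The names `SmallBuffer`, `MultiThreadCheck`, and `run` are hypothetical, and the thread and item counts are arbitrary.

```java
import java.util.concurrent.atomic.AtomicLong;

// Compact wait/notifyAll buffer, the same idea as the array version.
class SmallBuffer {
    private final int[] arr;
    private int count, put, take;

    SmallBuffer(int size) { arr = new int[size]; }

    synchronized void put(int elem) throws InterruptedException {
        while (count == arr.length) wait();
        arr[put] = elem;
        if (++put == arr.length) put = 0;
        count++;
        notifyAll(); // wakes producers and consumers; each rechecks its condition
    }

    synchronized int take() throws InterruptedException {
        while (count == 0) wait();
        int res = arr[take];
        if (++take == arr.length) take = 0;
        count--;
        notifyAll();
        return res;
    }
}

class MultiThreadCheck {
    // Starts `threads` producers and `threads` consumers. Each producer puts
    // 1..perThread and each consumer takes perThread items.
    // Returns the sum of everything consumed.
    static long run(int threads, int perThread) {
        SmallBuffer buf = new SmallBuffer(5);
        AtomicLong consumedSum = new AtomicLong();
        Thread[] all = new Thread[threads * 2];
        for (int t = 0; t < threads; t++) {
            all[t] = new Thread(() -> {
                try {
                    for (int i = 1; i <= perThread; i++) buf.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            all[threads + t] = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) consumedSum.addAndGet(buf.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread th : all) th.start();
        for (Thread th : all) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return consumedSum.get();
    }

    public static void main(String[] args) {
        long expected = 2L * (1000L * 1001L / 2); // two producers, each sending 1..1000
        long actual = run(2, 1000);
        System.out.println(actual == expected ? "all items accounted for" : "mismatch");
    }
}
```

A matching sum does not prove correctness, but a lost or duplicated element would normally show up as a mismatch, and a missed notification would show up as a run that never finishes.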
Since Java 5 there has been another way to perform the notification: the Condition interface in java.util.concurrent.locks. A Condition is always bound to a Lock, so you create it from the lock with lock.newCondition(). Its await() and signal() methods play the roles of wait() and notify(), and signalAll() corresponds to notifyAll(). Because one lock can have several conditions, we can keep separate notFull and notEmpty conditions, so that a put signals only a waiting take and a take signals only a waiting put.
Here we call lock() and unlock() explicitly rather than using a synchronized method, and we use finally to guarantee that the lock is released.
Here is the code:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Prefer ArrayBlockingQueue
class BoundedBufferUsingConditions {
    // Instead of wait/notify, use a Lock and two Conditions
    private final Lock lock = new ReentrantLock();
    // Condition is an interface; instances are created from the lock
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    // Guarded by lock
    private final int[] arr;
    private int count, put, take;

    public BoundedBufferUsingConditions(int iSize) {
        arr = new int[iSize];
    }

    // Produce: waits on notFull while the array is full
    public void put(int elem) throws InterruptedException {
        lock.lock();
        try {
            while (count == arr.length) {
                notFull.await(); // replaces this.wait()
            }
            arr[put] = elem;
            if (++put == arr.length) put = 0;
            count++;
            notEmpty.signal(); // replaces this.notifyAll()
        } finally {
            lock.unlock();
        }
    }

    // Consume: waits on notEmpty while the array is empty
    public int take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // replaces this.wait()
            }
            int res = arr[take];
            if (++take == arr.length) take = 0;
            count--;
            notFull.signal(); // replaces this.notifyAll()
            return res;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BoundedBufferUsingConditions wn = new BoundedBufferUsingConditions(5);
        Thread p1 = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    wn.put(i);
                    System.out.format("Produced %d \n", i);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
            }
        });
        p1.start();
        Thread c1 = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    int res = wn.take();
                    System.out.format("Consumed %d \n", res);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
            }
        });
        c1.start();
    }
}
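For comparison, the JDK already ships a bounded blocking queue, java.util.concurrent.ArrayBlockingQueue, whose put and take block in the same way as the hand-written versions. The sketch below reruns the same producer and consumer loop on it; the class name `ArrayBlockingQueueDemo` and the helper `transfer` are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ArrayBlockingQueueDemo {
    // Sends 0..n-1 through a queue of the given capacity and returns
    // what the single consumer received, in the order it received it.
    static List<Integer> transfer(int capacity, int n) {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    q.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    received.add(q.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes to received are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // One producer and one consumer on a FIFO queue, so the order is 0..19.
        System.out.println(transfer(5, 20));
    }
}
```

The received list is only touched by the consumer thread until both threads have been joined, which is why a plain ArrayList is enough here.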