Blocking Queue Implementations in Java

If you are looking for how to do a Blocking Q in C++ please follow this link here.

A blocking queue is simply a class with an encapsulated q or similar data structure with a put and take method that never puts onto a q if the q is full and it never takes from a q if the q is empty. In summary we have two invariant conditions that must be satisfied.

Invariant 1. We must never put onto a full q so if q is full we must wait for the take to remove from the q
Invariant 2. We must never take from an empty q if q is empty we must wait for it to be non empty

So the question becomes how do we manage the invariants and more importantly how do we solve these two problems that arise from maintaining the invariants.

Notification 1. When we do a take how do we notify the put that q is no longer full because we took something from it.
Notification 2. When we do a put how do we notify the take that the q is no longer empty

So you can see from above we have some invariants and we need to do some notifications.

In Java we can solve this in a few ways. The first and most simple way is to take advantage of
wait() and notifyAll() in Object(). This simply entails waiting on the invariants and notifying once an action has occurred. Recall that wait must be called in a Synchronized context and will give up its lock. Also you should note that notifyAll notifies all threads waiting on this action that the action occurred. Here it would suffice to just notify() which only signals one thread.

Here is the code:



public class WaitNotifyList {
//Guarded by this
List
holder;


//Final
final int MAX;

public WaitNotifyList(int iSize) {
holder=new LinkedList();
MAX=iSize;
}
//produce
public synchronized void put(int elem) {
while(holder.size()==MAX) {
try {
this.wait();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

holder.add(elem);


this.notifyAll();


}


public synchronized int take() {
while(holder.isEmpty()) {
try {
this.wait();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

int res= holder.remove(0);

this.notifyAll();

return res;

}

public static void main(String[] args) {
WaitNotifyList
wn = new WaitNotifyList(5);



Thread
p1=new Thread(() -> {
for(int i=0; i< 20; i++) {
wn.put(i);
System.
out.format("Produced %d \n", i);
}
});
p1.start();
Thread
c1=new Thread(() -> {
for(int i=0; i< 20; i++) {
int res=wn.take();
System.
out.format("Consumed %d \n", res);
}
});
c1.start();


}

}




For this example instead of a Q we are using a
List data structure of type LinkedList. Like I said any data structure that allows you to hold data and access the bottom or 0th element and allows you to add to the top of the structure is fine so List Q or array.

If you wanted to improve the performance you could just use a standard array in Java this means you don't have the overhead of object creation or any overhead in the LinkedList methods that do the adding and removing. However if we use an array we must manage the put and take pointers so we know where the put and take are in the array. The reason we prefer built in data structures like ArrayList or LinkedList is because they handle that low level work for us.

So lets convert this to use an array like so:



//Prefer ArrayBlockingQueue
class WaitNotifyArray {
//Guarded by this
int arr[];
int count, put, take;


public WaitNotifyArray(int iSize) {
arr= new int[iSize];
}
//produce
public synchronized void put(int elem) {
while(count==arr.length) {
try {
this.wait();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

arr[put]=elem;
if(++put==arr.length) put=0;
count++;

this.notifyAll();


}


public synchronized int take() {
while(count==0) {
try {
this.wait();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

int res = arr[take];
if(++take==arr.length) take=0;
count--;

this.notifyAll();

return res;

}


public static void main(String[] args) {
WaitNotifyArray
wn = new WaitNotifyArray(5);

Thread
p1=new Thread(() -> {
for(int i=0; i< 20; i++) {
wn.put(i);
System.
out.format("Produced %d \n", i);
}
});
p1.start();
Thread
c1=new Thread(() -> {
for(int i=0; i< 20; i++) {
int res=wn.take();
System.
out.format("Consumed %d \n", res);
}
});
c1.start();

}


}





This is a bit faster but only marginally and it shows how you can use different structures but still maintain the same effect.

As of Java 8 there is a much simpler way to perform the notification and that is to use a Java
Condition. A Condition object is always implicitly bound to a lock so you must create the condition from the lock. The lock will now be bound to the condition await() and signal() which is similar to the wait() and notifyAll().
Here we are explicitly doing lock() and unlock() rather than a synchronized method and using finally to guarantee the lock is unlocked().

Here is the code:



//Prefer ArrayBlockingQueue
class BoundedBufferUsingConditions {
//Instead of Wait Notify lets use a Lock and some conditions
final Lock lock = new ReentrantLock();

//Condition is an interface
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

//Guarded by this
int arr[];
int count, put, take;


public BoundedBufferUsingConditions(int iSize) {
arr= new int[iSize];
}
//produce
public void put(int elem) {
lock.lock();
try {

while(count==arr.length) {
try {
//this.wait();
notFull.await();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

arr[put]=elem;
if(++put==arr.length) put=0;
count++;

//this.notifyAll();
notEmpty.signal();
}
finally { lock.unlock(); }

}


public int take() {
lock.lock();
try {


while(count==0) {
try {
//this.wait();
notEmpty.await();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

int res = arr[take];
if(++take==arr.length) take=0;
count--;

//this.notifyAll();
notFull.signal();

return res;

}
finally { lock.unlock(); }

}


public static void main(String[] args) {
BoundedBufferUsingConditions
wn = new BoundedBufferUsingConditions(5);

Thread
p1=new Thread(() -> {
for(int i=0; i< 20; i++) {
wn.put(i);
System.
out.format("Produced %d \n", i);
}
});
p1.start();
Thread
c1=new Thread(() -> {
for(int i=0; i< 20; i++) {
int res=wn.take();
System.
out.format("Consumed %d \n", res);
}
});
c1.start();


}


}





People who enjoyed this article also enjoyed the following:


Thread Safe Java Singleton
Create a Statistics Distribution
Naive Bayes classification AI algorithm
K-Means Clustering AI algorithm
Equity Derivatives tutorial
Fixed Income tutorial


And the following Trails:

C++
Java
python
Scala
Investment Banking tutorials

HOME
homeicon




By clicking Dismiss you accept that you may get a cookie that is used to improve your user experience and for analytics.
All data is anonymised. Our privacy page is here =>
Privacy Policy
This message is required under GDPR (General Data Protection Rules ) and the ICO (Information Commissioners Office).