Blocking Queue in C++ using a condition variable
Filed in: C++
If you are looking for how to write a blocking queue in Java, please see the separate Java article.
A blocking queue is simply a class that encapsulates a queue (or a similar data structure) and exposes a put and a take method. put never adds to the queue while it is full, and take never removes from the queue while it is empty. In summary, there are two invariants that must always hold.
Invariant 1. We must never put onto a full queue, so if the queue is full, put must wait until a take removes an item.
Invariant 2. We must never take from an empty queue, so if the queue is empty, take must wait until it is non-empty.
So the question becomes how we maintain these invariants and, more importantly, how we solve the two notification problems that arise from maintaining them.
Notification 1. When we do a take, how do we notify a waiting put that the queue is no longer full, because we just took something from it?
Notification 2. When we do a put, how do we notify a waiting take that the queue is no longer empty?
So, as you can see above, we have two invariants to maintain and two notifications to deliver.
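Since C++11 you can use std::condition_variable from the standard library. To use it, the waiting thread locks the mutex with a std::unique_lock (wait requires a unique_lock, so a lock_guard will not work here) and calls wait. The thread that changes the state then calls notify_one or notify_all.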
For this example we create a queue that allows at most 5 items at a time. The limit is held in the MAX variable; put checks it and waits on the condition while the queue is full.
Note how we first take the mutex via unique_lock and then wait on the condition. The condition is supplied as a lambda: the queue is not empty for take, or count is not equal to MAX for put. It took me a while to work out that wait blocks while the lambda returns false and carries on once it returns true. So be warned, as this can trip you up.
To know whether the queue has 5 items in it we use the count variable, which must only be read or modified while holding the mutex.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template<typename T>
class BlockingQueue
{
private:
    std::mutex mut;
    std::queue<T> private_std_queue;
    std::condition_variable condNotEmpty;
    std::condition_variable condNotFull;
    int count{0};     // Guarded by mut; must start at zero
    const int MAX{5}; // Maximum number of items allowed on the queue
public:
    void put(T new_value)
    {
        std::unique_lock<std::mutex> lk(mut);
        // wait takes the unique_lock and blocks while the lambda returns false
        condNotFull.wait(lk, [this]{ return count != MAX; });
        private_std_queue.push(new_value);
        count++;
        // Notification 2: tell a waiting take that the queue is no longer empty
        condNotEmpty.notify_one();
    }
    void take(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        // wait takes the unique_lock and blocks while the lambda returns false
        condNotEmpty.wait(lk, [this]{ return !private_std_queue.empty(); });
        value = private_std_queue.front();
        private_std_queue.pop();
        count--;
        // Notification 1: tell a waiting put that the queue is no longer full
        condNotFull.notify_one();
    }
};
BlockingQueue<int> data_queue;

int main()
{
    std::vector<int> vres; // Captures the values the consumer takes
    // Producer: puts 20 items and blocks whenever the queue is full
    std::thread p = std::thread([](){
        for(int i=0; i<20; i++){
            data_queue.put(i);
        }
    });
    // Consumer: takes 20 items and blocks whenever the queue is empty
    std::thread c = std::thread([&vres](){
        for(int i=0; i<20; i++){
            // To prove this works you can slow down the consumer as follows
            // if((i%5)==0) std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            // Then return the count from take so that you can see what count is.
            int res; // Receives the value taken from the queue
            data_queue.take(res);
            // vres is not thread safe, but only this thread writes to it and
            // main reads it after the join, so it is OK for a demo only
            vres.push_back(res);
        }
    });
    c.join(); // Main thread waits for the consumer to finish its work
    p.join(); // Main thread waits for the producer to finish its work
    for(int n : vres)
        std::cout << "Consumed " << n << std::endl;
    return 0;
}