Condition variables
Let's imagine that somehow, we had a way through which we could tell our Thread-1 to wait until Thread-2 has made some data available for consumption. This is exactly what condition variables allow us to do. They allow us to synchronize two threads that depend on a shared resource. To understand more about this, let's take a look at the following code sample, which creates two threads, one that feeds in the email ID and another thread that is responsible for sending the emails:
# condition_variable.py
import threading
class EmailQueue(threading.Thread):
def __init__(self, email_queue, max_items, condition_var):
threading.Thread.__init__(self)
self.email_queue = email_queue
self.max_items = max_items
self.condition_var = condition_var
self.email_recipients = []
def add_recipient(self, email):
self.email_recipients.append(email)
def run(self):
while True:
self.condition_var.acquire()
if len(self.email_queue) == self.max_items:
print("E-mail queue is full. Entering wait state...")
self.condition_var.wait()
print("Received consume signal. Populating queue...")
while len(self.email_queue) < self.max_items:
if len(self.email_recipients) == 0:
break
email = self.email_recipients.pop()
self.email_queue.append(email)
self.condition_var.notify()
self.condition_var.release()
class EmailSender(threading.Thread):
def __init__(self, email_queue, condition_var):
threading.Thread.__init__(self)
self.email_queue = email_queue
self.condition_var = condition_var
def run(self):
while True:
self.condition_var.acquire()
if len(self.email_queue) == 0:
print("E-mail queue is empty. Entering wait state...")
self.condition_var.wait()
print("E-mail queue populated. Resuming operations...")
while len(self.email_queue) is not 0:
email = self.email_queue.pop()
print("Sending email to {}".format(email))
self.condition_var.notify()
self.condition_var.release()
queue = []
MAX_QUEUE_SIZE = 100
condition_var = threading.Condition()
email_queue = EmailQueue(queue, MAX_QUEUE_SIZE, condition_var)
email_sender = EmailSender(queue, condition_var)
email_queue.start()
email_sender.start()
email_queue.add_recipient("joe@example.com")
In this code example, we defined two classes, namely, EmailQueue, which plays the role of producer and populates the email queue with email addresses on which the email needs to be sent. Then there is another class, EmailSender, which plays the role of the consumer and consumes the email addresses from the email queue and sends a mail to them.
Now, inside the __init__ method of EmailQueue, we take in a Python list that we will use as a queue as a parameter, a variable defining how many items the list should hold at most, and a condition variable.
Next, we have a method, add_recipient, which appends a new email ID inside an internal data structure of the EmailQueue to hold the email addresses temporarily until they are added to the sending queue.
Now, let's move inside the run()method where the actual magic happens. First, we start an infinite loop to keep the thread in always running mode. Next, we acquire a lock by calling the acquire()method of the condition variable. We do this so as to prevent any kind of corruption of our data structures if the thread switches the context at an unexpected time.
Once we have acquired the lock, we then check whether our email queue is full or not. If it is full, we print a message and make a call to the wait()method of the condition variable. The call to the wait()method releases the lock acquired by the condition variable and makes the thread enter a blocking state. This blocking state will be over only when a notify() method is called on the condition variable. Now, when the thread receives a signal through notify(), it continues its operations, in which it first checks whether it has some data in the internal queue. If it finds some data in the internal queue, then it populates the email queue with that data and calls the notify()method of the conditional variable to inform the EmailSender consumer thread. Now, let's take a look at the EmailSender class.
Without going through every single line here, let's keep our focus on the run()method of the EmailSender class. Since this thread needs to always be running, we first start an infinite loop to do that. Then, the next thing we do is, acquire a lock on the shared condition variable. Once we have acquired the lock, we are now ready to manipulate the shared email_queue data structure. So, the first thing our consumer does is, check whether the email queue is empty or not. If it finds the queue to be empty, our consumer will call the wait()method of the condition variable, effectively causing it to release the lock and go into a blocking state until there is some data inside the email queue. This causes the transfer of control to the EmailQueue class, which is responsible for populating the queue.
Now, once the email queue has some email IDs in it, the consumer will start sending the mails. Once it exhausts the queue, it signals the EmailSender class about that by calling the condition variables notify method. This will allow the EmailSender to continue its operation of populating the email queue.
Let's take a look at what happens when we try to execute the previous example program:
python condition_variable.py
E-mail queue is empty. Entering wait state...
E-mail queue populated. Resuming operations...
Sending email to joe@example.com
E-mail queue is empty. Entering wait state...
With this example, we now have an understanding of how condition variables can be used in Python to solve producer-consumer problems. With this knowledge in mind, now let's take a look at some of the issues that can may arise when performing multithreading in our applications.