CPU (Central Processing Unit): The hardware component of a computer that executes machine instructions (binary code).
OS (Operating System): Software that, among other duties, schedules when programs get to use the CPU.
Process: A running instance of a program, with its own memory space.
Thread: The smallest unit of execution within a process; all threads in a process share its memory.
Motivation
"Blocking" is where a thread is stuck, waiting for a something to finish so it can complete its function.
When single-threaded apps get blocked, it results in poor user experience and slower overall execution time.
Multi-threaded apps can do more than one function "at the same time" (not really, but it appears that way).
While one thread is blocked, other threads can continue their execution.
The Problem with a Single Thread
Consider the following code snippet:
```python
import time

def myfunc():
    print('hello')
    # time.sleep(10) simulates the thread getting blocked, such as by heavy I/O.
    # The process will be stuck here for 10 seconds.
    # During this time, we can't do anything else.
    time.sleep(10)
    return True

if __name__ == '__main__':
    myfunc()
```
During time.sleep(10), we should let the CPU do some other work. This introduces the idea of multithreading.
Two Threads
Consider the following code snippet:
```python
import time
import threading

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print('myfunc ended')

if __name__ == '__main__':
    print('main started')
    t = threading.Thread(target=myfunc, args=['ret2basic'])
    t.start()
    print('main ended')
```
Output:
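A representative run (the first two lines after 'main started' may interleave differently):

```
main started
myfunc started with ret2basic
main ended
myfunc ended
```

'main ended' prints immediately, while 'myfunc ended' appears about 10 seconds later, because the program still waits for the non-daemon thread before exiting.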
This ensures that the main thread keeps executing without waiting for the myfunc thread to finish.
Daemon Thread
A daemon thread is like a background process. The main difference between a regular thread and a daemon thread is that the program will not wait for daemon threads to complete before exiting. Consider the following code snippet:
```python
import time
import threading

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print('myfunc ended')

if __name__ == '__main__':
    print('main started')
    # Pay attention to the `daemon=True` option.
    # The main thread will not wait for a daemon thread.
    t = threading.Thread(target=myfunc, args=['ret2basic'], daemon=True)
    t.start()
    print('main ended')
```
Output:
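A representative run:

```
main started
myfunc started with ret2basic
main ended
```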
Using a daemon thread is a bad choice in this case, since the myfunc thread does not complete its work ('myfunc ended' never prints) before the main thread exits.
Joining Threads
The join() method brings all your threads together before the main thread exits. From the Python documentation: join() "blocks the calling thread until the thread whose join() method is called terminates".
Consider the following code snippet:
```python
import time
import threading

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print('myfunc ended')

if __name__ == '__main__':
    print('main started')
    t = threading.Thread(target=myfunc, args=['ret2basic'])
    t.start()
    # Pay attention to this .join() method.
    # At t.join(), the main thread is going to wait for thread `t` to finish.
    t.join()
    print('main ended')
```
Output:
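This time the ordering is deterministic:

```
main started
myfunc started with ret2basic
myfunc ended
main ended
```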
Multiple Threads
Consider the following code snippet:
```python
import time
import threading

def myfunc1(name):
    print(f'myfunc1 started with {name}')
    time.sleep(10)
    print('myfunc1 ended')

def myfunc2(name):
    print(f'myfunc2 started with {name}')
    time.sleep(10)
    print('myfunc2 ended')

def myfunc3(name):
    print(f'myfunc3 started with {name}')
    time.sleep(10)
    print('myfunc3 ended')

if __name__ == '__main__':
    print('main started')
    t1 = threading.Thread(target=myfunc1, args=['ret2basic'])
    t1.start()
    t2 = threading.Thread(target=myfunc2, args=['foo'])
    t2.start()
    t3 = threading.Thread(target=myfunc3, args=['bar'])
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print('main ended')
```
Output:
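A representative run (the start and end lines may interleave differently):

```
main started
myfunc1 started with ret2basic
myfunc2 started with foo
myfunc3 started with bar
myfunc1 ended
myfunc2 ended
myfunc3 ended
main ended
```

The three threads sleep concurrently, so the whole program takes about 10 seconds rather than 30.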
Thread Pool
The code from the "Multiple Threads" section can be refactored using concurrent.futures.ThreadPoolExecutor(). From the Python documentation: ThreadPoolExecutor is "an Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously".
Consider the following code snippet:
```python
import concurrent.futures
import time

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print(f'myfunc ended with {name}')

if __name__ == '__main__':
    print('main begins')
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as e:
        # This is equivalent to:
        #   myfunc('ret2basic'), and
        #   myfunc('foo'), and
        #   myfunc('bar')
        e.map(myfunc, ['ret2basic', 'foo', 'bar'])
    print('main ended')
```
Output:
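A representative run (interleaving may vary):

```
main begins
myfunc started with ret2basic
myfunc started with foo
myfunc started with bar
myfunc ended with ret2basic
myfunc ended with foo
myfunc ended with bar
main ended
```

Note that the `with` block waits for all workers to finish, so 'main ended' prints last.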
Race Conditions
A race condition happens when more than one thread tries to access a shared piece of data at the same time. Consider the following code snippet:
```python
import concurrent.futures
import time

class Account:

    def __init__(self):
        # Shared data
        self.balance = 100

    def update(self, transaction, amount):
        print(f'{transaction} thread updating...')
        local_copy = self.balance
        local_copy += amount
        time.sleep(1)
        self.balance = local_copy
        print(f'{transaction} thread finishing...')

if __name__ == '__main__':
    account = Account()
    print(f'starting with balance of {account.balance}')
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        for transaction, amount in [('deposit', 50), ('withdrawal', -150)]:
            # This is equivalent to:
            #   account.update('deposit', 50), and
            #   account.update('withdrawal', -150)
            ex.submit(account.update, transaction, amount)
    print(f'ending balance of {account.balance}')
```
Output:
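A representative run (the final balance depends on thread timing):

```
starting with balance of 100
deposit thread updating...
withdrawal thread updating...
deposit thread finishing...
withdrawal thread finishing...
ending balance of -50
```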
Here the deposit thread created a copy of self.balance and the withdrawal thread created another copy. We want the result to be 0 (100 + 50 - 150), but the actual result is either -50 or 150, depending on which thread overwrites self.balance last. This is no good, so we need a lock to protect the shared data.
Lock
Suppose we have a lock object called self.lock. Then:
self.lock.acquire(): lock
self.lock.release(): unlock
Or just use with self.lock:, which acquires and releases automatically.
The code between the acquire() and release() calls is executed under mutual exclusion, so there is no chance that one thread reads a stale value while another thread is in the middle of an update.
Consider the following code snippet:
```python
import concurrent.futures
import time
import threading

class Account:

    def __init__(self):
        self.balance = 100
        self.lock = threading.Lock()

    def update(self, transaction, amount):
        print(f"{transaction} thread updating...")
        # Using a `with` statement is easier than using acquire() and release()
        with self.lock:
            local_copy = self.balance
            local_copy += amount
            time.sleep(1)
            self.balance = local_copy
        print(f"{transaction} thread finishing...")

if __name__ == '__main__':
    account = Account()
    print(f"starting with balance of {account.balance}")
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        for transaction, amount in [('deposit', 50), ('withdrawal', -150)]:
            ex.submit(account.update, transaction, amount)
    print(f"ending balance of {account.balance}")
```
Output:
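A representative run:

```
starting with balance of 100
deposit thread updating...
withdrawal thread updating...
deposit thread finishing...
withdrawal thread finishing...
ending balance of 0
```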
This result is just what we want.
Deadlock and RLock
If you call lock.acquire() and forget to call lock.release(), the lock is never released and every later acquire() blocks forever, which is a deadlock. For example, if the same thread locks a plain Lock twice, it deadlocks, as in this minimal sketch:
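```python
import threading

lock = threading.Lock()
lock.acquire()
lock.acquire()  # Blocks forever: a plain Lock is not reentrant
print('never reached')
```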
The solution to this problem is to use RLock. A reentrant lock (RLock) is a synchronization primitive that may be acquired multiple times by the same thread. Internally, it uses the concepts of "owning thread" and "recursion level" in addition to the locked/unlocked state used by primitive locks. In the locked state, some thread owns the lock; in the unlocked state, no thread owns it. For example:
```python
import threading

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()  # No deadlock!
rlock.release()  # Still locked by this thread, count=1
print(rlock)
print(threading.current_thread())
print('------------------------------')
rlock.release()  # Unlocked
print(rlock)
print(threading.current_thread())
```
Output:
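A representative run (the owner id and addresses will differ on your machine):

```
<locked _thread.RLock object owner=140419... count=1 at 0x7f...>
<_MainThread(MainThread, started 140419...)>
------------------------------
<unlocked _thread.RLock object owner=0 count=0 at 0x7f...>
<_MainThread(MainThread, started 140419...)>
```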
The Producer-Consumer Pipeline
```python
import random
import concurrent.futures
import time
import threading

# EOF tag
FINISH = 'THE END'

class Pipeline:

    def __init__(self, capacity):
        # Maximum number of messages allowed in the pipeline
        self.capacity = capacity
        # Shared data
        self.message = None
        self.producer_pipeline = []
        self.consumer_pipeline = []
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def set_message(self, message):
        """Put a message on the pipeline."""
        print(f"producing message '{message}'")
        self.producer_pipeline.append(message)
        self.producer_lock.acquire()
        self.message = message  # Shared data
        self.consumer_lock.release()

    def get_message(self):
        """Grab a message from the pipeline."""
        # Read the shared data only after the producer has released consumer_lock,
        # otherwise we could see a stale (or empty) message.
        self.consumer_lock.acquire()
        message = self.message  # Shared data
        self.producer_lock.release()
        print(f"consuming message '{message}'")
        self.consumer_pipeline.append(message)
        return message

def producer(pipeline):
    """Put messages on the pipeline."""
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        pipeline.set_message(message)
    # EOF tag
    pipeline.set_message(FINISH)

def consumer(pipeline):
    """Grab messages from the pipeline."""
    message = None
    while message != FINISH:
        message = pipeline.get_message()
        if message != FINISH:
            # Simulate an I/O or network operation
            # random.random() returns a random number between 0 and 1
            time.sleep(random.random())

if __name__ == '__main__':
    # Create a pipeline that can hold 10 messages
    pipeline = Pipeline(10)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)
    print(f"producer: {pipeline.producer_pipeline}")
    print(f"consumer: {pipeline.consumer_pipeline}")
```
Output:
The queue Module
In this section, we are going to refactor the producer-consumer pipeline using the queue module and threading events.
A queue can be declared using queue.Queue(maxsize=0), where maxsize is an integer that sets the upper bound on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize <= 0, the queue size is infinite. A queue supports the following operations (demonstrated in the sketch after this list):
Queue.put(item, block=True, timeout=None)
Put item into the queue.
If optional args block is true and timeout is None (the default), block if necessary until a free slot is available.
If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time.
Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
Queue.get(block=True, timeout=None)
Remove and return an item from the queue.
If optional args block is true and timeout is None (the default), block if necessary until an item is available.
If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time.
Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
Queue.qsize()
Return the approximate size of the queue.
Note, qsize() > 0 doesn't guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block.
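To make these semantics concrete, here is a minimal sketch (the queue size and items are illustrative):

```python
import queue

q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')

try:
    # The queue is full: this blocks for up to 1 second, then raises Full
    q.put('c', block=True, timeout=1)
except queue.Full:
    print('queue is full')

print(q.get())  # 'a'
print(q.get())  # 'b'

try:
    # The queue is empty: with block=False, Empty is raised immediately
    q.get(block=False)
except queue.Empty:
    print('queue is empty')
```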
A threading event replaces the lock mechanism here. From the Python documentation: an event object "manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true".
The set() method plays the role of the ad hoc FINISH = 'THE END' flag we invented in the producer-consumer pipeline program. Here is the refactored program:
```python
import random
import concurrent.futures
import time
import threading
import queue

class Pipeline(queue.Queue):

    def __init__(self):
        # Inherit from the `queue.Queue` class.
        super().__init__(maxsize=20)
        self.producer_pipeline = []
        self.consumer_pipeline = []

    def set_message(self, message):
        """Put a message on the pipeline."""
        print(f"producing message '{message}'")
        self.producer_pipeline.append(message)
        self.put(message)

    def get_message(self):
        """Grab a message from the pipeline."""
        message = self.get()
        print(f"consuming message '{message}'")
        self.consumer_pipeline.append(message)
        return message

def producer(pipeline, event):
    """Put messages on the pipeline."""
    while not event.is_set():
        message = random.randint(1, 100)
        pipeline.set_message(message)

def consumer(pipeline, event):
    """Grab messages from the pipeline."""
    while not pipeline.empty() or not event.is_set():
        print(f"queue size is {pipeline.qsize()}")
        message = pipeline.get_message()
        time.sleep(random.random())

if __name__ == '__main__':
    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        time.sleep(0.5)
        # This sets the internal flag to True
        event.set()
    print(f"producer: {pipeline.producer_pipeline}")
    print(f"consumer: {pipeline.consumer_pipeline}")
```
Semaphore Objects
Lock and RLock allow only one thread to work at a time, but sometimes we want multiple threads to work at once. For example, we might allow 10 members to access the database, but permit only 4 of them to use network connections at the same time. In such cases, we need a semaphore.
A semaphore can be used to limit access to shared resources with limited capacity. From the Python documentation: a semaphore "manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release()".
The following code demonstrates using a semaphore as a counter.
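A minimal sketch, assuming we cap concurrent "network" access at 4 slots among 10 workers (the function name, worker count, and limit are illustrative):

```python
import concurrent.futures
import threading
import time

# Internal counter starts at 4: at most 4 threads may hold the semaphore at once.
sem = threading.BoundedSemaphore(4)

def access_network(worker_id):
    # Entering the `with` block calls acquire(), decrementing the counter;
    # the call blocks while the counter is 0.
    with sem:
        print(f'worker {worker_id}: connection opened')
        time.sleep(1)  # Simulate a network operation
        print(f'worker {worker_id}: connection closed')
    # Leaving the `with` block calls release(), incrementing the counter.

if __name__ == '__main__':
    # 10 workers contend for the 4 connection slots.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as ex:
        ex.map(access_network, range(10))
```

Running this, you should see at most four "connection opened" lines before a matching "connection closed" line frees a slot for the next worker.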