ctfnote

Multithreading and Thread Safety

Multithreading

Overview

Topics Covered

  • Threads

  • Python modules

    • threading

    • concurrent.futures

    • queue

  • Learn "the hard way" then "the easy way"

  • Build a thread-safe message queue system

Terminology

  • CPU (Central Processing Unit): A piece of hardware in a computer that executes binary code.

  • OS (Operating System): Software that manages the hardware and, among other things, schedules when programs can use the CPU.

  • Process: A program that is being executed.

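  • Thread: A unit of execution within a process. Threads of the same process share its memory, and the OS can schedule each of them separately.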

Motivation

  • "Blocking" is when a thread is stuck waiting for something to finish before it can continue its work.

  • When single-threaded apps get blocked, it results in poor user experience and slower overall execution time.

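  • Multi-threaded apps can do more than one thing "at the same time". In standard CPython builds, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so the threads take turns rather than truly running in parallel. A thread that is blocked on I/O or time.sleep() releases the GIL, so the other threads can run.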

  • While one thread is blocked, other threads can continue their execution.

The Problem with a Single Thread

Consider the following code snippet:

import time

def myfunc():
    print('hello')
    # time.sleep(10) simulates a blocking call, such as a slow I/O operation.
    # The thread is stuck here for 10 seconds, and since it is the only
    # thread, the program cannot do anything else in the meantime.
    time.sleep(10)
    return True

if __name__ == '__main__':
    myfunc()

During time.sleep(10), we should let the CPU do some other work. This introduces the idea of multithreading.

Two Threads

Consider the following code snippet:

import time
import threading 

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print('myfunc ended')

if __name__ == '__main__':
    print('main started')
    t = threading.Thread(target=myfunc, args=['ret2basic'])
    t.start()
    print('main ended')

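Output: "main started", "main ended" and "myfunc started with ret2basic" are printed right away; "myfunc ended" appears about 10 seconds later.

This shows that the main thread runs to the end without waiting for the myfunc thread. The Python process itself does not exit until myfunc finishes, however, because the interpreter waits for all non-daemon threads before shutting down.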

Daemon Thread

A daemon thread is a background thread. The main difference between a regular thread and a daemon thread is that the Python program does not wait for daemon threads to finish: when only daemon threads are left, the program exits and they are stopped abruptly. Consider the following code snippet:

import time
import threading 

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print('myfunc ended')

if __name__ == '__main__':
    print('main started')
    # Pay attention to the `daemon=True` option.
    # The main thread will not wait for a daemon thread.
    t = threading.Thread(target=myfunc, args=['ret2basic'], daemon=True)
    t.start()
    print('main ended')

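Output: "main started" and "main ended" are printed, but "myfunc ended" never is.

Using a daemon thread is a bad choice here. As soon as the main thread finishes, the interpreter exits and kills the daemon thread, so myfunc never completes its work.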
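A daemon thread is a better fit for background work that is safe to abandon at exit. The sketch below assumes a hypothetical heartbeat() task that records timestamps in a list named beats. Neither name comes from the original notes.

```python
import threading
import time

# Hypothetical background task: record a timestamp every `interval` seconds.
# Losing the last beat when the program exits is harmless, which makes this
# a reasonable job for a daemon thread.
beats = []

def heartbeat(interval):
    while True:
        beats.append(time.monotonic())
        time.sleep(interval)

t = threading.Thread(target=heartbeat, args=(0.1,), daemon=True)
t.start()
print(t.daemon)  # True: set through the constructor's daemon=True

time.sleep(0.35)  # the main thread does its own work meanwhile
print(f'{len(beats)} beats recorded so far')

# No join() here: the infinite loop would never return. When the main
# thread finishes, the interpreter exits and the daemon thread is killed.
```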

Joining Threads

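Use the join() method to wait for your threads to finish before the main thread moves on. From the Python documentation: join() waits until the thread terminates. It blocks the calling thread until the thread whose join() method is called terminates (either normally or through an unhandled exception) or until the optional timeout occurs.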

Consider the following code snippet:

import time
import threading 

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print('myfunc ended')

if __name__ == '__main__':
    print('main started')
    t = threading.Thread(target=myfunc, args=['ret2basic'])
    t.start()
    # Pay attention to this .join() method.
    # At t.join(), the main thread is going to wait for thread `t` to finish.
    t.join()
    print('main ended')

Output: "main ended" is printed only after "myfunc ended", about 10 seconds after the program starts.
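join() also accepts an optional timeout. This is a minimal sketch with a hypothetical slow_task() that sleeps for half a second. join() always returns None, so is_alive() is how you tell whether the thread actually finished or the timeout expired.

```python
import threading
import time

def slow_task():
    time.sleep(0.5)  # hypothetical work that takes about half a second

t = threading.Thread(target=slow_task)
t.start()

# Wait at most 0.1 s. join() returns None either way.
t.join(timeout=0.1)
still_running = t.is_alive()  # True: slow_task needs about 0.5 s

t.join()  # no timeout: block until slow_task returns
finished = not t.is_alive()

print(still_running, finished)  # True True
```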

Multiple Threads

Consider the following code snippet:

import time
import threading 

def myfunc1(name):
    print(f'myfunc1 started with {name}')
    time.sleep(10)
    print('myfunc1 ended')

def myfunc2(name):
    print(f'myfunc2 started with {name}')
    time.sleep(10)
    print('myfunc2 ended')

def myfunc3(name):
    print(f'myfunc3 started with {name}')
    time.sleep(10)
    print('myfunc3 ended')

if __name__ == '__main__':
    print('main started')
    t1 = threading.Thread(target=myfunc1, args=['ret2basic'])
    t1.start()
    t2 = threading.Thread(target=myfunc2, args=['foo'])
    t2.start()
    t3 = threading.Thread(target=myfunc3, args=['bar'])
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print('main ended')

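Output: the three "started" lines appear almost at once, the three "ended" lines appear about 10 seconds later, and "main ended" comes last. Because the threads sleep concurrently, the program takes about 10 seconds instead of 30.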
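You can check that the threads overlap their waiting by timing them. In this sketch, the task() function, the 0.2 s delay and the shared results lists are illustrative assumptions. It runs the same blocking calls first sequentially and then in threads, and it creates the threads in a loop instead of copying myfunc1/2/3.

```python
import threading
import time

def task(name, results):
    time.sleep(0.2)       # stand-in for a blocking I/O call
    results.append(name)  # list.append is thread-safe in CPython

names = ['ret2basic', 'foo', 'bar']

# Sequential baseline: three blocking calls, one after another.
start = time.perf_counter()
sequential = []
for name in names:
    task(name, sequential)
sequential_time = time.perf_counter() - start

# Threaded version: start every thread first, then join them all.
start = time.perf_counter()
threaded = []
threads = [threading.Thread(target=task, args=(name, threaded)) for name in names]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_time = time.perf_counter() - start

print(f'sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s')
```

Starting all threads before joining any of them matters. If each join() came right after its start(), the threads would run one at a time again.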

Thread Pool

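The code from the "Multiple Threads" section can be refactored using concurrent.futures.ThreadPoolExecutor. From the Python documentation: ThreadPoolExecutor is an Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.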

Consider the following code snippet:

import concurrent.futures
import time

def myfunc(name):
    print(f'myfunc started with {name}')
    time.sleep(10)
    print(f'myfunc ended with {name}')

if __name__ == '__main__':
    print('main started')
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as e:
        # Runs myfunc('ret2basic'), myfunc('foo'), and myfunc('bar')
        # concurrently on the pool's three worker threads.
        # Leaving the `with` block waits for all of them to finish.
        e.map(myfunc, ['ret2basic', 'foo', 'bar'])
    print('main ended')

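Output: as in the previous example, all three calls run concurrently and the program finishes in about 10 seconds. "main ended" is printed last, because leaving the with block waits for every scheduled call to finish.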
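The code above throws away the return value of e.map(). This sketch uses a hypothetical square_after(delay, x) helper to show how results come back. map() yields results in input order, while submit() plus concurrent.futures.as_completed() yields them as they finish.

```python
import concurrent.futures
import time

def square_after(delay, x):
    time.sleep(delay)  # hypothetical work of varying length
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
    # map() yields results in the order of the inputs, even though the
    # first call (longest delay) finishes last.
    mapped = list(ex.map(square_after, [0.3, 0.2, 0.1], [1, 2, 3]))

    # submit() returns one Future per call; as_completed() yields the
    # futures in the order they finish.
    futures = [ex.submit(square_after, d, x) for d, x in [(0.3, 1), (0.2, 2), (0.1, 3)]]
    completed = [f.result() for f in concurrent.futures.as_completed(futures)]

print(mapped)     # [1, 4, 9]
print(completed)  # finishing order, most likely [9, 4, 1]
```

This has a consequence for the snippet above. If a call inside e.map() raises an exception, the exception is only re-raised when its result is taken from the iterator. A map() whose results are never consumed therefore hides errors.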

Race Conditions

A race condition happens when two or more threads access a shared piece of data at the same time and at least one of them modifies it. The result then depends on the order in which the threads happen to run.

Example: Bank Account Program

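The example below uses Executor.submit(). From the Python documentation: submit(fn, /, *args, **kwargs) schedules the callable fn to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.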

Consider the following code snippet:

import concurrent.futures
import time

class Account:
    def __init__(self):
        # Shared data
        self.balance = 100
    def update(self, transaction, amount):
        print(f'{transaction} thread updating...')

        local_copy = self.balance
        local_copy += amount
        time.sleep(1)
        self.balance = local_copy

        print(f'{transaction} thread finishing...')

if __name__ == '__main__':
    account = Account()
    print(f'starting with balance of {account.balance}')
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        for transaction, amount in [('deposit', 50), ('withdrawal', -150)]:
            # Schedules account.update('deposit', 50) and
            # account.update('withdrawal', -150) to run concurrently
            ex.submit(account.update, transaction, amount)
    print(f'ending balance of {account.balance}')

Output: the ending balance is either 150 or -50.

Both the deposit thread and the withdrawal thread copy self.balance (100) into local_copy before either one writes it back, because time.sleep(1) holds each of them between the read and the write. The correct ending balance is 100 + 50 - 150 = 0. The actual result is 150 or -50, depending on which thread writes self.balance last, and the other update is lost. To prevent this, we need a lock to protect the shared data.
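The original example relies on time.sleep(1) to make the threads collide, so the outcome can look accidental. The following sketch forces the same lost update deterministically. It adds a threading.Barrier, which is a demo-only addition and not part of the original program, so that both threads read the balance before either one writes it.

```python
import threading

class Account:
    def __init__(self):
        self.balance = 100
        # Demo-only helper: both threads must reach the barrier (i.e. finish
        # reading) before either one is allowed to write.
        self.both_read = threading.Barrier(2)

    def update(self, amount):
        local_copy = self.balance             # read
        self.both_read.wait()                 # force the unlucky interleaving
        self.balance = local_copy + amount    # write: clobbers the other update

account = Account()
threads = [threading.Thread(target=account.update, args=(amt,)) for amt in (50, -150)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)  # 150 or -50, never the correct 0
```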

Lock

Suppose we have a lock object called self.lock, then:

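  • self.lock.acquire(): acquire the lock, blocking until it is available

  • self.lock.release(): release the lock

  • with self.lock: acquire the lock on entry and release it on exit, even if an exception is raised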

Only one thread can hold the lock at a time, so the code between acquire() and release() runs as a critical section. No other thread can read self.balance in the middle of an update and then act on a stale value after another thread has already changed it.

Consider the following code snippet:

import concurrent.futures
import time
import threading

class Account:
    def __init__(self):
        self.balance = 100
        self.lock = threading.Lock()

    def update(self, transaction, amount):
        print(f"{transaction} thread updating...")
        
        # Using `with` statement is easier than using acquire() and release()
        with self.lock:
            local_copy = self.balance
            local_copy += amount
            time.sleep(1)
            self.balance = local_copy

        print(f"{transaction} thread finishing...")

if __name__ == '__main__':
    account = Account()

    print(f"starting with balance of {account.balance}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        for transaction, amount in [('deposit', 50), ('withdrawal', -150)]:
            ex.submit(account.update, transaction, amount)

    print(f"ending balance of {account.balance}")

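Output: the ending balance is 0.

This is the result we want. The lock forces the two updates to run one after the other, so the second thread reads the balance that the first thread wrote.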
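with self.lock is shorthand for acquire() followed by release() in a finally clause. This minimal sketch (the counter and increment() names are hypothetical) spells that out. Four threads make 40,000 increments in total, and the lock ensures none of them is lost.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        # Equivalent to `with counter_lock:` spelled out by hand. The
        # try/finally guarantees release() runs even if the body raises.
        counter_lock.acquire()
        try:
            counter += 1  # read-modify-write, now protected
        finally:
            counter_lock.release()

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```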

Deadlock and RLock

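Every acquire() must be matched by a release(). If a thread calls lock.acquire() and never calls lock.release(), every other thread that tries to acquire the lock blocks forever, which is a deadlock. A plain Lock also deadlocks if the thread that already holds it tries to acquire it again: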

import threading

lock = threading.Lock()
lock.acquire()
lock.acquire() # Deadlock!

For the second case, the solution is RLock. A Reentrant Lock (RLock) is a synchronization primitive that may be acquired multiple times by the same thread. Internally, it uses the concepts of "owning thread" and "recursion level" in addition to the locked/unlocked state used by primitive locks. In the locked state, some thread owns the lock; in the unlocked state, no thread owns it. The owning thread must call release() once for each acquire() before the lock is actually unlocked. For example:

import threading

rlock = threading.RLock()
rlock.acquire()
rlock.acquire() # No deadlock!

rlock.release() # Locked, count=1
print(rlock)
print(threading.current_thread())

print('------------------------------')

rlock.release() # Unlocked
print(rlock)
print(threading.current_thread())

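Output: the first print(rlock) shows the RLock as locked with count=1 and owned by the main thread (the owner value matches the thread identifier shown by current_thread()). After the second release(), it is shown as unlocked with count=0.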
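A common reason to need an RLock is a method that holds the lock and calls another method that takes the same lock. The sketch below uses a hypothetical Account with deposit() and deposit_twice(). It also uses acquire(blocking=False) to show that a plain Lock refuses a second acquire from the same thread, without letting the demo hang.

```python
import threading

class Account:
    def __init__(self, balance):
        self.balance = balance
        # RLock, because deposit_twice() holds the lock while calling
        # deposit(), which acquires it again from the same thread.
        self.lock = threading.RLock()

    def deposit(self, amount):
        with self.lock:
            self.balance += amount

    def deposit_twice(self, amount):
        with self.lock:
            self.deposit(amount)  # re-entrant acquire: fine with RLock
            self.deposit(amount)

acct = Account(100)
acct.deposit_twice(25)
print(acct.balance)  # 150

# With a plain Lock, a second blocking acquire by the same thread would
# hang forever. acquire(blocking=False) returns False instead.
plain = threading.Lock()
plain.acquire()
second = plain.acquire(blocking=False)
plain.release()
print(second)  # False
```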

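The Producer-Consumer Pipeline

A producer thread generates messages and a consumer thread processes them. In this first version, the two threads hand off one message at a time through a shared attribute and use two locks as signals: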

import random
import concurrent.futures
import time
import threading

# EOF tag
FINISH = 'THE END'

class Pipeline:
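    def __init__(self, capacity):
        # Number of messages the producer will send
        self.capacity = capacity
        # Shared data: holds one message at a time
        self.message = None

        # Records of what was produced and consumed, for checking later
        self.producer_pipeline = []
        self.consumer_pipeline = []

        # producer_lock is held while a message is waiting to be consumed;
        # consumer_lock is held while there is no new message to consume.
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        # Start with nothing to consume.
        self.consumer_lock.acquire()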

    def set_message(self, message):
        """Put a message on the pipeline."""

        print(f"producing message '{message}'")

        self.producer_pipeline.append(message)

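        # Wait until the consumer has taken the previous message.
        self.producer_lock.acquire()
        self.message = message # Shared data
        # Signal the consumer that a new message is ready.
        self.consumer_lock.release()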

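    def get_message(self):
        """Grab a message from the pipeline."""

        # Wait until the producer has stored a new message.
        self.consumer_lock.acquire()
        message = self.message # Shared data
        # Let the producer store the next message.
        self.producer_lock.release()

        # Print after acquiring the lock so we report the message we
        # actually received, not the previous one.
        print(f"consuming message '{message}'")
        self.consumer_pipeline.append(message)

        return message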

def producer(pipeline):
    """Put messages on the pipeline."""

    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        pipeline.set_message(message)

    # EOF tag
    pipeline.set_message(FINISH)

def consumer(pipeline):
    """Grab messages from the pipeline."""

    message = None
    
    while message != FINISH:
        message = pipeline.get_message()
        if message != FINISH:
            # Simulate an I/O or network operation
            # random.random() returns a random number between 0 and 1
            time.sleep(random.random())

if __name__ == '__main__':
    # Create a pipeline whose producer sends 10 messages (plus FINISH)
    pipeline = Pipeline(10)

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        ex.submit(producer, pipeline)
        ex.submit(consumer, pipeline)

    print(f"producer: {pipeline.producer_pipeline}")
    print(f"consumer: {pipeline.consumer_pipeline}")

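Output: the producer and consumer lists are identical. Each of the 10 random messages is consumed exactly once, in the order it was produced, followed by 'THE END'.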

The queue Module

In this section, we are going to refactor the producer-consumer pipeline using the queue module and threading events.

A queue can be declared using queue.Queue(maxsize=0), where maxsize is an integer that sets the upper bound on the number of items that can be placed in the queue. Insertion blocks once this size has been reached, until queue items are consumed. If maxsize <= 0, the queue size is infinite. A queue supports the following operations:

  • Queue.put(item, block=True, timeout=None)

    • Put item into the queue.

    • If optional args block is true and timeout is None (the default), block if necessary until a free slot is available.

    • If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time.

    • Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

  • Queue.get(block=True, timeout=None)

    • Remove and return an item from the queue.

    • If optional args block is true and timeout is None (the default), block if necessary until an item is available.

    • If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time.

    • Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

  • Queue.qsize()

    • Return the approximate size of the queue.

    • Note, qsize() > 0 doesn't guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block.
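This is a single-threaded sketch of the operations listed above. It uses a tiny maxsize=2 queue so that Full and Empty are easy to trigger, and the item values are arbitrary.

```python
import queue

q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')
print(q.qsize(), q.full())  # 2 True

put_failed = put_timed_out = get_failed = False

# Non-blocking put on a full queue raises queue.Full right away.
try:
    q.put('c', block=False)
except queue.Full:
    put_failed = True

# A positive timeout waits at most that long, then raises queue.Full.
try:
    q.put('c', timeout=0.1)
except queue.Full:
    put_timed_out = True

items = [q.get(), q.get()]  # FIFO order

# Non-blocking get on an empty queue raises queue.Empty.
try:
    q.get(block=False)
except queue.Empty:
    get_failed = True

print(items, put_failed, put_timed_out, get_failed)  # ['a', 'b'] True True True
```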

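queue.Queue does its own locking internally, so the two hand-rolled locks are no longer needed. To tell the producer when to stop, we use a threading.Event instead. From the Python documentation: an event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true.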

Calling event.set() takes over the role of the ad hoc FINISH = 'THE END' sentinel from the previous program. Here is the refactored program:

import random
import concurrent.futures
import time
import threading
import queue

class Pipeline(queue.Queue):
    def __init__(self):
        # Initialize the parent queue.Queue; put() blocks once 20 items are waiting.
        super().__init__(maxsize=20)

        self.producer_pipeline = []
        self.consumer_pipeline = []

    def set_message(self, message):
        """Put a message on the pipeline."""

        print(f"producing message '{message}'")
        self.producer_pipeline.append(message)
        self.put(message)

    def get_message(self):
        """Grab a message from the pipeline."""
        
        message = self.get()
        print(f"consuming message '{message}'")
        self.consumer_pipeline.append(message)

        return message

def producer(pipeline, event):
    """Put messages on the pipeline."""

    while not event.is_set():
        message = random.randint(1, 100)
        pipeline.set_message(message)

def consumer(pipeline, event):
    """Grab messages from the pipeline."""

    # Keep consuming until the stop event is set AND the queue is drained.
    while not pipeline.empty() or not event.is_set():
        print(f"queue size is {pipeline.qsize()}")
        message = pipeline.get_message()
        time.sleep(random.random())

if __name__ == '__main__':

    pipeline = Pipeline()
    event = threading.Event()

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        # Let the producer and consumer run for half a second...
        time.sleep(0.5)
        # ...then set the internal flag to True to tell the producer to stop.
        event.set()

    print(f"producer: {pipeline.producer_pipeline}")
    print(f"consumer: {pipeline.consumer_pipeline}")
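The Event calls used in the refactored program can be tried in isolation. In this sketch, stop, ticks and worker() are illustrative names and not part of the original code.

```python
import threading
import time

stop = threading.Event()
print(stop.is_set())  # False: the flag starts out false

# wait(timeout) returns the flag's value, so False here means "timed out".
timed_out = not stop.wait(timeout=0.1)

ticks = []

def worker():
    # Typical stop-flag loop, like producer() above.
    while not stop.is_set():
        ticks.append(1)
        time.sleep(0.01)

t = threading.Thread(target=worker)
t.start()
time.sleep(0.1)
stop.set()  # flag -> True; also wakes every thread blocked in wait()
t.join()
print(stop.is_set(), timed_out)  # True True

stop.clear()  # flag -> False, so the event can be reused
```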

Semaphore Objects

Lock and RLock let only one thread hold them at a time, but sometimes we want to let a limited number of threads in at once. For example, we might allow up to 10 threads to use the database at the same time but only 4 threads to open network connections. A semaphore handles this.

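A semaphore limits access to a shared resource with limited capacity. From the Python documentation: a semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().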

The following code demonstrates the usage of semaphore as counter:

import threading

sem = threading.Semaphore(value=50)
print(sem._value) # 50 (_value is a private CPython attribute, read here only for illustration)

sem.acquire()
sem.acquire()
sem.acquire()
print(sem._value) # 47

sem.release()
print(sem._value) # 48

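Here is a sample program. It simulates a server with 50 user slots. welcome() admits visitors, each taking one semaphore slot, until the counter reaches 0 and acquire() blocks. monitor() checks the counter every 3 seconds and, when the server is full, kicks one user out by calling release(). Both functions loop forever, so the program runs until it is killed.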

import concurrent.futures
import random
import threading
import time

def welcome(semaphore, reached_max_users):
    while True:
        visitor_number = 0
        while not reached_max_users.is_set():
            print(f"welcome visitor #{visitor_number}")
            semaphore.acquire()
            visitor_number += 1
            time.sleep(random.random())

def monitor(semaphore, reached_max_users):
    while True:
        print(f"[monitor] semaphore={semaphore._value}")
        time.sleep(3)
        if semaphore._value == 0:
            reached_max_users.set()
            print('[monitor] reached max users!')
            print('[monitor] kicking a user out...')
            semaphore.release()
            time.sleep(0.05)
            reached_max_users.clear()

if __name__ == '__main__':
    number_of_users = 50
    reached_max_users = threading.Event()
    semaphore = threading.Semaphore(value=number_of_users)

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(welcome, semaphore, reached_max_users)
        executor.submit(monitor, semaphore, reached_max_users)
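The sample program above never terminates. The code below is a finite sketch of a semaphore used as a concurrency limit. It assumes a hypothetical limit of 4 simultaneous "network connections" shared by 10 worker threads, and records the highest number of workers ever inside the guarded section. It uses threading.BoundedSemaphore, which behaves like Semaphore but raises ValueError on an extra release().

```python
import concurrent.futures
import threading
import time

MAX_CONNECTIONS = 4  # hypothetical limit on simultaneous network connections
slots = threading.BoundedSemaphore(MAX_CONNECTIONS)

active = 0
peak = 0
stats_lock = threading.Lock()  # protects `active` and `peak`

def fetch(i):
    global active, peak
    with slots:  # acquire() on entry, release() on exit
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for a network call
        with stats_lock:
            active -= 1
    return i

# Ten workers compete, but at most four are inside `with slots` at once.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as ex:
    results = list(ex.map(fetch, range(10)))

print(peak <= MAX_CONNECTIONS)  # True

# BoundedSemaphore raises ValueError when released more often than acquired,
# which catches release() bugs that a plain Semaphore would silently accept.
over_release_caught = False
try:
    slots.release()
except ValueError:
    over_release_caught = True
```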

Reference

Threading in Python - Real Python