Who this guide is for
- Learners building programs that handle many tasks at once
- Developers deciding between threads, processes, and async approaches
- Engineers optimizing I/O-heavy or CPU-heavy workloads
What you’ll learn
- The difference between concurrency and parallelism
- How the GIL affects threading in CPython
- When to use threading, multiprocessing, and asyncio
- How async/await and event loops work in practice
- A practical selection guide for real workloads
Why this topic matters
Modern applications often perform multiple operations at once: network calls, file I/O, background jobs, and user requests. Choosing the wrong execution model can waste resources or make code unnecessarily complex.
Python offers multiple concurrency options, each with its own strengths and trade-offs. Understanding when to use each approach helps you build systems that are responsive and efficient.
Core concepts
Concurrency vs parallelism and the GIL
Concurrency means managing multiple tasks over time. Parallelism means tasks execute at the same time on multiple cores.
In CPython, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time. This matters mostly for CPU-bound tasks.
Rule of thumb:
- I/O-bound tasks -> threads or asyncio
- CPU-bound tasks -> multiprocessing
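This rule of thumb maps directly onto the standard library's concurrent.futures executors. A minimal sketch (the `io_bound` and `cpu_bound` helpers here are illustrative stand-ins for real workloads):

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_bound(n):
    # Simulates waiting on a network or disk call; releases the GIL while sleeping.
    time.sleep(0.1)
    return n

def cpu_bound(n):
    # Pure computation: holds the GIL for its entire duration.
    return sum(math.isqrt(i) for i in range(n))

if __name__ == "__main__":
    # I/O-bound: threads overlap the waits despite the GIL.
    with ThreadPoolExecutor() as ex:
        print(list(ex.map(io_bound, range(5))))

    # CPU-bound: separate processes sidestep the GIL entirely.
    with ProcessPoolExecutor() as ex:
        print(list(ex.map(cpu_bound, [10_000] * 2)))
```

The two executors share one interface, so classifying the workload first means the later code change is often a one-line swap.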
Threading and multiprocessing
Threading is useful for tasks that spend most of their time waiting (network, disk, APIs).
import threading
import time
def task(name):
    time.sleep(1)
    print(f"Done: {name}")
threads = [threading.Thread(target=task, args=(f"t{i}",)) for i in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Multiprocessing runs separate processes and can use multiple CPU cores for heavy computation.
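A process-based version of the earlier threading snippet looks almost identical; only the worker class changes (a sketch, with illustrative names):

```python
import time
from multiprocessing import Process

def task(name):
    time.sleep(1)
    print(f"Done: {name}")

if __name__ == "__main__":
    # The __main__ guard is required so child processes can re-import
    # this module safely on platforms that use the "spawn" start method.
    processes = [Process(target=task, args=(f"p{i}",)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

Unlike threads, each Process has its own interpreter and memory space, so data must be passed explicitly (arguments are pickled), but CPU-bound work genuinely runs in parallel.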
Asyncio for high-concurrency I/O
asyncio is event-loop based and shines when many I/O tasks must be coordinated efficiently.
import asyncio
async def fetch(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    results = await asyncio.gather(
        fetch("A", 1),
        fetch("B", 1),
        fetch("C", 1),
    )
    print(results)
asyncio.run(main())
This avoids creating many threads for high I/O concurrency scenarios.
Step-by-step walkthrough
Step 1 — Classify workload type first
Before writing any concurrent code, identify the dominant cost:
- Network/file waiting -> I/O-bound
- Heavy numerical computation -> CPU-bound
This single decision drives tool choice.
Step 2 — Implement a small concurrent version
For I/O-like waits, compare sequential vs concurrent.
import time
import asyncio
async def io_task():
    await asyncio.sleep(1)

async def run_concurrent():
    await asyncio.gather(*(io_task() for _ in range(5)))
start = time.perf_counter()
asyncio.run(run_concurrent())
print(f"Elapsed: {time.perf_counter() - start:.2f}s")
You should observe runtime near one second instead of five.
Step 3 — Add safety and observability
Concurrency increases complexity. Add:
- Timeouts
- Error handling per task
- Logging/task naming
Reliability matters as much as speed gains.
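The three additions above can be sketched together with asyncio. This is one possible shape, not the only one; the `fetch` and `supervised` names and the logging setup are illustrative:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def supervised(name, delay, timeout=2.0):
    # Per-task timeout and error handling, with the task name in every log line.
    try:
        result = await asyncio.wait_for(fetch(name, delay), timeout=timeout)
        logging.info("%s succeeded: %s", name, result)
        return result
    except asyncio.TimeoutError:
        logging.warning("%s timed out after %.1fs", name, timeout)
        return None

async def main():
    # return_exceptions=True keeps one failed task from cancelling the rest.
    results = await asyncio.gather(
        supervised("fast", 0.1),
        supervised("slow", 5.0),  # exceeds the timeout
        return_exceptions=True,
    )
    print(results)

asyncio.run(main())
```

Wrapping each task this way means a single slow or failing task degrades gracefully instead of taking the whole batch down with it.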
Practical examples
Example 1 — CPU-bound with multiprocessing
from multiprocessing import Pool
def square(n):
    return n * n

if __name__ == "__main__":
    with Pool() as pool:
        print(pool.map(square, [1, 2, 3, 4, 5]))
Expected output:
[1, 4, 9, 16, 25]
This pattern scales CPU tasks better than threads in many CPython cases.
Example 2 — Async task timeout handling
import asyncio
async def slow():
    await asyncio.sleep(3)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow(), timeout=1)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")
asyncio.run(main())
Expected output:
Task timed out
Example 3 — Performance comparison snapshot (I/O-like workload)
import asyncio
import threading
import time
def io_sleep():
    time.sleep(1)

def run_sequential():
    start = time.perf_counter()
    for _ in range(5):
        io_sleep()
    return time.perf_counter() - start

def run_threading():
    start = time.perf_counter()
    threads = [threading.Thread(target=io_sleep) for _ in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start

async def run_asyncio():
    start = time.perf_counter()
    await asyncio.gather(*(asyncio.sleep(1) for _ in range(5)))
    return time.perf_counter() - start
seq_time = run_sequential()
thr_time = run_threading()
async_time = asyncio.run(run_asyncio())
print(f"Sequential: {seq_time:.2f}s")
print(f"Threading : {thr_time:.2f}s")
print(f"Asyncio : {async_time:.2f}s")
Expected output (typical):
Sequential: ~5.00s
Threading : ~1.00s
Asyncio : ~1.00s
This simple comparison shows why concurrency strategy should match workload type.
Common mistakes and how to avoid them
- Using threads for CPU-heavy work in CPython -> Prefer multiprocessing for parallel CPU tasks.
- Mixing blocking calls into async functions -> Use async-compatible libraries and keep blocking operations out of the event loop.
- Adding concurrency before measuring bottlenecks -> Profile first, then optimize targeted areas.
- Ignoring cancellation/timeouts -> Add timeout and cancellation paths for resilience.
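For the blocking-calls mistake in the list above, when no async-compatible library exists, the blocking call can be pushed to a worker thread instead of stalling the event loop. A sketch, where `blocking_read` stands in for any synchronous call:

```python
import asyncio
import time

def blocking_read():
    # Stand-in for a synchronous library call (e.g. a classic DB driver).
    time.sleep(1)
    return "payload"

async def main():
    start = time.perf_counter()
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker
    # thread, so the event loop stays free to run other tasks meanwhile.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read),
        asyncio.to_thread(blocking_read),
        asyncio.sleep(1),
    )
    print(f"Elapsed: {time.perf_counter() - start:.2f}s")  # ~1s, not ~2s
    print(results[:2])

asyncio.run(main())
```

Had `blocking_read` been awaited directly in the coroutine, every other task on the loop would have frozen for the full second.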
Quick practice
- Implement one I/O task both sequentially and with asyncio.gather, then compare runtimes.
- Implement a CPU-heavy function and test it with a multiprocessing pool.
- Add timeout handling to an async operation and log timeout events.
Key takeaways
- Select concurrency models based on workload type, not trend.
- Threads and asyncio suit I/O-bound tasks; multiprocessing suits CPU-bound tasks.
- The GIL is an important design constraint for CPython thread behavior.
- Concurrency needs robust error handling and observability to be production-safe.
Next step
Continue to Web Frameworks & Application Development. In the next guide, you will compare major Python frameworks and choose the right one for your app type.