Introduction

Welcome back to Python Concurrency & Async I/O! You're now in the third lesson of this course, continuing to build your mastery of Python's asynchronous programming model. In the previous lesson, we explored the event loop and built a producer-consumer system using asyncio.create_task(), asyncio.Queue, and sentinel values for graceful shutdown. We learned how cooperative multitasking enables efficient, concurrent execution of I/O-bound operations without the overhead or complexity of threads.

Today, we're elevating our concurrency skills with asyncio.TaskGroup, a powerful construct introduced in Python 3.11 that brings structured concurrency to asyncio. While create_task() gives us fine-grained control over individual tasks, managing collections of related tasks can become complex: ensuring all tasks are awaited, handling exceptions across multiple tasks, and implementing clean cancellation require careful orchestration. TaskGroup solves these challenges by grouping related tasks together and managing their lifecycle as a unit.

Throughout this lesson, we'll enhance our producer-consumer system with TaskGroup, implement global timeouts that cancel all tasks cleanly, and handle cancellation signals with proper cleanup. We'll see how structured concurrency makes our code more robust, predictable, and maintainable. By the end, you'll understand why TaskGroup is the recommended approach for managing multiple concurrent operations and how to build systems that shut down gracefully even when time limits are exceeded. Let's begin by understanding what structured concurrency means and why it matters.

What is Structured Concurrency?

Structured concurrency is a programming paradigm in which the lifecycle of concurrent operations is tightly bound to code blocks. Just as structured programming replaced arbitrary goto statements with clear control flow (if/while/for), structured concurrency replaces fire-and-forget task creation with scoped task management. The key principle: when you enter a scope that launches concurrent operations, you cannot exit that scope until all those operations complete or are explicitly canceled.

Think about how Python's context managers work with files: when you write with open('file.txt') as f:, the file is guaranteed to close when you exit the with block, even if an exception occurs. Structured concurrency applies the same discipline to concurrent tasks. When you create tasks within a scope, you're guaranteed that the scope won't exit until those tasks finish. This eliminates common bugs like orphaned tasks that continue running after their parent has moved on, or exceptions that silently disappear because no one awaited the failed task.

Without structured concurrency, managing task lifetimes requires manual bookkeeping. You create tasks with create_task(), store them in a list, remember to await each one, and handle exceptions individually. If you forget to await a task, it becomes an orphan. If a task raises an exception after you've moved past it, the exception might go unnoticed until the program exits. These issues compound as systems grow: dozens of tasks across multiple functions, each needing careful tracking and error handling.

asyncio.TaskGroup enforces structured concurrency through a context manager. When you enter the async with asyncio.TaskGroup() block, you can create tasks that will run concurrently. When you exit the block (normally or via exception), the TaskGroup ensures all tasks have completed. If any task raises an exception, the remaining tasks are canceled, and exceptions are collected and re-raised. This automatic management eliminates manual tracking, ensures clean shutdown, and makes error handling consistent across all tasks.

Introducing asyncio.TaskGroup

Let's see how TaskGroup replaces the manual task management from our previous lesson. The core pattern uses an async context manager:

The async with asyncio.TaskGroup() as tg: line creates a TaskGroup context. The tg object has a create_task() method that schedules tasks to run within this group. We use it to launch one producer task and three consumer tasks (C1, C2, C3). Notice we pass num_consumers to the producer so it knows how many sentinels to send.

Crucially, we don't manually await these tasks. The TaskGroup handles that automatically when we exit the context. When the program reaches the end of the async with block, the TaskGroup waits for all tasks to complete before allowing execution to continue. This is structured concurrency in action: the scope defines the tasks' lifetime.

The stats dictionary tracks how many items were produced and consumed, which is useful for verifying correctness and diagnosing issues. We pass it to both producer and consumer functions so they can update these counters as they work. This shared state is safe because we're in a single-threaded async environment; concurrent tasks don't run in parallel, they interleave cooperatively.

Building the Producer with Cancellation Handling

Our producer follows the same pattern as before but adds cancellation handling. Let's examine its structure:

The producer generates n items, each with a simulated delay and a payload (the square of the id). After producing all items, it sends one SENTINEL per consumer to signal shutdown. The stats["produced"] counter tracks how many items were created, incremented after each successful q.put().

The new addition is the outer try block. When a task is canceled (because a timeout expires or an exception occurs in another task), asyncio.CancelledError is raised at the next await point. Without proper handling, this exception would propagate up, potentially leaving the system in an inconsistent state. Let's see the exception handler:

When cancellation occurs, we catch CancelledError, log the cancellation with timing, and then re-raise the exception with raise. Re-raising is critical: it allows the TaskGroup to register the cancellation and coordinate with other tasks. If we didn't re-raise, the producer would exit normally, potentially confusing the shutdown protocol. The cancellation message helps us observe when and why the producer stopped, which is essential for debugging timeout and error scenarios.

Building Consumers with Cancellation Handling

Consumers retrieve items from the queue and process them, checking for the sentinel to know when to stop. Here's the consumer structure with cancellation handling:

The outer try block wraps the entire consumer loop. Inside, await q.get() retrieves items, yielding control when the queue is empty. When an item arrives, we check if it's the sentinel; if so, we print a stop message and return, exiting the consumer cleanly.

For regular items, we simulate processing with a random sleep and print the item details. The inner try...finally ensures q.task_done() is called for every item retrieved, even if an exception occurs during processing. This maintains the queue's invariant for join() synchronization, which we'll discuss shortly.

Now let's see the cancellation handler at the consumer level:

Just like the producer, when cancellation occurs (due to timeout or TaskGroup shutdown), we log it and re-raise. This pattern is consistent across all tasks: acknowledge cancellation for observability, then propagate the signal so the TaskGroup can complete its shutdown protocol. Without re-raising, the TaskGroup might wait indefinitely for a task that thinks it's done but hasn't properly signaled completion.

Coordinating with Queue Join

Before the TaskGroup exits, we need to ensure all produced items have been consumed and processed. This is where q.join() becomes essential:

The await q.join() call blocks until the queue's internal task counter reaches zero. This counter tracks "unfinished tasks": each q.get() increments it, and each q.task_done() decrements it. When all retrieved items have been marked complete, join() returns.

Why is this necessary? Without q.join(), the TaskGroup would only wait for task completion, not work completion. The producer might finish and consumers might retrieve all items, but consumers could still be processing when the TaskGroup exits. By placing q.join() inside the TaskGroup block, we guarantee that all work items have been fully processed before allowing the TaskGroup to proceed with shutdown.

This coordination is subtle but powerful. The producer finishes generating items and sending sentinels. Consumers continue processing items and calling task_done() for each one. When the last item is marked complete, q.join() returns, and the TaskGroup can begin its shutdown sequence. At this point, only sentinels remain in the queue; consumers will retrieve them and exit cleanly.

Enforcing a Global Timeout

Real-world systems often need time limits: an API request must respond within 30 seconds, a batch job must complete within an hour. asyncio.timeout provides this capability:

The async with asyncio.timeout(deadline): wraps the entire TaskGroup. The deadline of 0.8 seconds means if execution takes longer, a TimeoutError is raised. This timeout applies to everything inside: task creation, work processing, and queue synchronization.

When the timeout expires, all tasks within the TaskGroup are canceled. The cancellation propagates to each task as asyncio.CancelledError at their next await point. Our producer and consumers catch this error, log cancellation, and re-raise, allowing the TaskGroup to coordinate the shutdown. The TaskGroup waits for all tasks to finish handling their cancellation, ensuring a clean exit.

Outside the timeout block, we catch TimeoutError:

If the timeout expires, we print a "TIMEOUT" message along with the stats, showing how many items were produced and consumed before cancellation. If execution completes within the deadline, we print "DONE" with the same stats. This dual-path reporting helps distinguish successful completion from timeout-triggered cancellation.

Understanding the Output

Let's examine the actual output to see structured concurrency and timeout cancellation in action:

The output shows rich concurrent interleaving. The producer generates items (PROD messages) while consumers process them simultaneously (C1, C2, C3 messages). At 0.023 seconds, item 1 is produced; by 0.079 seconds, C3 processes item 3 while the producer continues generating. This confirms our tasks are truly concurrent, not sequential.

The timeout triggers at 0.801 seconds (our 0.8-second deadline, plus minor overhead). At this moment, the producer has generated 29 items (PROD_CANCEL at 0.801), but consumers have only processed 28 (C1: 10, C2: 8, C3: 10). One item remains in the queue unprocessed. The CANCEL messages appear simultaneously because cancellation propagates to all tasks at the same instant when the timeout expires.

The final line, "TIMEOUT 29 28 0.801," confirms timeout-triggered shutdown. The stats show 29 items produced but only 28 consumed, demonstrating that cancellation can interrupt work midstream. This is exactly what we want: when time runs out, the system shuts down cleanly without waiting for pending work to complete. In production, this prevents runaway jobs from consuming resources indefinitely.

Comparing with Manual Task Management

It's worth contrasting this structured approach with the manual pattern from the previous lesson. Without TaskGroup, we'd write:

This works, but consider the maintenance burden. Adding a fourth consumer requires creating another task and adding another await line. If any task raises an exception, we must catch it individually or risk unnoticed failures. Implementing a global timeout requires asyncio.wait_for() with careful exception handling.

With TaskGroup, scaling is trivial: the loop for i in range(1, num_consumers + 1): automatically adjusts to any number of consumers. Exception handling is unified: if any task fails, all tasks are canceled and exceptions are collected. The timeout wrapper applies to everything automatically. This declarative style scales far better than imperative task tracking.

Moreover, TaskGroup prevents common mistakes. Forgetting to await a task is impossible; the TaskGroup waits for all tasks by design. Orphaned tasks can't exist because the TaskGroup scope defines their lifetime. Exception suppression is avoided because exceptions are always propagated. These guarantees make concurrent code more reliable and easier to reason about.

Conclusion and Next Steps

Congratulations on completing the third lesson of Python Concurrency & Async I/O! Today, we explored structured concurrency with asyncio.TaskGroup, a powerful tool for managing collections of related asynchronous tasks. You learned how TaskGroup enforces scoped task lifetimes, ensuring all tasks complete or are explicitly canceled before the scope exits. This discipline eliminates orphaned tasks, ensures consistent exception handling, and simplifies scaling from a few tasks to hundreds.

We enhanced our producer-consumer system with TaskGroup, implemented global timeouts with asyncio.timeout, and added proper cancellation handling with try...except asyncio.CancelledError. You saw how q.join() coordinates work completion, guaranteeing all items are processed before shutdown. The output analysis revealed how tasks interleave naturally and how timeout cancellation stops work midstream, a critical capability for building responsive, resource-conscious systems.

The patterns you practiced today are essential for production asyncio applications. TaskGroup is now the recommended approach for launching multiple tasks; its automatic lifecycle management prevents entire classes of bugs. Timeout enforcement with asyncio.timeout protects against runaway operations. Proper cancellation handling ensures clean shutdown even under error conditions. These techniques apply to web scrapers, API clients, data pipelines, and any system managing concurrent I/O operations.

Up next, you'll dive into hands-on practice where you'll apply these structured concurrency patterns yourself. You'll launch task groups, coordinate shutdown with sentinels and queue synchronization, enforce deadlines, and debug systems that hang due to missing coordination. By building these systems from scratch, you'll internalize the principles of structured concurrency and develop the confidence to manage complex concurrent workflows in your own projects!

Sign up
Join the 1M+ learners on CodeSignal
Be a part of our community of 1M+ users who develop and demonstrate their skills on CodeSignal