Introduction

Welcome back to Building an Async CLI Tool for ETL Pipelines in Python! In the previous three lessons, we constructed a self-validating domain model with frozen dataclasses and descriptors, built streaming parsers that normalize CSV and JSON Lines data into validated transactions, and implemented a declarative router using structural pattern matching to dispatch transactions to appropriate business handlers.

Today's focus is Async Pipeline with Backpressure: transforming our sequential processor into a concurrent, resilient system that can handle multiple data streams efficiently while maintaining control over resource usage. We'll construct an asynchronous engine that combines producers, consumers, and a bounded queue, leveraging asyncio.TaskGroup for structured concurrency. The key insight is that a small queue naturally creates backpressure, preventing producers from overwhelming consumers and keeping memory usage predictable.

By the end of this lesson, we'll have implemented a complete async pipeline that streams parsed records through concurrent workers, collects both successful results and validation errors, and coordinates graceful shutdown across all tasks. The architecture scales naturally; you can adjust the number of workers and queue size to match your workload characteristics without restructuring the code. Let's begin by understanding why backpressure matters in async pipelines.

The Producer-Consumer Pattern with Backpressure

Asynchronous pipelines excel at maximizing throughput when I/O operations dominate the workload. Instead of waiting for each file read or network request to complete before starting the next, we can initiate many operations concurrently and process results as they arrive. However, this concurrency introduces a new challenge: what happens when producers generate data faster than consumers can process it?

Without backpressure, the producer would flood memory with unprocessed items, eventually exhausting available resources and crashing the application. The solution is a bounded queue: a queue with a maximum size that blocks producers when full. When the queue reaches capacity, the producer's await queue.put(item) suspends until a consumer removes an item, freeing space. This creates natural backpressure that adapts to the consumer's processing speed without explicit rate limiting or complex coordination logic.

The pattern works because asyncio's cooperative multitasking ensures fairness: when the producer blocks on a full queue, other tasks like consumers continue running, processing items and emptying the queue. Once space becomes available, the producer resumes automatically. This self-regulating behavior makes the system resilient; if consumers slow down due to downstream latency, the producer automatically adjusts its pace.
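To make the self-regulating behavior concrete, here is a minimal sketch rather than the lesson's pipeline code. The names producer, slow_consumer, and demo are illustrative assumptions, and asyncio.sleep stands in for slow downstream work. The producer records the queue depth after every put so we can confirm it never runs ahead of the bound.

```python
import asyncio


async def producer(q: asyncio.Queue, n: int, depths: list[int]) -> None:
    """Put n integers on the queue, recording the queue depth after each put."""
    for i in range(n):
        await q.put(i)              # suspends here whenever the queue is full
        depths.append(q.qsize())


async def slow_consumer(q: asyncio.Queue, n: int, seen: list[int]) -> None:
    """Take n items off the queue, simulating slow downstream work."""
    for _ in range(n):
        item = await q.get()        # removing an item frees a slot for the producer
        await asyncio.sleep(0.001)  # stand-in for slow processing
        seen.append(item)


async def demo(maxsize: int = 2, n: int = 10) -> tuple[list[int], list[int]]:
    """Run one producer against one slow consumer over a bounded queue."""
    q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    depths: list[int] = []
    seen: list[int] = []
    await asyncio.gather(producer(q, n, depths), slow_consumer(q, n, seen))
    return depths, seen
```

Calling asyncio.run(demo()) returns the recorded depths and the consumed items. The depths never exceed maxsize, however fast the producer could otherwise run.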

The Sentinel Pattern for Graceful Shutdown

Coordinating shutdown in a producer-consumer system requires careful thought. The producer knows when it has finished reading input, but consumers are sitting in infinite loops waiting for the next item. How do we signal consumers to exit gracefully without abruptly canceling tasks and losing in-flight work?

The sentinel pattern solves this elegantly: the producer sends a special marker value after all real data has been queued. When a consumer receives this sentinel, it breaks its loop and terminates. The sentinel must be a unique object that can't be confused with actual data; we'll use SENTINEL = object(), which creates a one-of-a-kind instance that we compare by identity rather than by value.

For multiple consumers, the producer must send one sentinel per consumer. If we have three workers and only send one sentinel, the first consumer to retrieve it will exit, but the other two will wait forever. By sending three sentinels, we ensure every consumer receives the shutdown signal. This approach is more explicit than cancellation and allows consumers to finish processing their current item cleanly before exiting.
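The one-sentinel-per-consumer rule can be checked with a small experiment. This is a sketch under stated assumptions: worker and all_workers_exit are hypothetical names, the queue is unbounded only to keep the demo short, and a timeout stands in for "waiting forever".

```python
import asyncio

SENTINEL = object()  # unique marker, always compared with `is`


async def worker(q: asyncio.Queue) -> None:
    """Consume items until the shutdown marker arrives."""
    while True:
        item = await q.get()
        if item is SENTINEL:
            return


async def all_workers_exit(workers: int, sentinels: int, timeout: float = 0.2) -> bool:
    """Return True if every worker finished before the timeout expired."""
    q: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait never fails
    tasks = [asyncio.create_task(worker(q)) for _ in range(workers)]
    for item in range(5):
        q.put_nowait(item)
    for _ in range(sentinels):
        q.put_nowait(SENTINEL)
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:                # clean up workers that never got a sentinel
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return not pending
```

With three workers and three sentinels, asyncio.run(all_workers_exit(3, 3)) reports that everyone exited. With three workers and a single sentinel, two workers are still waiting on q.get() when the timeout fires.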

Implementing the Consumer Coroutine

The consumer coroutine embodies the worker pattern: an infinite loop that retrieves items from the queue, processes them, and signals completion.

The while True loop runs indefinitely until explicitly terminated. The await q.get() suspends the coroutine until an item becomes available in the queue, yielding control to other tasks while waiting. This cooperative suspension is what makes asyncio efficient; blocked tasks consume no CPU time.

The sentinel check uses identity comparison (is SENTINEL) rather than equality, ensuring we only match the exact sentinel object we created. When detected, we immediately return, exiting the loop and terminating the consumer task. For valid transactions, we call route_transaction and append the result to the shared results list. Since asyncio uses a single-threaded event loop, this append is safe without locking; only one coroutine executes at a time.

The finally block ensures q.task_done() is called regardless of whether processing succeeded or an exception occurred. This method decrements the queue's internal counter of unfinished tasks, which the join() method monitors to determine when all work is complete. Forgetting task_done() would cause join() to hang indefinitely, waiting for tasks that already finished.
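Putting the three paragraphs above together, a consumer along these lines would match the description. Treat it as a sketch: the route_transaction shown here is a hypothetical stand-in for the router from the previous lesson, and demo is an illustrative driver, not part of the lesson's code.

```python
import asyncio
from typing import Any

SENTINEL = object()


def route_transaction(tx: Any) -> dict:
    """Hypothetical stand-in for the router built in the previous lesson."""
    return {"ok": True, "entry": tx}


async def consumer(q: asyncio.Queue, results: list[dict]) -> None:
    while True:
        item = await q.get()          # suspends until an item is available
        try:
            if item is SENTINEL:      # identity check: this is the shutdown signal
                return
            results.append(route_transaction(item))
        finally:
            q.task_done()             # runs for data items and the sentinel alike


async def demo() -> list[dict]:
    """Drive one consumer by hand: two items, then the sentinel."""
    q: asyncio.Queue = asyncio.Queue(maxsize=3)
    results: list[dict] = []
    task = asyncio.create_task(consumer(q, results))
    for tx in ("a", "b"):
        await q.put(tx)
    await q.put(SENTINEL)
    await q.join()                    # returns once all three items are marked done
    await task
    return results
```

Because the return sits inside the try block, the finally clause still calls task_done() for the sentinel, so q.join() accounts for every item that was put.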

Implementing the Producer Coroutine

The producer coroutine reads input data, validates it, and queues valid transactions while collecting errors.

The producer iterates over parsed records from the records_to_transactions generator. This generator yields tuples of (bool, value), where the boolean indicates success and the value is either a valid Transaction or an error string. This design keeps the producer focused on coordination rather than validation logic.

For successful transactions, await q.put(val) adds the transaction to the queue. If the queue is full (reached maxsize), this await suspends the producer, implementing backpressure automatically. The producer resumes only when a consumer processes an item, freeing space. For errors, we append the error string to the shared errors list for later output.

After processing all records, the producer sends sentinels to signal shutdown. The range(consumers) loop ensures we send exactly one sentinel per consumer, guaranteeing all workers receive the shutdown signal. Each await q.put(SENTINEL) may also suspend if the queue is full. Because the queue is first-in, first-out, every sentinel sits behind the remaining data items, so no worker shuts down while real work is still waiting ahead of it.
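A producer matching this description could look like the sketch below. The records argument is any iterable of (bool, value) pairs, as records_to_transactions is described to yield; the sample records and the error text in demo are made up for illustration.

```python
import asyncio
from typing import Any, Iterable

SENTINEL = object()


async def producer(
    records: Iterable[tuple[bool, Any]],
    q: asyncio.Queue,
    errors: list[str],
    consumers: int,
) -> None:
    for ok, val in records:
        if ok:
            await q.put(val)       # suspends while the queue is full (backpressure)
        else:
            errors.append(val)     # val is an error string in this case
    for _ in range(consumers):     # one sentinel per consumer
        await q.put(SENTINEL)


async def demo() -> tuple[list[Any], list[str]]:
    """Run the producer alone and inspect what it queued."""
    q: asyncio.Queue = asyncio.Queue()  # unbounded only so the demo needs no consumer
    errors: list[str] = []
    records = [(True, "tx-1"), (False, "ERR 2: bad amount"), (True, "tx-3")]
    await producer(records, q, errors, consumers=2)
    queued = [q.get_nowait() for _ in range(q.qsize())]
    return queued, errors
```

The queue ends up holding the two valid items followed by two sentinels, while the single error lands in the errors list and never enters the queue.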

Orchestrating with TaskGroup

The run_pipeline function orchestrates the entire async pipeline, managing the queue, tasks, and file handling.

The function accepts a file path, format kind, and tuning parameters for workers and queue size. We validate that at least one worker exists, create a bounded queue with the specified maxsize, and initialize empty result and error lists that will be shared across all tasks.

The file handling logic opens the appropriate file handle and selects the matching parser function. Note that we open the file synchronously here; the async pipeline consumes the parsed records, but the parsing itself remains synchronous because file I/O in Python's standard library isn't truly async. The async benefits come from concurrent processing of multiple transactions, not from async file reads.

The async with asyncio.TaskGroup() context manager provides structured concurrency: all tasks created within the block are automatically awaited when the block exits, and if any task raises an exception, all other tasks are canceled. This guarantees cleanup and prevents orphaned tasks.

We create the producer task first, passing it the parsed records generator, the queue, the errors list, and the consumer count. Then we create multiple consumer tasks in a loop, each sharing the same queue and results list. The producer and consumers run concurrently; create_task schedules each task on the event loop, and the tasks start running as soon as the current coroutine yields control, which happens at the latest when the async with block exits and waits for them.

Bridging Sync and Async with the CLI Entry Point

The main script serves as the synchronous entry point that bridges the async pipeline.

The helper function infers the file format from its extension, defaulting to CSV for unknown formats. This keeps the main logic clean while providing reasonable fallback behavior.
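A helper of this kind might look like the sketch below. The function name infer_kind, the recognized extensions, and the returned labels "jsonl" and "csv" are assumptions for illustration; only the default-to-CSV behavior comes from the description above.

```python
from pathlib import Path


def infer_kind(path: str) -> str:
    """Guess the input format from the file extension, defaulting to CSV."""
    suffix = Path(path).suffix.lower()
    if suffix in {".jsonl", ".ndjson"}:  # assumed JSON Lines extensions
        return "jsonl"
    return "csv"                         # .csv and anything unrecognized
```

For example, infer_kind("data/transactions.jsonl") selects the JSON Lines parser, while a path ending in .txt falls back to CSV.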

The script reads the file path from the DATA_PATH environment variable, validates that the path exists, and exits with code 2 if the configuration is invalid. After inferring the format, we invoke the async pipeline using asyncio.run(), which creates a new event loop, runs the coroutine to completion, and cleans up the loop automatically.

The call to asyncio.run(run_pipeline(...)) is where the synchronous and asynchronous worlds meet. From the perspective of the synchronous __main__ block, this is a regular blocking function call that returns results when complete. Inside asyncio.run(), however, the entire async pipeline executes with concurrent tasks, queues, and cooperative scheduling. This pattern is standard for CLI tools: a thin synchronous wrapper that launches async processing and formats output.

After the pipeline completes, we output results as compact JSON and errors as plain text. The separators=(",", ":") argument removes whitespace for smaller output, while sort_keys=True ensures consistent field ordering across runs. This output format is both human-readable and machine-parseable, making it suitable for logging or piping to downstream tools.
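The entry-point flow can be sketched as below. The run_pipeline here is a hypothetical stand-in that returns canned data so the example runs by itself, and format_output and main are illustrative names. The DATA_PATH lookup, the exit code 2, asyncio.run, and the json.dumps arguments follow the description above.

```python
import asyncio
import json
import os
import sys


async def run_pipeline(path: str, kind: str) -> tuple[list[dict], list[str]]:
    """Hypothetical stand-in for the real pipeline; returns canned data."""
    return [{"ok": True, "entry": {"id": 1, "sign": 1}}], ["ERR 2: example error"]


def format_output(results: list[dict], errors: list[str]) -> list[str]:
    """Render results as compact, key-sorted JSON lines followed by raw error lines."""
    lines = [json.dumps(r, separators=(",", ":"), sort_keys=True) for r in results]
    lines.extend(errors)
    return lines


def main() -> int:
    path = os.environ.get("DATA_PATH", "")
    if not path or not os.path.exists(path):
        print("DATA_PATH must point to an existing file", file=sys.stderr)
        return 2                                   # configuration error
    # The sync/async boundary: blocks here until the whole pipeline has finished.
    results, errors = asyncio.run(run_pipeline(path, "csv"))
    print("\n".join(format_output(results, errors)))
    return 0

# A script would end with: raise SystemExit(main())
```

Returning the exit code from main rather than calling sys.exit inside it keeps the function easy to call from tests.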

Running the Pipeline

When we execute the script with a sample data file containing various transaction types and deliberate errors, the async pipeline processes everything concurrently and prints the routed results followed by the collected errors.

The output demonstrates the pipeline's end-to-end functionality. Valid transactions appear as JSON objects with the ok: true flag and complete entry details, while errors appear as plain text lines prefixed with "ERR" and the record ID. This mixed format is intentional; it makes successful processing machine-readable while keeping errors human-friendly for quick diagnosis.

Analyzing the Output

The output reveals how the async pipeline processes records while maintaining correctness. Notice that all successful transactions appear before error messages, even though errors occurred at various positions in the input file (records 9, 10, 11, and 12 between valid records 8 and 13). This ordering reflects our pipeline architecture: valid transactions flow through the queue to consumers, getting routed and appended to results, while errors are collected separately by the producer and appended to errors. We output each list sequentially after processing completes.

The high-value flags appear correctly on transactions 1, 4, 6, 8, and 13, all with unit amounts of 10.00 or greater. Transaction 1 shows a quantity of 3 with a unit price of 10.50 USD, correctly flagged because the routing logic examines the unit amount, not the total. Transaction 8 demonstrates this with a unit price of 100.00 USD and quantity 5, resulting in a substantial total of 500.00 USD, but the flag is triggered by the unit amount exceeding the threshold.

Refund transactions (2, 5, 7) all show negative signs (sign: -1), distinguishing them from additions. The pipeline correctly handles multiple currencies; EUR refunds appear alongside USD additions without currency conversion or validation conflicts. This demonstrates that the pipeline focuses on structural correctness and business logic, leaving domain-specific rules like currency consistency to downstream systems.

The error messages reveal the validation layers working together. Error 9 indicates an amount validation failure caught by the Money or Transaction dataclass validation. Error 10 shows an unsupported operation type caught during normalization. Error 11 reveals an empty account name, triggering the account validation in the Transaction constructor. Error 12 shows a decimal parsing exception, indicating the amount field contained non-numeric data that couldn't be converted.

Transaction 13, appearing after multiple errors, confirms the pipeline's resilience. Despite encountering invalid records, processing continued through the entire file, collecting valid transactions and errors independently. This behavior is crucial for ETL systems; a single bad record shouldn't halt processing of thousands of valid records.

Backpressure in Action

While the output doesn't visibly show backpressure, the mechanism is constantly at work during execution. With maxsize=3, the queue holds at most three transactions at once. When all three consumers are busy processing, the queue fills, and the producer's next await q.put() suspends until a consumer calls q.get(), which removes an item and frees a slot.

This small queue size creates rapid backpressure feedback. If we increased maxsize to 1000, the producer would queue hundreds of transactions before consumers could catch up, consuming more memory but reducing coordination overhead. The optimal queue size depends on your workload: smaller queues provide tighter backpressure and lower memory usage at the cost of more frequent producer blocking, while larger queues allow producers to run further ahead, smoothing out processing rate variations but using more memory.

The three-worker configuration balances concurrency and resource usage. Each consumer handles one transaction at a time, so three workers can have three transactions in flight. If routing and processing were I/O-bound (such as making API calls or database writes), more workers would increase throughput. For CPU-bound routing like ours, extra workers add little: route_transaction runs synchronously on the single-threaded event loop, so only one transaction is actually being routed at any moment. Three workers keep the structure ready for I/O-bound handlers while keeping the code simple.
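The trade-offs in this section can be explored with a small measurement harness. This is a sketch, not a benchmark: io_worker uses asyncio.sleep as an assumed stand-in for an API call or database write, and measure is a hypothetical helper that reports elapsed time and the peak queue depth the producer observed.

```python
import asyncio
import time

SENTINEL = object()


async def producer(q: asyncio.Queue, n: int, workers: int, peak: list[int]) -> None:
    for i in range(n):
        await q.put(i)
        peak[0] = max(peak[0], q.qsize())  # how far ahead the producer got
    for _ in range(workers):
        await q.put(SENTINEL)


async def io_worker(q: asyncio.Queue, delay: float) -> None:
    while True:
        item = await q.get()
        if item is SENTINEL:
            return
        await asyncio.sleep(delay)         # stand-in for I/O-bound handling


async def measure(workers: int, maxsize: int, n: int = 12, delay: float = 0.02):
    """Return (elapsed seconds, peak queue depth) for one configuration."""
    q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = [0]
    start = time.perf_counter()
    await asyncio.gather(
        producer(q, n, workers, peak),
        *(io_worker(q, delay) for _ in range(workers)),
    )
    return time.perf_counter() - start, peak[0]
```

Comparing asyncio.run(measure(1, 3)) with asyncio.run(measure(3, 3)) shows the I/O-bound case finishing sooner with more workers, while raising maxsize to 100 lets the peak depth climb well past three because the producer is no longer held back.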

Coordination Guarantees

The combination of task_done(), join(), and sentinels provides strong coordination guarantees. The producer doesn't exit until it has queued all transactions and all sentinels. Consumers don't exit until they've processed everything, including their sentinel. The await q.join() doesn't return until every item that was put has had task_done() called, ensuring no in-flight work is lost.
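The link between task_done() and join() is easy to verify in isolation. In this sketch, drain and join_completes are hypothetical helpers, and a short timeout stands in for join() hanging indefinitely.

```python
import asyncio


def drain(q: asyncio.Queue, call_task_done: bool) -> None:
    """Remove every item, optionally marking each one as finished."""
    while not q.empty():
        q.get_nowait()
        if call_task_done:
            q.task_done()


async def join_completes(call_task_done: bool, timeout: float = 0.1) -> bool:
    """Return True if q.join() finishes before the timeout."""
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    drain(q, call_task_done)
    try:
        await asyncio.wait_for(q.join(), timeout)
        return True
    except asyncio.TimeoutError:
        return False  # the queue is empty, but its unfinished-task counter is not zero
```

When task_done() is skipped, the queue is empty yet join() still waits, which is why the consumer calls it in a finally block.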

The TaskGroup context manager adds another layer of safety. When the async with block exits, it waits for all tasks to complete naturally. If a consumer raised an exception during processing, the TaskGroup would cancel all other tasks and raise an ExceptionGroup containing that exception, preventing silent failures. This structured concurrency approach eliminates common pitfalls like forgetting to await tasks or losing exceptions in background tasks.

Conclusion and Next Steps

Excellent work! You've built a complete async ETL pipeline with backpressure, structured concurrency, and graceful shutdown coordination. The architecture demonstrates production-ready patterns: bounded queues naturally regulate flow, sentinels enable clean shutdown, TaskGroups ensure proper task lifecycle management, and the separation between the sync entry point and async core logic keeps the codebase maintainable.

The patterns you've learned scale beyond this CLI tool. Bounded queues with backpressure apply to any producer-consumer system, from web scrapers to message processors. The sentinel pattern works for any scenario where workers need coordinated shutdown signals. TaskGroups simplify error handling in concurrent systems by automatically canceling related tasks when any task fails. The asyncio.run() bridge pattern enables incremental async adoption in existing codebases; you can introduce async processing in hot paths while keeping the surrounding infrastructure synchronous.

This completes the core curriculum for building async ETL pipelines. You've progressed from self-validating domain models through streaming parsers, declarative routers, and finally to concurrent async processing with backpressure. Each lesson built on the previous, adding layers of capability while maintaining clean separation of concerns. The result is a robust, testable, and extensible foundation for data processing tools.

Now it's time to put these concepts into practice! The upcoming exercises will challenge you to implement each component step by step, building the async pipeline from the ground up. You'll start by completing the consumer coroutine with proper task coordination, then implement the producer with sentinel handling, orchestrate everything with TaskGroups, and finally bridge the sync-async boundary in the CLI entry point. These hands-on exercises will solidify your understanding and give you the confidence to build production-grade async systems in your own projects!
