Welcome back to Building an Async CLI Tool for ETL Pipelines in Python! You've reached lesson 2, and you're making excellent progress. In the previous lesson, you built a robust, self-validating domain model using frozen dataclasses, the Range descriptor, and the Money value type. That validation layer ensures that only well-formed transactions enter our pipeline. Now, we'll extend that foundation by building the parsers and normalization routines that feed validated data into our system.
Today's focus is Parsing & Normalization: the critical bridge between raw file formats and our strongly-typed domain model. We'll implement streaming parsers for CSV and JSON Lines formats, construct a normalization pipeline that safely converts raw dictionaries into validated Transaction objects, and build a main script that orchestrates the entire flow. The key insight is that our parsers will report errors without crashing, yielding a stream of results where each item is either a valid transaction or a clear error message. This approach maintains pipeline resilience; a few bad rows don't stop the processing of the entire file.
By the end of this lesson, you'll have a complete ingestion system that reads files, parses them according to their format, normalizes field names and types, validates business rules, and produces a clean output stream. Let's begin by understanding why streaming matters in data processing.
When processing data files, we face a fundamental choice: load everything into memory at once or process it incrementally, one record at a time. For small files, loading everything is convenient; for real-world ETL systems handling millions of records, or files larger than available memory, it becomes impractical or outright impossible.
Streaming parsers solve this problem by yielding records one at a time using generator functions. Instead of returning a list of all records, a streaming parser uses yield to produce each record as it's parsed, maintaining a minimal memory footprint regardless of file size. The downstream code processes each record immediately, and the parser moves to the next one. This pattern enables ETL pipelines to handle files larger than available RAM and to start producing output immediately rather than waiting for the entire file to load.
Python's generator protocol is a natural fit for streaming. When you call a function containing yield, none of its body runs yet; the call returns a generator object that you can iterate over. Each iteration executes the function until it reaches the next yield, which produces a value and pauses execution. The function's local state is preserved, so the next iteration resumes exactly where it left off. This lazy evaluation means we only parse what we need, when we need it, and it creates natural backpressure: the parser does no work until the consumer asks for the next record, so a slow consumer automatically slows the parser.
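To make the pause-and-resume behavior concrete, here is a small illustrative generator. The name numbered_lines and the events log are made up for this sketch and are not part of the lesson's code; the log simply records the order in which producer and consumer run.

```python
from typing import Iterable, Iterator

events: list[str] = []  # illustrative log of what ran, and in which order


def numbered_lines(lines: Iterable[str]) -> Iterator[tuple[int, str]]:
    """Yield (line_number, text) pairs one at a time."""
    for number, text in enumerate(lines, start=1):
        events.append(f"produce {number}")
        yield number, text  # execution pauses here until the next request


gen = numbered_lines(["a", "b", "c"])
assert events == []  # calling the function has not run any of its body yet

for number, _text in gen:
    events.append(f"consume {number}")
    if number == 2:
        break  # the consumer stops early, so line 3 is never produced

print(events)
```

The log alternates between producing and consuming, and nothing is produced for the third line because the consumer never asked for it.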
CSV (comma-separated values) is ubiquitous in data pipelines. Python's csv module provides DictReader, which converts each row into a dictionary mapping header names to values. We'll wrap this in a generator that normalizes whitespace:
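A minimal sketch of such a wrapper, assuming the function name parse_csv that this lesson uses later. The in-memory sample data is hypothetical and only there to show the whitespace cleanup.

```python
import csv
import io
from typing import Any, Iterator, TextIO


def parse_csv(fh: TextIO) -> Iterator[dict[str, Any]]:
    """Yield one dictionary per CSV row, with string values stripped."""
    reader = csv.DictReader(fh)
    for row in reader:
        # Strip strings; leave anything else (e.g. None for a short row) as-is.
        yield {k: (v.strip() if isinstance(v, str) else v) for k, v in row.items()}


# Hypothetical sample data with stray padding around some values.
sample = io.StringIO("id,account,amount\n1, Alice ,10.50 \n2,Bob,3.00\n")
rows = list(parse_csv(sample))
print(rows[0])
```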
The function accepts a file handle (fh), creates a DictReader from it, and iterates through rows. For each row, we build a new dictionary using a comprehension that strips whitespace from string values while leaving non-string values unchanged (for example, the None that DictReader fills in when a row has fewer fields than the header). This normalization is crucial; CSV files often contain trailing spaces or inconsistent padding that would cause validation failures downstream. By cleaning values at parse time, we ensure the domain model receives consistent input. The yield statement makes this a generator; calling parse_csv(fh) returns an iterator that produces one row dictionary per iteration, so memory usage depends on the size of a single row rather than the size of the file.
JSON Lines (also called newline-delimited JSON or .jsonl) is a format where each line is a complete, valid JSON object. This format is ideal for streaming: you can process one line at a time without parsing the entire file structure. It's commonly used for logs, data exports, and streaming APIs:
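A minimal sketch of a line-by-line parser, assuming the function name parse_jsonl that this lesson uses later. The sample input is hypothetical and includes a blank line to show that it is skipped.

```python
import io
import json
from typing import Any, Iterator, TextIO


def parse_jsonl(fh: TextIO) -> Iterator[dict[str, Any]]:
    """Yield one parsed object per non-empty line."""
    for line in fh:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between or after records
        # Assumes each line holds a JSON object; json.loads raises on bad input.
        yield json.loads(line)


# Hypothetical sample: two records separated by a blank line.
sample = io.StringIO('{"id": 1, "amount": "10.50"}\n\n{"id": 2, "amount": "3.00"}\n')
records = list(parse_jsonl(sample))
print(records)
```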
The parser iterates through lines in the file handle, strips whitespace from each line, skips empty lines (which might appear at the end of files or between batches), and parses the remaining lines using json.loads(). Each parsed line becomes a dictionary that we yield to the caller. This simplicity is JSON Lines' strength: no complex state tracking, no nested structure parsing, just one object per line. If a line contains invalid JSON, json.loads() raises json.JSONDecodeError (a subclass of ValueError); the parser deliberately does not catch it and leaves error reporting to the consuming code. Keep in mind that the exception surfaces when the consumer asks the generator for its next record, and that a generator which has raised cannot be resumed, so a try...except around per-record processing alone will not intercept it.
Parsing produces raw dictionaries, but our domain model expects specific types and field names. The normalization layer bridges this gap by extracting fields, coercing types, and creating validated Transaction objects. This is where the real complexity lies; file formats vary, field names differ (qty versus quantity versus qty_in), and data quality issues emerge.
The challenge is handling errors gracefully. If a single record has an invalid amount or a missing required field, we want to report that error and continue processing the rest of the file. This requires wrapping each record's processing in a try...except block and yielding either a successful transaction or an error message. The caller receives a stream of results where each item is a tuple: (True, transaction) for success or (False, error_string) for failure. Every item has the same shape, so the consumer can branch on the boolean flag without needing its own exception handling.
Let's implement the normalization function that converts raw dictionaries into validated transactions:
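A sketch of what such a function could look like. The Money and Transaction classes below are simplified stand-ins for the previous lesson's domain model, and the record keys ("id", "op", "account", "amount", "currency") and attribute names are assumptions made for illustration; only the function name records_to_transactions and the (bool, result) protocol come from this lesson.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Iterable, Iterator, Union


@dataclass(frozen=True)
class Money:  # stand-in for the previous lesson's Money type
    amount: Decimal
    currency: str

    def __post_init__(self) -> None:
        # Frozen dataclasses need object.__setattr__ to coerce a field.
        object.__setattr__(self, "amount", Decimal(str(self.amount)))
        if self.amount <= 0:
            raise ValueError("amount must be > 0.00")


@dataclass(frozen=True)
class Transaction:  # stand-in for the previous lesson's Transaction type
    id: int
    op: str
    account: str
    money: Money
    qty: int

    def __post_init__(self) -> None:
        if not self.account:
            raise ValueError("account must not be empty")


Result = tuple[bool, Union[Transaction, str]]


def records_to_transactions(rows: Iterable[dict[str, Any]]) -> Iterator[Result]:
    """Yield (True, Transaction) or (False, error message) for each record."""
    for rec in rows:
        try:
            raw_qty = rec.get("qty") or rec.get("quantity") or rec.get("qty_in") or 1
            tx = Transaction(
                id=int(rec["id"]),
                op=str(rec["op"]).strip(),
                account=str(rec["account"]).strip(),
                money=Money(rec["amount"], rec["currency"]),
                qty=int(raw_qty),
            )
        except Exception as exc:  # coercion, validation, or missing-key errors
            rid = rec.get("id", "?") if isinstance(rec, dict) else "?"
            yield False, f"ERR {rid} {exc}"
        else:
            yield True, tx


# Hypothetical input: one good record, one with a non-positive amount.
results = list(records_to_transactions([
    {"id": "1", "op": "add", "account": " Alice ", "amount": "10.50",
     "currency": "USD", "qty": "3"},
    {"id": "9", "op": "add", "account": "Bob", "amount": "0", "currency": "USD"},
]))
for ok, item in results:
    print(ok, item)
```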
The function accepts any iterable of dictionaries (from either parser) and yields tuples of (bool, result). Inside the try block, we extract and coerce each field: the ID is cast to int, operation and account are converted to strings and stripped, amount and currency are passed through (validation happens in Money), and quantity is extracted with a fallback chain that checks multiple possible field names before defaulting to 1. This flexibility accommodates different CSV headers or JSON field naming conventions.
The quantity extraction demonstrates a common normalization pattern:
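The fallback chain can be isolated into a small helper to see how it behaves on different inputs. The helper name extract_quantity is hypothetical; the field names qty, quantity, and qty_in are the ones this lesson mentions.

```python
from typing import Any, Mapping


def extract_quantity(rec: Mapping[str, Any]) -> int:
    """Return the first truthy quantity field as an int, defaulting to 1."""
    raw = rec.get("qty") or rec.get("quantity") or rec.get("qty_in") or 1
    return int(raw)


assert extract_quantity({"qty": "3"}) == 3
assert extract_quantity({"quantity": 2}) == 2
assert extract_quantity({"qty": "", "qty_in": "4"}) == 4  # empty string is skipped
assert extract_quantity({}) == 1                           # nothing present: default
assert extract_quantity({"qty": 0}) == 1     # integer 0 is falsy, so it falls through
assert extract_quantity({"qty": "0"}) == 0   # the string "0" is truthy and is kept
```

The last two cases show a subtlety of the or chain: a JSON record with a numeric 0 is treated as missing, while a CSV record, whose values are always strings, keeps "0". If zero must be distinguished from absent, an explicit "is not None" check would be needed instead.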
We check multiple possible field names using short-circuit evaluation with or. If rec.get("qty") returns a truthy value, we use it; otherwise, we try quantity, then qty_in, finally defaulting to 1 if none of them yields one. Note that falsy values such as None, an empty string, or the integer 0 fall through to the next name, whereas the string "0" (as a CSV parser would produce) is truthy and is kept; a quantity of zero is unlikely, but it is worth knowing that the two formats treat it differently. This handles files where the quantity column might be named differently or omitted entirely for single-item transactions. After extracting the raw value, we cast it to int, which will raise an exception if the value isn't numeric.
The Money and Transaction constructors perform all validation; we simply pass the extracted values through. If any validation fails (invalid currency code, negative amount, empty account), the exception is caught in the except block. This separation of concerns keeps the normalization logic clean: extract and coerce data, delegate validation to the domain model.
When processing fails, we need informative error messages:
The except block catches any exception during processing (type coercion errors, validation errors, missing fields) and constructs an error message. We extract the record ID if available, defaulting to "?" if it's missing or couldn't be parsed, and format a message that includes both the ID and the exception text. This provides context for debugging; seeing "ERR 9 amount must be > 0.00" immediately tells us which record failed and why. We yield False with the error string, maintaining the same iterator protocol as successful records but signaling failure with the boolean flag.
Now, we build the main script that ties everything together. The first step is reading configuration and validating the input file:
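A sketch of this step, wrapped in a function so it can be exercised without touching the real environment. The variable name INPUT_FILE, the helper name resolve_input_path, and the wording of the error messages are all assumptions; the lesson does not show the actual names.

```python
import os
import sys
from pathlib import Path
from typing import Mapping

ENV_VAR = "INPUT_FILE"  # hypothetical name; use whatever your deployment defines


def resolve_input_path(env: Mapping[str, str] = os.environ) -> Path:
    """Return the configured input path, or exit with status 2 if unusable."""
    raw = env.get(ENV_VAR)
    if not raw:
        print(f"error: {ENV_VAR} is not set", file=sys.stderr)
        raise SystemExit(2)
    path = Path(raw)
    if not path.exists():
        print(f"error: input file not found: {path}", file=sys.stderr)
        raise SystemExit(2)
    return path
```

A main script would call resolve_input_path() once at startup. Passing the environment in as a parameter is a design choice made here so the function can be tested with a plain dictionary.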
We read the file path from an environment variable for flexibility (production systems often inject configuration this way), convert it to a Path object for better path manipulation, and validate that it exists. If the environment variable is unset or the file doesn't exist, we print an error and exit with status code 2 (a common convention for usage and configuration errors; argparse uses the same code). This fail-fast approach prevents cryptic errors later; if the input file is missing, we should report that immediately rather than attempting to parse nothing.
Different file formats require different parsers. We infer the format from the file extension:
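A minimal sketch of such a helper. The function name infer_format and the returned labels "jsonl" and "csv" are assumptions chosen for illustration.

```python
from pathlib import Path


def infer_format(path: Path) -> str:
    """Guess the input format from the file extension, defaulting to CSV."""
    suffix = path.suffix.lower()  # only the final extension, e.g. ".jsonl"
    if suffix in {".jsonl", ".ndjson"}:
        return "jsonl"
    return "csv"


assert infer_format(Path("export.JSONL")) == "jsonl"
assert infer_format(Path("events.ndjson")) == "jsonl"
assert infer_format(Path("data.csv")) == "csv"
assert infer_format(Path("notes.txt")) == "csv"  # unknown extensions fall back to CSV
```

Because path.suffix returns only the last extension, a compressed file such as data.jsonl.gz would fall back to CSV under this heuristic.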
The helper function extracts the file extension using path.suffix, converts it to lowercase for case-insensitive matching, and checks against known formats. JSON Lines files commonly use either .jsonl or .ndjson extensions, so we check both. If the extension doesn't match any known format, we default to CSV as a reasonable fallback. This simple heuristic avoids requiring explicit format configuration while remaining extensible; adding support for new formats means adding another condition.
With the file type determined, we can open and process the file:
For CSV files, we open with UTF-8 encoding and newline="" (which the csv module documentation calls for so that newlines embedded in quoted fields are interpreted correctly). We create a parser that yields raw dictionaries, pipe those through the normalization function, and iterate through the results. Each result is a tuple of (bool, result); if the boolean is True, we have a valid transaction and format it as compact JSON using json.dumps() with minimal separators. If False, the result is an error string we print directly. This pattern demonstrates the power of generator composition: parse_csv(fh) produces raw records, records_to_transactions(raw) validates and normalizes them, and the loop consumes the stream, formatting output appropriately.
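The consuming loop described above can be sketched on its own. The helper name emit is hypothetical, and the successful items here are plain dictionaries; how the lesson's script turns a Transaction into something json.dumps() accepts is not shown, so that conversion is left out.

```python
import io
import json
import sys
from typing import Any, Iterable, TextIO


def emit(results: Iterable[tuple[bool, Any]], out: TextIO = sys.stdout) -> None:
    """Print successes as compact JSON and failures as plain text."""
    for ok, item in results:
        if ok:
            # separators=(",", ":") removes the spaces json.dumps adds by default.
            print(json.dumps(item, separators=(",", ":")), file=out)
        else:
            print(item, file=out)


# Hypothetical result stream: one success and one failure.
buffer = io.StringIO()
emit([(True, {"id": 1, "qty": 3}), (False, "ERR 9 amount must be > 0.00")], out=buffer)
print(buffer.getvalue(), end="")
```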
The JSON Lines branch follows the same pattern with format-specific details:
The only difference is that we omit newline="" when opening the file (not needed for line-by-line reading) and call parse_jsonl instead of parse_csv. The rest of the pipeline is identical: normalize records, iterate through results, format successes as JSON, and print errors directly. This symmetry is intentional; by separating parsing from normalization, we can reuse the normalization logic regardless of input format. New formats only require a new parser function that yields dictionaries with the expected fields.
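One way to picture this symmetry is a single helper that picks the parser and the open() arguments, then yields raw dictionaries of the same shape for either format. The helper name read_records is hypothetical, the two parsers are repeated in compact form only so the sketch runs on its own, and the sample files are made up.

```python
import csv
import json
import tempfile
from pathlib import Path
from typing import Any, Iterator, TextIO


def parse_csv(fh: TextIO) -> Iterator[dict[str, Any]]:
    for row in csv.DictReader(fh):
        yield {k: (v.strip() if isinstance(v, str) else v) for k, v in row.items()}


def parse_jsonl(fh: TextIO) -> Iterator[dict[str, Any]]:
    for line in fh:
        if line.strip():
            yield json.loads(line)


def read_records(path: Path) -> Iterator[dict[str, Any]]:
    """Open the file the way its format needs and stream raw dictionaries."""
    is_jsonl = path.suffix.lower() in {".jsonl", ".ndjson"}
    parser = parse_jsonl if is_jsonl else parse_csv
    open_kwargs = {} if is_jsonl else {"newline": ""}  # csv wants newline=""
    with path.open(encoding="utf-8", **open_kwargs) as fh:
        yield from parser(fh)


# Hypothetical files holding the same record in both formats.
with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "tx.csv"
    jsonl_path = Path(tmp) / "tx.jsonl"
    csv_path.write_text("id,account\n1, Alice \n", encoding="utf-8")
    jsonl_path.write_text('{"id": "1", "account": "Alice"}\n', encoding="utf-8")
    from_csv = list(read_records(csv_path))
    from_jsonl = list(read_records(jsonl_path))

print(from_csv == from_jsonl)
```

Since both paths produce the same dictionaries, whatever consumes them does not need to know which format the file used.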
Let's examine the output to see how parsing, normalization, and validation work together:
The first eight lines show successful processing. Notice the compact JSON format with no spaces after separators, making the output easy to parse programmatically while remaining human-readable. Transaction 1 has quantity 3, and the total correctly reflects 10.50 × 3 = 31.50. Transaction 5 demonstrates that 2.50 EUR × 2 = 5.00 EUR, maintaining decimal precision. Operations are normalized to lowercase ("add", "refund"), and account names preserve their original capitalization with normalized spacing.
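The arithmetic behind those totals can be checked directly with Decimal. This is only an illustration of why decimal values keep their precision; how the lesson's Money type computes totals internally is not shown here.

```python
from decimal import Decimal

total_usd = Decimal("10.50") * 3
total_eur = Decimal("2.50") * 2
print(total_usd, total_eur)

assert str(total_usd) == "31.50"  # trailing zero preserved, no rounding noise
assert str(total_eur) == "5.00"

# Binary floats, by contrast, cannot represent many decimal fractions exactly.
assert 0.1 + 0.2 != 0.3
assert Decimal("0.1") + Decimal("0.2") == Decimal("0.3")
```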
The error lines reveal where validation caught problems. Transaction 9 failed with "amount must be > 0.00", indicating the source data contained a zero or negative amount. Transaction 10 has an invalid operation type (neither "add" nor "refund"). Transaction 11 has an empty account name after whitespace normalization. Transaction 12 shows a decimal.ConversionSyntax message, which is the text of the decimal.InvalidOperation exception that Decimal raises when the amount field contains non-numeric text. Each error includes the record ID for easy identification and the specific validation message for quick diagnosis.
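The ConversionSyntax text can be reproduced in isolation. The input "abc" is a hypothetical bad amount, and the "ERR 12" prefix simply mirrors the message format described earlier; the exact wording of the exception text may vary between Python implementations.

```python
from decimal import Decimal, InvalidOperation

message = ""
try:
    Decimal("abc")  # hypothetical non-numeric amount
except InvalidOperation as exc:
    # The exception class is InvalidOperation; its text names ConversionSyntax.
    message = str(exc)
    print(f"ERR 12 {message}")
```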
Transaction 13 demonstrates that our validation is permissive where appropriate: the "XYZ" currency code is accepted because it's three alphabetic characters. While not a real ISO currency, the domain model doesn't enforce a whitelist; it validates the format and leaves checks against real currency codes, as well as any currency conversion, to downstream systems that have access to reference and exchange rate data.
Excellent work completing this lesson! You've built a complete ingestion pipeline that streams CSV and JSON Lines data, normalizes field names and types with graceful error handling, validates business rules through your domain model, and produces clean, strongly-typed output. The streaming architecture maintains constant memory usage regardless of file size, enabling production-scale processing.
The key architectural decisions we made include separating parsing from normalization (so new formats only require new parsers), using generator composition for natural backpressure, yielding success/failure tuples for robust error handling, and delegating validation to the domain model rather than duplicating rules in parsers. These patterns scale; as your ETL system grows, you can add format-specific parsers, extend field extraction logic, or enhance error reporting without restructuring the core pipeline.
In the next lesson, we'll build on this foundation by introducing structural pattern matching for advanced routing logic. You'll use Python's match/case syntax to dispatch transactions to different processing paths based on operation type, account patterns, and amount thresholds, implementing complex business logic with declarative, readable code. This will transform your pipeline from simple parsing to intelligent routing.
The upcoming practice section will challenge you to implement these parsers and normalization routines yourself. You'll complete the CSV parser, build the JSON Lines parser, construct the normalization pipeline with error handling, and orchestrate everything in the main script. These hands-on exercises will deepen your understanding of generator patterns, error handling strategies, and pipeline composition. Let's transform raw data into validated transactions together!
