Welcome to our first lesson on "Automating Retraining with Apache Airflow"! In this lesson, we'll start our journey into the world of workflow automation with Apache Airflow, a powerful open-source platform that allows us to programmatically author, schedule, and monitor workflows.
As machine learning practitioners, we often need to retrain our models regularly as new data becomes available. This process involves several steps: data extraction, preprocessing, model training, evaluation, and deployment. Manually executing these steps can be time-consuming and error-prone. This is where Apache Airflow comes in—it provides a framework to automate and orchestrate complex computational workflows.
In this course, we focus on Apache Airflow 2.x and its modern TaskFlow API. The TaskFlow API, introduced in Airflow 2, allows us to define workflows using Python functions and decorators, making DAGs more readable and maintainable compared to the older operator-based approach. All examples and practices in this course will use Airflow 2 and the TaskFlow API.
By the end of this lesson, we'll understand what Apache Airflow is, learn about Directed Acyclic Graphs (DAGs), and implement a simple workflow using Airflow's TaskFlow API.
Apache Airflow is a platform created by Airbnb (now an Apache Software Foundation project) to programmatically author, schedule, and monitor workflows. At its core, Airflow uses Directed Acyclic Graphs (DAGs) to represent workflows. But what exactly is a DAG? Let's break it down:
- Graph: A graph is a mathematical structure made up of nodes connected by edges. In the context of Airflow, the graph organizes the sequence and dependencies of the tasks in our workflow, both visually and logically.
- Directed: The relationships between tasks have a specific direction. Task A may lead to Task B, but not vice versa.
- Acyclic: There are no cycles or loops in the workflow. You can't create circular dependencies where Task A depends on Task B, which depends on Task A.
In Airflow, each task in a workflow is represented as a node in the DAG, and the dependencies between tasks are represented as directed edges. This allows us to define complex workflows with multiple tasks and their dependencies in a clear, programmatic way.
For example, a simple ML retraining workflow might include these tasks in sequence: extract new data, preprocess data, train model, evaluate model, and deploy model (if evaluation metrics exceed a threshold). Airflow ensures these tasks execute in the correct order and handles scheduling, retries on failure, and provides visibility into the workflow's execution.
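To make this concrete, here is a rough sketch of how such a retraining workflow could be expressed as an Airflow DAG. Don't worry about the syntax yet; we'll build it up step by step in the rest of this lesson. The task names, return values, and threshold below are placeholders, not the pipeline we'll implement in this course:

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id="ml_retraining_sketch", schedule="@daily",
     start_date=datetime(2024, 1, 1), catchup=False)
def ml_retraining_sketch():
    @task
    def extract_data():
        return "raw_data"        # stand-in for a dataset reference

    @task
    def preprocess(raw):
        return "features"

    @task
    def train_model(features):
        return "model_v2"

    @task
    def evaluate_model(model):
        return 0.95              # stand-in evaluation metric

    @task
    def deploy_model(model, metric):
        if metric > 0.9:         # deploy only if the metric clears a threshold
            print(f"Deploying {model}")  # a real pipeline might use branching here

    # Passing each task's output to the next one defines the execution order.
    raw = extract_data()
    features = preprocess(raw)
    model = train_model(features)
    metric = evaluate_model(model)
    deploy_model(model, metric)

dag = ml_retraining_sketch()
```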
Now that we understand the concept of DAGs, let's create a simple Airflow DAG. We'll start with the basic structure and imports:
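A minimal version of that setup might look like this; the exact values, such as the owner name and email settings, are placeholders you'd adapt to your environment:

```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

# Default settings applied to every task in this DAG.
default_args = {
    "owner": "airflow",                   # who owns/maintains this DAG
    "depends_on_past": False,             # don't wait on the previous run's tasks
    "email_on_failure": False,            # email notification preferences
    "email_on_retry": False,
    "retries": 1,                         # retry a failed task once...
    "retry_delay": timedelta(minutes=1),  # ...after waiting for 1 minute
}
```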
In this code, we're importing the necessary modules from Python's `datetime` library and Airflow's decorators. The `default_args` dictionary defines global settings for our DAG, specifying who owns it (`owner`), whether it depends on past executions (`depends_on_past`), email notification preferences, and retry configurations. These settings help Airflow know how to handle the DAG's execution; for instance, it will automatically retry a failed task once after waiting for 1 minute.
With our default arguments in place, we can now define the DAG using Airflow's TaskFlow API, which provides a more intuitive way to define workflows:
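Building on the imports and `default_args` above, the DAG definition might look like the following sketch. The `dag_id` and function name are illustrative choices, picked to line up with the identifiers that appear later in this lesson:

```python
@dag(
    dag_id="mlops_pipeline",          # unique name for this DAG
    default_args=default_args,
    schedule="@daily",                # run once per day
    start_date=datetime(2024, 1, 1),  # placeholder date when scheduling begins
    catchup=False,                    # don't backfill runs for past periods
)
def hello_airflow_dag():
    """A simple DAG that prints a greeting and a farewell message."""
    # Tasks will be defined inside this function in the next step.
```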
Here, we use the `@dag` decorator to transform our Python function into an Airflow DAG. The parameters define critical aspects of our workflow's behavior: the `dag_id` provides a unique name, `schedule` sets it to run daily, and `start_date` indicates when scheduling should begin. Setting `catchup=False` prevents Airflow from executing the DAG for past periods, which is especially useful when first deploying a DAG with a start date in the past. The docstring clearly documents what our simple workflow will do, enhancing readability for anyone maintaining this code.
Now it's time to define the individual tasks for our DAG using the `@task` decorator:
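Inside the DAG function, the two tasks might be defined like this; the explicit `task_id` values and the parameter name `message` are assumptions chosen to match the log output we'll examine shortly:

```python
    @task(task_id="hello_task")
    def say_hello():
        print("Hello from Airflow!")
        return "Hello"                     # this value is passed to the next task

    @task(task_id="goodbye_task")
    def say_goodbye(message):
        print(f"Previous task said: {message}")
        print("Goodbye from Airflow!")     # no explicit return, so the task returns None
```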
The `@task` decorator transforms regular Python functions into Airflow tasks. Our first task, `say_hello`, simply prints a message and returns the string "Hello". The second task, `say_goodbye`, takes the output from the first task as a parameter, allowing us to demonstrate how data flows between tasks in Airflow. This is one of the powerful features of the TaskFlow API: it automatically handles the serialization, storage, and retrieval of data between tasks, making workflow development more intuitive and less boilerplate-heavy.
Note: Although it looks like the value is passed directly as a Python variable, Airflow actually passes data between tasks using its XCom (cross-communication) mechanism. The TaskFlow API makes this seamless, but under the hood, the result is serialized and stored by Airflow, not passed in-memory like a normal Python function call.
The final step is defining how our tasks should interact. In the TaskFlow API, this happens naturally through function calls:
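Still inside the DAG function, the dependency is expressed simply by calling the tasks, and the DAG itself is created by calling the decorated function at module level. Here is a sketch consistent with the snippets above:

```python
    # Calling the tasks wires up the dependency: say_goodbye runs after
    # say_hello and receives its return value.
    first_result = say_hello()
    say_goodbye(first_result)

# At module level: calling the decorated function creates the DAG object.
dag = hello_airflow_dag()
```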
This is where the elegant simplicity of the TaskFlow API shines. With the older operator-based approach, you have to set the order in which tasks run explicitly, using the `>>` and `<<` operators. With the TaskFlow API, you simply call your Python functions and pass data between them: this automatically creates the correct order and dependencies. When we call `say_hello()`, it returns the string "Hello", which we assign to `first_result`. By passing this variable to `say_goodbye()`, we tell Airflow that the second task should wait for the first one to finish and use its result.
The final line creates our DAG object and assigns it to the variable `dag`. This is a common convention, but it's not a strict requirement, since Airflow will still discover the DAG as long as the function is called at the module level.
When the DAG runs, it executes `say_hello` first, then passes the returned value to `say_goodbye` (via XCom). In the Airflow UI logs, you'd see "Hello from Airflow!" from the first task, followed by "Previous task said: Hello" and "Goodbye from Airflow!" from the second task.
When your DAG runs, Airflow generates detailed logs that provide insight into the execution process. While these logs can be quite verbose, they are invaluable for debugging and monitoring. Let's look at a simplified version of the output you might see for our `hello_airflow_dag`:
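Below is a condensed sketch of that output, stitched together from the key lines we'll discuss next; real Airflow logs include timestamps, process details, and many additional lines:

```
INFO - dagrun id: mlops_pipeline
INFO - [DAG TEST] starting task_id=hello_task ...
Hello from Airflow!
INFO - Done. Returned value was: Hello
INFO - Marking task as SUCCESS. ... task_id=hello_task, ...
INFO - [DAG TEST] starting task_id=goodbye_task ...
Previous task said: Hello
Goodbye from Airflow!
INFO - Done. Returned value was: None
INFO - Marking task as SUCCESS. ... task_id=goodbye_task, ...
INFO - Marking run <DagRun mlops_pipeline ...> successful
```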
Let's break down what these key lines tell us:
- `INFO - dagrun id: mlops_pipeline`: This indicates the start of a new DAG run for our `mlops_pipeline` DAG.
- The next set of lines shows the execution of `hello_task`:
  - `INFO - [DAG TEST] starting task_id=hello_task ...`: Airflow begins executing the `hello_task`.
  - `Hello from Airflow!`: This is the `print()` output from our `say_hello` function.
  - `INFO - Done. Returned value was: Hello`: The task completed and returned "Hello", which Airflow passes via XComs.
  - `INFO - Marking task as SUCCESS. ... task_id=hello_task, ...`: The `hello_task` finished successfully.
- Following that, we see the execution of `goodbye_task`:
  - `INFO - [DAG TEST] starting task_id=goodbye_task ...`: Airflow starts the `goodbye_task`.
  - `Previous task said: Hello` and `Goodbye from Airflow!`: These are the `print()` outputs from `say_goodbye`, confirming it received the "Hello" string from the first task.
  - `INFO - Done. Returned value was: None`: The `say_goodbye` task completed (returning `None` as it has no explicit return).
  - `INFO - Marking task as SUCCESS. ... task_id=goodbye_task, ...`: The `goodbye_task` also finished successfully.
- `INFO - Marking run <DagRun mlops_pipeline ...> successful`: Finally, this line confirms that the entire DAG run completed successfully.
This output confirms that our tasks executed in the correct order, data was passed between them as expected, and the overall workflow was successful. In the Airflow UI, you would see a graphical representation of this execution, with green boxes indicating successfully completed tasks.
For Airflow to discover and execute your DAGs, your Python files must be placed in a specific directory known as the DAGs folder. By default, this is the `dags/` directory inside your Airflow home directory (`$AIRFLOW_HOME/dags/`), but it can be configured differently in your Airflow settings.
When Airflow runs, it continuously scans the DAGs folder for Python files. Any file that contains a DAG definition (i.e., a module-level DAG object, such as one created by calling a `@dag`-decorated function) will be automatically detected and made available to the Airflow scheduler. This means you don't need to manually register your DAGs: just save your `.py` file in the correct folder, and Airflow will handle the rest.
Best practices for organizing your DAG code:
- Place each DAG in its own Python file for clarity and maintainability.
- If you have shared code (such as utility functions or custom operators), consider placing it in a separate `utils/` or `plugins/` directory and importing it into your DAG files, as in the sample layout below.
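For reference, a typical layout might look like this; the file and folder names here are just examples:

```
$AIRFLOW_HOME/
└── dags/
    ├── hello_airflow_dag.py    # the DAG we built in this lesson
    ├── ml_retraining.py        # another DAG, one file per DAG
    └── utils/
        └── preprocessing.py    # shared helper code imported by DAG files
```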
Please note that, in this course, the CodeSignal environment is pre-configured so that any DAG code you write is automatically placed in the correct folder. You don't need to worry about file placement or Airflow configuration—just focus on writing your DAGs, and they'll be picked up and executed by Airflow behind the scenes.
In this lesson, we've taken our first steps with Apache Airflow by creating a simple DAG with two tasks. We've learned about the core concepts of Airflow: defining workflows as DAGs, creating tasks with the `@task` decorator, and establishing dependencies between them. The TaskFlow API has made this process intuitive by letting us express workflows as regular Python functions while handling the complexities of data passing and dependency management behind the scenes.
While we've built a simple example, these fundamentals form the building blocks for creating complex, production-grade ML retraining pipelines. As you practice these concepts, experiment with adding more tasks, passing different types of data between them, and visualizing the resulting workflow graphs. In the upcoming lessons, we'll expand on these basics to build more sophisticated workflows that handle real machine learning tasks from data processing to model deployment.
