Last week, we discussed the rise of Apache Airflow as the de facto orchestrator for data science and machine learning, along with its critical shortcomings: it ignores data, exposes (and exacerbates) infrastructure complexity, and deals poorly with quickly evolving pipelines.
We’ve been focused on solving these problems with Aqueduct; our goal is to make running machine learning in the cloud simple. However, many data teams have already spent time and resources setting up Airflow, so ripping and replacing this infrastructure can be extremely costly. In this context, we’ve been thinking about how to get the best of both worlds.
We’ve built an Aqueduct integration with Airflow that allows you to define, deploy, and monitor a workflow in Aqueduct and use your Airflow cluster as the execution engine. In this post, we’ll use a simple prediction pipeline as a running example to show you how this works.
A typical ML workflow can have many steps: data retrieval, cleaning, featurization, inference, post-processing, publishing, etc. Each step may have different resource requirements, and more complicated pipelines will have parallel operators. Both Aqueduct and Airflow support any DAG structure, so to keep our diagrams & code simple, we’re going to abstract the intermediary stages into a single box. Our simplified pipeline has 3 stages: extracting data from the warehouse, running a model, and publishing the predictions:
One of Airflow’s biggest pain points is its complex process for authoring workflows. For our simple example workflow, the Airflow DAG file looks like the following. Defining this simplified workflow takes about 80 lines of code:
import io
from datetime import datetime

import pandas as pd
from sklearn.linear_model import LinearRegression

from airflow.models import DAG
from airflow.operators.python import PythonVirtualenvOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Assume that you already created connections for Snowflake and AWS S3
# following the instructions at: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html.
# This is part of Airflow's API for credential management.
SNOWFLAKE_CONN_ID = "snowflake_default_conn"
S3_CONN_ID = "s3_default_conn"

S3_BUCKET_NAME = "prod_bucket"
S3_DATA_KEY = "us_customers"
S3_PREDICTIONS_KEY = "predictions"
SNOWFLAKE_PREDICTIONS_TABLE = "predictions"

# Assume the model is trained with some sample data
data = pd.read_csv('data.csv')
X = data.iloc[:, 0].values.reshape(-1, 1)
Y = data.iloc[:, 1].values.reshape(-1, 1)
linear_model = LinearRegression()
linear_model.fit(X, Y)

dag = DAG(
    dag_id='prediction_workflow',
    default_args={
        'retries': 0,
    },
    start_date=datetime(2023, 1, 1, 1),
    schedule_interval='0 8 * * *',
)

def extract():
    '''
    Performs the entire data extraction stage. This includes
    reading the data from the data warehouse and then writing
    it to a location that can be accessed by subsequent tasks.
    '''
    sf_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
    with sf_hook.get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM customers WHERE location = 'US';")
            # res is a list of tuples, one per row
            res = cur.fetchall()
            # Determine the column names from the cursor metadata
            col_names = list(map(lambda t: t[0], cur.description))

    df = pd.DataFrame(res)
    df.columns = col_names

    # Save the DataFrame to AWS S3, so it can be accessed by the next operator
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0, 0)  # Reset stream position of buffer
    s3_hook.load_file_obj(buffer, S3_DATA_KEY, S3_BUCKET_NAME, replace=True)

def predict():
    '''
    Performs linear regression on the input data.
    Predictions are added as a new column named `score` in the input DataFrame.
    In addition, we need to read the input data from the
    location it was written to by the previous extract task.
    The output then needs to be written to a location from where it can
    be accessed by subsequent tasks.
    '''
    # Load the DataFrame from AWS S3
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
    data_obj = s3_hook.get_key(S3_DATA_KEY, S3_BUCKET_NAME)
    data_file = io.BytesIO(data_obj.get()['Body'].read())
    df = pd.read_parquet(data_file)

    df['score'] = linear_model.predict(df)[:, 0]

    # Save the predictions to AWS S3, so they can be accessed by the next operator
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0, 0)  # Reset stream position of buffer
    s3_hook.load_file_obj(buffer, S3_PREDICTIONS_KEY, S3_BUCKET_NAME, replace=True)

def save():
    '''
    Performs the data saving stage. This involves first reading the output of the
    previous stage and then writing it to the data warehouse.
    '''
    # Load the predictions DataFrame from AWS S3
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
    data_obj = s3_hook.get_key(S3_PREDICTIONS_KEY, S3_BUCKET_NAME)
    data_file = io.BytesIO(data_obj.get()['Body'].read())
    df = pd.read_parquet(data_file)

    # Save the predictions into a Snowflake table
    sf_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
    df.to_sql(
        SNOWFLAKE_PREDICTIONS_TABLE,
        con=sf_hook.get_sqlalchemy_engine(),
        index=False,
        if_exists='append',
    )

# The task_id is needed as a unique task identifier within this DAG
extract_stage = PythonVirtualenvOperator(
    task_id='extract',
    python_callable=extract,
    requirements=["pandas", "snowflake-sqlalchemy", "SQLAlchemy"],
    dag=dag,
)

prediction_stage = PythonVirtualenvOperator(
    task_id='prediction',
    python_callable=predict,
    requirements=["pandas", "scikit-learn"],
    dag=dag,
)

save_stage = PythonVirtualenvOperator(
    task_id='save',
    python_callable=save,
    requirements=["pandas", "snowflake-sqlalchemy", "SQLAlchemy"],
    dag=dag,
)

extract_stage >> prediction_stage >> save_stage
The biggest pain point in defining this workflow is data movement. While Airflow’s hooks allow you to access data systems, Airflow itself has no conception of your data. As a result, you must write custom logic to serialize and deserialize data into the correct format. For example, we use the SnowflakeHook in the extract operator above but wrote custom code to convert the query results into a DataFrame.
Once your data is in Airflow, moving it between operators is a challenge. Airflow uses XComs for data movement, but XComs are designed only for small values; the documentation explicitly warns against moving “large values, like dataframes.” Depending on which metadata database you use, you’re limited to [1]: Postgres: 1 GB, SQLite: 2 GB, MySQL: 64 KB. The alternative is a custom XCom backend, which can store data in AWS S3, Google Cloud Storage, etc. However, configuring a custom XCom backend again requires you to write custom serialization logic for each data type.
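To make that concrete, here is a minimal sketch of what a custom XCom backend might look like. The class name and the my-xcom-bucket bucket are our own placeholders, and we only handle pandas DataFrames; every other payload type would need its own serialization branch.

import io
import uuid

import pandas as pd
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3XComBackend(BaseXCom):
    # Hypothetical bucket name; replace with your own.
    BUCKET_NAME = "my-xcom-bucket"

    @staticmethod
    def serialize_value(value, **kwargs):
        # Only DataFrames get special handling here; any other type falls
        # through to the default (metadata-database-backed) XCom path.
        if isinstance(value, pd.DataFrame):
            key = f"xcom/{uuid.uuid4()}.parquet"
            buffer = io.BytesIO()
            value.to_parquet(buffer, index=False)
            buffer.seek(0)
            # Uses the default AWS connection.
            S3Hook().load_file_obj(buffer, key, S3XComBackend.BUCKET_NAME)
            value = f"s3://{S3XComBackend.BUCKET_NAME}/{key}"
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith("s3://"):
            key = value.replace(f"s3://{S3XComBackend.BUCKET_NAME}/", "")
            obj = S3Hook().get_key(key, S3XComBackend.BUCKET_NAME)
            value = pd.read_parquet(io.BytesIO(obj.get()["Body"].read()))
        return value

You would then point Airflow at this class via the xcom_backend setting in airflow.cfg, and repeat the serialize/deserialize logic for every additional data type your operators exchange.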
Aqueduct, on the other hand, has a simple Python-native API: you annotate your functions with @op and invoke them like regular Python functions, and Aqueduct automatically constructs the workflow DAG on your behalf.
Like Airflow, Aqueduct abstracts away data system access. Unlike Airflow, however, Aqueduct treats data as a first-class citizen: data objects move seamlessly between data systems and operators. Aqueduct handles serialization, which means your code can operate on Python-native data types without reinventing the wheel for data movement. Put together, our workflow can be defined in just 22 lines of code (4x shorter than Airflow!), and it looks like this:
import aqueduct as aq
import pandas as pd
from sklearn.linear_model import LinearRegression

client = aq.Client("API_KEY", "SERVER_ADDRESS")

# Extract data from the warehouse.
# These 2 lines replace the entire `extract_stage` task in the Airflow file.
warehouse = client.integration(name="prod_database")
input_data = warehouse.sql(query="SELECT * FROM customers WHERE location = 'US';")

# Assume the model is trained with some sample data
data = pd.read_csv('data.csv')
X = data.iloc[:, 0].values.reshape(-1, 1)
Y = data.iloc[:, 1].values.reshape(-1, 1)
linear_model = LinearRegression()
linear_model.fit(X, Y)

@aq.op
def predict(df: pd.DataFrame) -> pd.DataFrame:
    '''
    This is the exact same predict function as defined above.
    The only difference is the decorator @aq.op, which indicates
    that this Python code is part of your workflow.
    '''
    df['score'] = linear_model.predict(df)[:, 0]
    return df

predictions = predict(input_data)

# Save the predictions to the warehouse.
# This 1 line replaces the entire `save_stage` task in the Airflow file.
warehouse.save(predictions, table_name="predictions")

# Schedule this workflow to run daily on your existing Airflow cluster.
# This assumes that you have connected your Airflow cluster as an integration
# with the name `airflow_cluster`.
# This call replaces the entire `DAG(...)` clause in the Airflow file.
flow = client.publish_flow(
    name="Customer Churn",
    artifacts=[predictions],
    schedule=aq.daily(),
    # The engine is configured by registering an Airflow integration
    # with Aqueduct. When registering it, you provide the necessary credentials
    # for accessing the cluster.
    engine="airflow_cluster",
)
To deploy an Aqueduct workflow on Airflow, we transpile our internal DAG representation into Airflow-compatible code. We built a DAG generator that first transforms each Aqueduct operator into an Airflow task. Each task includes configuration for the required Python environment and an Aqueduct wrapper that handles data movement, Python version management, and metadata capture. These tasks are composed into an Airflow DAG.
Operators you write can accept and return Python objects (DataFrames, JSON blobs, etc.). The Aqueduct wrapper manages data serialization, automatically picking a serialization method for each data type to balance performance and storage cost. In between operators, data is automatically written to S3 to avoid overloading XComs.
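As a rough sketch (not the literal code Aqueduct generates), the wrapper around each operator behaves something like the following. The function and bucket names are our own placeholders, and parquet stands in for whichever serialization method is chosen for tabular data.

import io
import json
import traceback

import pandas as pd
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Hypothetical name for the intermediate storage bucket.
INTERMEDIATE_BUCKET = "aqueduct-intermediate-store"

def run_wrapped_operator(operator_fn, input_key: str, output_key: str, metadata_key: str):
    '''Conceptual sketch of the Aqueduct wrapper around one operator.'''
    s3 = S3Hook()
    metadata = {"status": "succeeded", "stack_trace": ""}
    try:
        # Deserialize the operator's input from intermediate storage.
        obj = s3.get_key(input_key, INTERMEDIATE_BUCKET)
        df = pd.read_parquet(io.BytesIO(obj.get()["Body"].read()))

        # Run the user's function.
        result = operator_fn(df)

        # Serialize the output for the next task.
        buffer = io.BytesIO()
        result.to_parquet(buffer, index=False)
        buffer.seek(0)
        s3.load_file_obj(buffer, output_key, INTERMEDIATE_BUCKET, replace=True)
    except Exception:
        metadata["status"] = "failed"
        metadata["stack_trace"] = traceback.format_exc()
        raise
    finally:
        # Persist execution metadata so Aqueduct can surface it in its UI.
        s3.load_string(json.dumps(metadata), metadata_key, INTERMEDIATE_BUCKET, replace=True)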
Once a workflow is running, Airflow provides minimal visibility into its execution. Airflow is a task orchestrator, so it captures task statuses but has no context around the data in your workflow — both from data systems and in between operators. Debugging data issues requires you to snapshot and version your data manually, and likely to build custom tooling to visualize or analyze it.
Worse yet, Airflow does not visualize changes to a workflow’s structure, since it does not explicitly track this evolution. Consider the example below of a workflow that starts off with 3 operators.
If we add a 4th operator, subsequent runs show the new structure, as expected.
However, the updated structure is shown for past runs as well. The transform operator is shown with a white border, indicating that it did not exist for this particular run. More alarmingly, if we remove the transform operator, all past runs would be shown without it — including runs that had the operator!
Airflow’s lack of visibility into your data and workflow changes makes debugging an absolute nightmare.
Unlike Airflow, Aqueduct explicitly tracks the evolution of your workflow. This enables the UI to show the workflow structure at the time of each run, as operators are added or removed.
Within each workflow run, Aqueduct provides a preview of all data (including intermediate results), for a wide range of data types: tabular, numeric, text, images, and more.
Finally, Aqueduct also captures the code, logs, and stack traces for every operator:
Beyond visibility, Aqueduct also makes it easy for you to monitor your workflows: Metrics are numerical measurements of the data generated by your workflow, and checks allow you to specify correctness constraints.
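For instance, continuing the workflow definition above, a metric and a check on the predictions might look like the following. The function names and the constraint itself are illustrative; we assume the Aqueduct SDK's metric and check decorators alongside the @op decorator used earlier.

import pandas as pd
import aqueduct as aq

@aq.metric
def average_score(df: pd.DataFrame) -> float:
    # A metric is a numerical measurement of your workflow's data.
    return df['score'].mean()

@aq.check
def no_missing_scores(df: pd.DataFrame) -> bool:
    # A check encodes a correctness constraint on your data;
    # Aqueduct reports its result with each run.
    return bool(df['score'].notna().all())

# `predictions` is the artifact produced by predict() above.
avg_score = average_score(predictions)
scores_ok = no_missing_scores(predictions)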
Aqueduct’s data versions and metadata (logs, errors, stack traces) are critical for debugging and monitoring. To keep the critical path of workflow execution lean, we do not synchronously communicate metadata from Airflow to Aqueduct. Instead, Aqueduct periodically queries the Airflow API to capture and sync workflow runs and task statuses. Unfortunately, the richer metadata generated by the Aqueduct wrapper is not available through this API, so Aqueduct retrieves it from intermediate storage instead.
Avoiding synchronous communication results in a new coordination challenge: determining where data and metadata are stored. We solve this by having Aqueduct generate a UUID prefix for each operator’s metadata — this UUID is specified in the generated Airflow DAG. At execution time, the Aqueduct wrapper concatenates the prefix with Airflow’s run ID to form a unique storage location. This allows Aqueduct to deterministically access each workflow’s data and metadata.
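Conceptually (the exact layout is an internal detail; the names and values below are illustrative), the storage location for one operator in one run is derived like this:

def storage_key(operator_metadata_uuid: str, airflow_run_id: str) -> str:
    # The UUID prefix is fixed when the Airflow DAG is generated; the run ID
    # is only known at execution time. Combining them yields a unique,
    # deterministic location that both the wrapper and Aqueduct can compute.
    return f"{operator_metadata_uuid}/{airflow_run_id}"

# Hypothetical values:
# storage_key("4f9d2c1e-...", "scheduled__2023-01-01T08:00:00+00:00")
#   -> "4f9d2c1e-.../scheduled__2023-01-01T08:00:00+00:00"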
Airflow adoption has grown rapidly in recent years, and it isn’t going away. It excels at orchestrating workflows and managing resources, but as we’ve discussed, it’s not effective for machine learning. With Aqueduct, you can take advantage of Airflow's orchestration and resource management while using our simple Python API and purpose-built monitoring UI.
Aqueduct on Airflow still has some key limitations. In particular, it doesn’t completely abstract away infrastructure concerns. You must still configure and run an Airflow cluster; most Airflow deployments are hosted on Kubernetes, so you will also need to manage the Docker containers required for execution.[2] It’s not perfect, but we think it’s a whole lot better!
We’ll be sharing more soon on other integrations we’re building, like Apache Spark + Databricks and Ray. If you’re interested in learning more, check out what we’re building or join our community Slack to share your thoughts (even if you think we’re wrong!).
We'd love to hear from you! Star us on GitHub, join our community, or start a discussion.