# Dagster & Airflow with components
`dg` and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
The `dagster-airlift` library provides an `AirflowInstanceComponent` that can be used to represent Airflow DAGs in Dagster.
## Setup and peering
### 1. Create a components-ready Dagster project
To begin, you'll need a components-ready Dagster project. To create one, follow the steps in "Creating a project with dg".

Next, add the `dagster-airlift` library to the project (the extra is quoted so shells like zsh don't expand the brackets):

```shell
uv add 'dagster-airlift[core]'
```
### 2. Scaffold an `AirflowInstanceComponent`
Currently, `dagster-airlift` only supports basic authentication against an Airflow instance.

To scaffold a new component in your project, use the `dg scaffold defs` command:

```shell
dg scaffold dagster_airlift.core.components.AirflowInstanceComponent airflow --name my_airflow --auth-type basic_auth
```
This will create a component definition file called `component.yaml` in your project under the `defs/airflow` directory:

```shell
tree src/my_project/defs
src/my_project/defs
├── __init__.py
└── airflow
    └── component.yaml

2 directories, 2 files
```
### 3. Update `component.yaml` with Airflow configuration
By default, the Airlift component reads values from the environment variables `AIRFLOW_WEBSERVER_URL`, `AIRFLOW_USERNAME`, and `AIRFLOW_PASSWORD`. While you should never include your password directly in this file, you can update `component.yaml` to add the webserver URL and username:
```shell
cat src/my_project/defs/airflow/component.yaml
```

```yaml
type: dagster_airlift.core.components.AirflowInstanceComponent
attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
```
Once you have added these values, the following will happen:

- Dagster will create a sensor called `your_airlift_instance__airflow_monitoring_job_sensor` that is responsible for detecting runs in your Airflow instance and pulling them into Dagster.
- Your Airflow DAGs will be represented in Dagster on the "Jobs" page, and any jobs pulled from Airflow will be marked with an Airflow icon.
- Airflow datasets will be represented in Dagster as assets.
- When an Airflow DAG executes, that run will be represented in Dagster.
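Because the component resolves these values with `env()` when definitions are loaded, a missing variable only surfaces at load time. A small stdlib pre-flight check can catch that earlier; the helper below is a hypothetical convenience, not part of `dagster-airlift`:

```python
import os

# The three variables the component template reads. This helper is a
# hypothetical convenience for local setups, not dagster-airlift API.
REQUIRED_VARS = ["AIRFLOW_WEBSERVER_URL", "AIRFLOW_USERNAME", "AIRFLOW_PASSWORD"]

def missing_airflow_env(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]

# Example against a fake environment with only the URL set:
fake_env = {"AIRFLOW_WEBSERVER_URL": "http://localhost:8080"}
print(missing_airflow_env(fake_env))  # -> ['AIRFLOW_USERNAME', 'AIRFLOW_PASSWORD']
```

Running this before `dg` commands gives a clearer error than a failed component load.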
## Mapping Dagster assets to Airflow tasks
Once you have represented your Airflow instance in Dagster using the Airflow instance component, you may also want to represent the graph of asset dependencies produced by each DAG, which you can do in `component.yaml`.
### DAG-level mapping
You can manually define which assets are produced by a given Airflow DAG by editing `mappings` in `component.yaml`:
```yaml
type: dagster_airlift.core.components.AirflowInstanceComponent
attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
  mappings:
    - dag_id: upload_source_data
      assets:
        - spec:
            key: order_data
        - spec:
            key: activity_data
        - spec:
            key: aggregated_user_data
            deps: [order_data, activity_data]
```
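The `deps` entry above implies a small asset graph in which `aggregated_user_data` is downstream of the other two assets. Purely as an illustration (Dagster derives this ordering itself), the stdlib `graphlib` can show the resulting execution order:

```python
from graphlib import TopologicalSorter

# Upstream dependencies implied by the mapping above (node -> predecessors).
deps = {
    "order_data": [],
    "activity_data": [],
    "aggregated_user_data": ["order_data", "activity_data"],
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # the two source assets first, aggregated_user_data last
```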
### Task-level mapping
If you have a more specific mapping from a task within the DAG to a set of assets, you can also set these mappings at the task level:
```yaml
type: dagster_airlift.core.components.AirflowInstanceComponent
attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
  mappings:
    - dag_id: upload_source_data
      task_mappings:
        - task_id: upload_orders
          assets:
            - spec:
                key: order_data
        - task_id: upload_activity
          assets:
            - spec:
                key: activity_data
        - task_id: aggregate_user_data
          assets:
            - spec:
                key: aggregated_user_data
                deps: [order_data, activity_data]
```
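To see what the task-level mapping expresses as plain data, here is a hypothetical helper (not `dagster-airlift` API) that flattens it into a task-to-asset-keys lookup, mirroring how each task materializes its mapped assets:

```python
# The task-level mapping above, transcribed as plain Python data structures.
task_mappings = [
    {"task_id": "upload_orders", "assets": [{"spec": {"key": "order_data"}}]},
    {"task_id": "upload_activity", "assets": [{"spec": {"key": "activity_data"}}]},
    {
        "task_id": "aggregate_user_data",
        "assets": [
            {
                "spec": {
                    "key": "aggregated_user_data",
                    "deps": ["order_data", "activity_data"],
                }
            }
        ],
    },
]

def assets_by_task(mappings):
    """Map each task_id to the list of asset keys it materializes.

    Hypothetical illustration of the mapping's shape, not dagster-airlift API.
    """
    return {m["task_id"]: [a["spec"]["key"] for a in m["assets"]] for m in mappings}

print(assets_by_task(task_mappings))
# -> {'upload_orders': ['order_data'], 'upload_activity': ['activity_data'],
#     'aggregate_user_data': ['aggregated_user_data']}
```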