Dagster & Airflow with components

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-airlift library provides an AirflowInstanceComponent which can be used to represent Airflow DAGs in Dagster.

Setup and peering

1. Create a components-ready Dagster project

To begin, you'll need a components-ready Dagster project. To create one, follow the steps in "Creating a project with dg".

Next, you will need to add the dagster-airlift library to the project:

uv add 'dagster-airlift[core]'

2. Scaffold an AirflowInstanceComponent

note

Currently dagster-airlift only supports basic authentication against an Airflow instance.

To scaffold a new component in your project, use the dg scaffold defs command:

dg scaffold defs dagster_airlift.core.components.AirflowInstanceComponent airflow --name my_airflow --auth-type basic_auth

This will create a component definition file called component.yaml in your project under the defs/airflow directory.

tree src/my_project/defs
src/my_project/defs
├── __init__.py
└── airflow
└── component.yaml

2 directories, 2 files

3. Update component.yaml with Airflow configuration

By default, the Airlift component reads values from the environment variables AIRFLOW_WEBSERVER_URL, AIRFLOW_USERNAME, and AIRFLOW_PASSWORD. While you should never include your password directly in this file, you can update component.yaml to add the webserver URL and username:

cat src/my_project/defs/airflow/component.yaml
type: dagster_airlift.core.components.AirflowInstanceComponent

attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
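Before launching Dagster, export these environment variables in the shell where you run your project. The values below are placeholders for your own Airflow deployment:

```shell
# Placeholder values -- substitute the details of your own Airflow deployment.
export AIRFLOW_WEBSERVER_URL="http://localhost:8080"
export AIRFLOW_USERNAME="admin"
# Source the password from a secret manager or an untracked .env file
# rather than typing it inline or committing it to version control.
export AIRFLOW_PASSWORD="changeme"
```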

Once you have added these values, the following will happen:

  1. Dagster will create a sensor called my_airflow__airflow_monitoring_job_sensor that is responsible for detecting runs in your Airflow instance and pulling them into Dagster.
  2. Your Airflow DAGs will be represented in Dagster in the "Jobs" page, and any jobs pulled from Airflow will be marked with an Airflow icon.
  3. Airflow datasets will be represented in Dagster as assets.
  4. When an Airflow DAG executes, that run will be represented in Dagster.
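The monitoring sensor's name is derived from the component's `name` attribute. As a quick illustration of the convention (the exact `<name>__airflow_monitoring_job_sensor` format is inferred from the generated sensor above, not a documented API):

```python
# Illustration only: derive the monitoring sensor name from the component name.
# The "<name>__airflow_monitoring_job_sensor" pattern is an assumption based on
# the sensor Dagster generates for this component, not a public API contract.
def monitoring_sensor_name(component_name: str) -> str:
    return f"{component_name}__airflow_monitoring_job_sensor"

print(monitoring_sensor_name("my_airflow"))
```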

Mapping Dagster assets to Airflow tasks

Once you have represented your Airflow instance in Dagster using the AirflowInstanceComponent, you may also want to represent the graph of assets produced by its DAGs. You can define these mappings in component.yaml.

DAG-level mapping

You can manually define which assets are produced by a given Airflow DAG by editing mappings in component.yaml:

type: dagster_airlift.core.components.AirflowInstanceComponent

attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
  mappings:
    - dag_id: upload_source_data
      assets:
        - spec:
            key: order_data
        - spec:
            key: activity_data
        - spec:
            key: aggregated_user_data
            deps: [order_data, activity_data]

Task-level mapping

If you know which task within the DAG produces a given set of assets, you can define these mappings at the task level instead:

type: dagster_airlift.core.components.AirflowInstanceComponent

attributes:
  name: my_airflow
  auth:
    type: basic_auth
    webserver_url: '{{ env("AIRFLOW_WEBSERVER_URL") }}'
    username: '{{ env("AIRFLOW_USERNAME") }}'
    password: '{{ env("AIRFLOW_PASSWORD") }}'
  mappings:
    - dag_id: upload_source_data
      task_mappings:
        - task_id: upload_orders
          assets:
            - spec:
                key: order_data
        - task_id: upload_activity
          assets:
            - spec:
                key: activity_data
        - task_id: aggregate_user_data
          assets:
            - spec:
                key: aggregated_user_data
                deps: [order_data, activity_data]
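Because these specs mirror Dagster's AssetSpec, it is reasonable to expect they accept other AssetSpec fields beyond key and deps. A sketch, assuming fields such as description and group_name are accepted here (verify against your dagster-airlift version before relying on this):

```yaml
# Sketch only: description and group_name are standard AssetSpec fields,
# but their support in this mapping schema is an assumption.
mappings:
  - dag_id: upload_source_data
    task_mappings:
      - task_id: upload_orders
        assets:
          - spec:
              key: order_data
              description: Raw order records uploaded from the source system
              group_name: airflow_ingest
```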