Declarative DAG Reference

The actual schema definition can be found in the airflow_declarative/schema.py module. Some examples of complete DAGs are also available in the tests/dags/good directory.

This document describes the declarative DAG schema in detail.

The anatomy of a declarative DAG:

# The comments below specify the names of the schema definition
# atoms which can be found in the `airflow_declarative/schema.py`
# module.

dags:
  my_dag:  # the DAG name

    defaults:
      sensors:
        # `SENSOR`
        args:
          # `SENSOR_ARGS`
          queue: my_sensors_queue
      operators:
        # `OPERATOR`
        args:
          # `OPERATOR_ARGS`
          queue: my_operators_queue

    args:
      # `DAG_ARGS`
      start_date: 2019-07-01
      schedule_interval: 1d
      default_args:
        # `SENSOR_ARGS` | `OPERATOR_ARGS`
        owner: my_name

    sensors:
      my_sensor:
        # `SENSOR`
        callback: myproject.mymodule:my_sensor_callback
        callback_args:
          my_kwarg: my_value
        args:
          # `SENSOR_ARGS`
          poke_interval: 1m

    operators:
      my_operator:
        # `OPERATOR`
        callback: myproject.mymodule:my_operator_callback
        callback_args:
          my_kwarg: my_value
        args:
          # `OPERATOR_ARGS`
          retries: 3

    flow:
      # `FLOW`
      my_sensor:
      - my_operator

    do:
    # `DO_TEMPLATE`
    - operators:
        my_crops_{{ item.name }}:
          # `OPERATOR`
          callback: myproject.myfruits:my_crops
          callback_args:
            fruit: '{{ item.fruit_props }}'
      # `sensors` can be used here too!
      flow:
        # `FLOW`
        my_operator:
        - my_crops_{{ item.name }}
      with_items:
      # `WITH_ITEMS`
      - name: pineapple
        fruit_props:
          shape: ellipsoid
          color: brown
      - name: watermelon
        fruit_props:
          shape: round
          color: green

DAG_ARGS

The DAG_ARGS atom defines the __init__ arguments of an Airflow DAG. Their meaning is documented on the airflow.models.DAG doc page.

OPERATOR / SENSOR

OPERATOR and SENSOR atoms look similar, except that their args schemas differ. Both define an Airflow operator (note that Airflow treats Sensors as Operators).

For an Operator, the args (the OPERATOR_ARGS atom) are the __init__ args of the airflow.models.BaseOperator.

For a Sensor, the args (the SENSOR_ARGS atom) are the __init__ args of the airflow.sensors.base_sensor_operator.BaseSensorOperator.

An OPERATOR/SENSOR can be specified as an Airflow operator class. For example, for airflow.operators.bash_operator.BashOperator:

class: airflow.operators.bash_operator:BashOperator
args:
  bash_command: 'echo "Hello World {{ ds }}"'

… or as a Python callable:

callback: myproject.mymodule:my_operator_callback
callback_args:
  my_kwarg: my_value
args:
  retries: 3
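
Both the class and the callback values use a `module:attribute` reference. A minimal sketch of how such a string could be turned into a Python object, assuming a plain importlib lookup (resolve_reference is a hypothetical helper, not part of airflow_declarative):

```python
import importlib
from typing import Any


def resolve_reference(ref: str) -> Any:
    """Import the object named by a "package.module:attribute" string.

    Hypothetical helper for illustration; airflow_declarative may
    resolve these references differently.
    """
    module_name, sep, attr_name = ref.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"expected 'module:attribute', got {ref!r}")
    module = importlib.import_module(module_name)  # e.g. "myproject.mymodule"
    return getattr(module, attr_name)              # e.g. "my_operator_callback"


if __name__ == "__main__":
    import os.path

    # Any importable object is looked up the same way as the references above.
    print(resolve_reference("os.path:join") is os.path.join)  # True
```

Splitting on ":" rather than on the last "." keeps the module path and the attribute name unambiguous.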

If the callback is a function, it should look like this:

def my_operator_callback(context, my_kwarg):
    print("Execution date", context["ds"])
    print("my_kwarg", my_kwarg)

The callback might also be a class:

class MyOperatorCallback:
    def __init__(self, context, my_kwarg):
        self.ds = context["ds"]
        self.my_kwarg = my_kwarg

    def __call__(self):
        print("Execution date", self.ds)
        print("my_kwarg", self.my_kwarg)

The callback_args key is relevant only when callback is used (i.e. it cannot be combined with class). The distinction between args and callback_args is simple:

  • args are the __init__ args for the airflow.models.BaseOperator, which is used under the hood to wrap the callback;
  • callback_args are the additional kwargs which would be passed to the callback along with the task context.
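
The split between context and callback_args can be sketched as a small wrapper. This assumes the task context is passed positionally and callback_args as keyword arguments, and that a class callback is instantiated with those arguments and then called with none; run_callback is a hypothetical name, not the library's wrapper.

```python
from typing import Any, Callable, Dict, Mapping


def run_callback(callback: Callable[..., Any],
                 context: Mapping[str, Any],
                 callback_args: Dict[str, Any]) -> Any:
    """Invoke a function or class callback with the task context.

    Hypothetical wrapper: context goes first, callback_args follow
    as keyword arguments.
    """
    result = callback(context, **callback_args)
    if isinstance(callback, type):
        # Class form: __init__ received (context, **callback_args); now run it.
        return result()
    return result


def my_operator_callback(context, my_kwarg):
    print("Execution date", context["ds"])
    print("my_kwarg", my_kwarg)


if __name__ == "__main__":
    # A fake context with only the "ds" key used by the callback.
    run_callback(my_operator_callback, {"ds": "2019-07-01"}, {"my_kwarg": "my_value"})
```

The same call works for the MyOperatorCallback class shown earlier, because the class is constructed with exactly the arguments the function form receives.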

default_args / defaults

default_args is a standard airflow.models.DAG __init__ arg which specifies the default args of an airflow.models.BaseOperator. These args are supplied to all of the DAG’s operators and sensors.

The defaults dict is a Declarative extension that allows specifying args more granularly: only for sensors or only for operators (note that defaults specified in operators are not applied to sensors).
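
The layering of these sources can be pictured as successive dict updates. This is a minimal sketch under an assumed precedence (lowest first: DAG default_args, then defaults.<kind>.args, then the task's own args); the real library may apply default_args through Airflow itself rather than merging dicts, and effective_task_args is a hypothetical name.

```python
from typing import Any, Dict, Mapping


def effective_task_args(kind: str,
                        dag_default_args: Mapping[str, Any],
                        defaults: Mapping[str, Any],
                        task_args: Mapping[str, Any]) -> Dict[str, Any]:
    """Combine argument layers for a task of the given kind.

    kind is "sensors" or "operators". Assumed precedence, lowest first:
    DAG default_args, defaults.<kind>.args, the task's own args.
    """
    if kind not in ("sensors", "operators"):
        raise ValueError(f"unknown task kind: {kind!r}")
    merged: Dict[str, Any] = dict(dag_default_args)
    # Only the defaults section matching this task's kind is applied.
    merged.update(defaults.get(kind, {}).get("args", {}))
    merged.update(task_args)
    return merged


if __name__ == "__main__":
    # Values taken from the anatomy example at the top of this page.
    defaults = {"sensors": {"args": {"queue": "my_sensors_queue"}},
                "operators": {"args": {"queue": "my_operators_queue"}}}
    print(effective_task_args("sensors", {"owner": "my_name"}, defaults,
                              {"poke_interval": "1m"}))
```

With the anatomy values, my_sensor ends up on my_sensors_queue while my_operator gets my_operators_queue, and both inherit owner from default_args.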

FLOW

The FLOW atom defines the DAG links between the operators.

FLOW is a dict of lists, where a key is a downstream operator name, and a value is a list of upstream operators.

Consider the following flow:

my_sensor:
- my_task_1
- my_task_2

my_task_1:
- my_task_3

Assuming that the Airflow operators are assigned to variables, the Python equivalent would be:

my_sensor.set_upstream(my_task_1)
my_sensor.set_upstream(my_task_2)

my_task_1.set_upstream(my_task_3)
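
The key-to-list mapping above can be checked mechanically. The sketch below follows the semantics stated for FLOW (each key is a downstream task, its list holds that task's upstream tasks) and uses the standard library's graphlib, whose TopologicalSorter happens to take the same {node: predecessors} shape; flow_edges and run_order are hypothetical helper names.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Tuple

Flow = Dict[str, List[str]]


def flow_edges(flow: Flow) -> List[Tuple[str, str]]:
    """List (upstream, downstream) pairs, reading each FLOW key as the
    downstream task and its list as that task's upstream tasks."""
    return [(upstream, downstream)
            for downstream, upstreams in flow.items()
            for upstream in upstreams]


def run_order(flow: Flow) -> List[str]:
    """Return one valid execution order; raises graphlib.CycleError on cycles."""
    # TopologicalSorter takes {node: predecessors}, the same shape as FLOW.
    return list(TopologicalSorter(flow).static_order())


if __name__ == "__main__":
    flow = {"my_sensor": ["my_task_1", "my_task_2"], "my_task_1": ["my_task_3"]}
    for upstream, downstream in flow_edges(flow):
        print(f"{downstream}.set_upstream({upstream})")
    print(run_order(flow))
```

The loop prints the same three set_upstream calls listed above; run_order is a quick way to catch an accidental cycle in a hand-written flow.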

This would be rendered in the Airflow web interface like this:

  • Tree view:

    _images/flow_tree_view.png
  • Graph view:

    _images/flow_graph_view.png

DO (with_items)

The do block makes it possible to build parts of the DAG schema dynamically.

The do value is a list of dicts. Each dict (a DO_TEMPLATE) must contain a with_items key and may optionally contain operators, sensors and flow; these have the same schema as the corresponding keys of the DAG.

with_items defines a list of items; the DO_TEMPLATE block is rendered once for each item. The operators, sensors and flow produced by these renderings are merged together (as dict unions).

There are 3 different ways to define with_items:

  1. As a static list of items:

    with_items:
    - some_name: John
    - some_name: Jill
    
  2. As a Python callback, which returns a list of items:

    with_items:
      using: myproject.mymodule:my_with_items
    

    Where my_with_items is a Python function which might look like this:

    def my_with_items():
        return [
            {"some_name": "John"},
            {"some_name": "Jill"},
        ]
    
  3. As an external program that prints a JSON list of items to stdout:

    with_items:
      from_stdout: my_command --my-arg 42
    

    Where my_command is an executable in $PATH, which might look like this:

    #!/usr/bin/env ruby
    
    require 'json'
    
    print [
      {some_name: "John"},
      {some_name: "Jill"},
    ].to_json
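
The three forms can be normalised into a plain list by one small loader. This is a sketch, not the library's code: it assumes using names a zero-argument "module:function" and that from_stdout is split with shlex and run without a shell; load_with_items is a hypothetical name.

```python
import importlib
import json
import shlex
import subprocess
from typing import Any, Dict, List, Union

WithItems = Union[List[Any], Dict[str, str]]


def load_with_items(spec: WithItems) -> List[Any]:
    """Turn any of the three with_items forms into a list of items."""
    if isinstance(spec, list):
        return spec  # 1. static list
    if "using" in spec:
        # 2. Python callback referenced as "module:function" (assumed zero-arg).
        module_name, _, func_name = spec["using"].partition(":")
        items = getattr(importlib.import_module(module_name), func_name)()
    elif "from_stdout" in spec:
        # 3. External program; assumed to be run without a shell.
        proc = subprocess.run(shlex.split(spec["from_stdout"]),
                              capture_output=True, text=True, check=True)
        items = json.loads(proc.stdout)
    else:
        raise ValueError(f"unsupported with_items spec: {spec!r}")
    if not isinstance(items, list):
        raise TypeError("with_items must produce a list")
    return items
```

check=True makes a failing command raise subprocess.CalledProcessError instead of silently producing an empty DAG section.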
    

Within a DO_TEMPLATE block, operators, sensors and flow use Jinja2 templates to reference the current item (e.g. {{ item.name }}).

The following DAG defined by a do block:

operators:
  my_operator:
    callback: myproject.mymodule:my_operator_callback
do:
- operators:
    my_crops_{{ item.name }}:
      callback: myproject.myfruits:my_crops
      callback_args:
        fruit: '{{ item.fruit_props }}'
  flow:
    my_operator:
    - my_crops_{{ item.name }}
  with_items:
  - name: pineapple
    fruit_props:
      shape: ellipsoid
      color: brown
  - name: watermelon
    fruit_props:
      shape: round
      color: green

… is equivalent to the following DAG defined statically:

operators:
  my_operator:
    callback: myproject.mymodule:my_operator_callback
  my_crops_pineapple:
    callback: myproject.myfruits:my_crops
    callback_args:
      fruit:
        shape: ellipsoid
        color: brown
  my_crops_watermelon:
    callback: myproject.myfruits:my_crops
    callback_args:
      fruit:
        shape: round
        color: green
flow:
  my_operator:
  - my_crops_pineapple
  - my_crops_watermelon
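
The expansion shown above can be reproduced with a short sketch. Jinja2 is swapped out here for a deliberately tiny renderer that only understands {{ item.<key> }}: a string consisting of a single placeholder yields the raw item value (so fruit stays a mapping, as in the static DAG), and any other string gets textual substitution. Operators and sensors are merged as dict unions and flow lists for the same key are concatenated; render and expand_do are hypothetical names, and with_items is assumed to be already a static list.

```python
import copy
import re
from typing import Any, Dict

# Simplified stand-in for Jinja2: only "{{ item.<key> }}" is understood.
_PLACEHOLDER = re.compile(r"\{\{\s*item\.(\w+)\s*\}\}")


def render(value: Any, item: Dict[str, Any]) -> Any:
    """Recursively substitute item fields into strings, dict keys and lists."""
    if isinstance(value, str):
        whole = _PLACEHOLDER.fullmatch(value.strip())
        if whole:
            return item[whole.group(1)]  # keep the native value (e.g. a dict)
        return _PLACEHOLDER.sub(lambda m: str(item[m.group(1)]), value)
    if isinstance(value, dict):
        return {render(k, item): render(v, item) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, item) for v in value]
    return value


def expand_do(dag: Dict[str, Any]) -> Dict[str, Any]:
    """Return a static DAG dict with every do block expanded."""
    out = copy.deepcopy({k: v for k, v in dag.items() if k != "do"})
    for template in dag.get("do", []):
        for item in template["with_items"]:
            for section in ("operators", "sensors"):
                if section in template:
                    out.setdefault(section, {}).update(render(template[section], item))
            for down, ups in render(template.get("flow", {}), item).items():
                out.setdefault("flow", {}).setdefault(down, []).extend(ups)
    return out
```

Feeding the do-based DAG above (as a Python dict) to expand_do yields the static DAG above, including both crops under my_operator in flow.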