
Airflow

  • Open-source tool for efficiently creating, scheduling and monitoring data tasks & workflows.
  • airflow.cfg: configuration file

Architecture

airflow_arch.png

Sample Real-World UseCase

airflow_usecase.png

DAG (Directed Acyclic Graph)

  • It is a workflow.
    • Directed: Dependencies have a specific direction
    • Acyclic: No cycles or loops
    • Graph: Diagram consisting of nodes and edges
  • DAG consists of one or more tasks.
  • It can also have a schedule defining when it runs and at what interval.
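
A minimal sketch of a DAG definition (the DAG id, schedule, and task below are illustrative, not from a specific project):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Minimal DAG: one task, a daily schedule, no catchup of missed runs
dag = DAG(
    "example_dag",
    default_args={"start_date": days_ago(1)},
    schedule_interval="0 23 * * *",   # run every day at 23:00
    catchup=False,
)

say_hello = BashOperator(task_id="say_hello", bash_command="echo hello", dag=dag)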

Set-up (Docker)

compose.yaml

compose.yaml
#services:
  # source_postgres:
  #   image: postgres:9.3
  #   ports:
  #     - "5433:5422"
  #   networks:
  #     - elt_network
  #   environment:
  #     POSTGRES_DB: source_db
  #     POSTGRES_USER: postgres
  #     POSTGRES_PASSWORD: secret

  # destination_postgres:
  #   image: postgres:9.3
  #   ports:
  #     - "5434:5432"
  #   networks:
  #     - elt_network
  #   environment:
  #     POSTGRES_DB: destination_db
  #     POSTGRES_USER: postgres
  #     POSTGRES_PASSWORD: secret

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: 'lvXNEIfNExmy87tANUpo75TVr3DNvq3nag5PaI2hhOw='
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:

Important

Make sure to create the .env file in the same directory as compose.yaml: echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

  • Dockerfile

    Dockerfile
    # Use the official Airflow image as the base
    FROM apache/airflow:latest
    
    # Install the Docker provider for Airflow
    RUN pip install apache-airflow-providers-docker
    

  • A Fernet key can be generated with python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())". It is used to encrypt sensitive data such as connection passwords; setting your own key is best practice in a production environment.

Important

First, run the airflow-init service before starting the other ones, i.e. first do docker compose up airflow-init -d, and then do docker compose up.

Components

  • airflow.cfg: the main configuration file; stores settings such as dags_folder, executor, and load_examples

Task

  • Operator is a blueprint while Task is the implementation.
  • Instantiated by specifying a particular operator. There are a lot of them like:
    • PythonOperator
    • BashOperator
    • DockerOperator
    • SnowflakeOperator
    • KubernetesPodOperator
    • AzureDataFactoryRunPipelineOperator
  • Operators take parameters, which differ from one operator type to another. Example:
    task1 = BashOperator(
        task_id='bash1',
        bash_command='curl something',
        dag=dag
    )
    

Order of task execution

Bitshift Operators (>>, <<)

task1 >> task2 >> task3

set_upstream() and set_downstream() functions

task1.set_upstream(task2)  # task2 runs before task1, i.e. equivalent to task2 >> task1

chain() function

chain(task1, task2, task3)
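
A self-contained sketch tying the three styles together (the DAG id and task names are illustrative); any one of the dependency lines builds the same chain:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG("dependency_demo", default_args={"start_date": days_ago(1)},
         schedule_interval=None, catchup=False):
    task1 = BashOperator(task_id="t1", bash_command="echo 1")
    task2 = BashOperator(task_id="t2", bash_command="echo 2")
    task3 = BashOperator(task_id="t3", bash_command="echo 3")

    # All three lines express the same ordering; one is enough.
    task1 >> task2 >> task3
    # task2.set_upstream(task1); task3.set_upstream(task2)
    # chain(task1, task2, task3)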

Operators

Action Operators

  • executes an action, e.g. a Python function, a bash command, a container, or a SQL query
  • Like:
    • PythonOperator
    • BashOperator
    • DockerOperator
    • SnowflakeOperator
    • KubernetesPodOperator

Transfer Operators

  • moves data from one system to another
  • Like:
    • S3ToRedshiftOperator
    • S3ToSnowflakeOperator

Sensor Operators

  • waits for a condition or external event before letting downstream tasks run
  • Like:

    • S3KeySensor
    • RedshiftClusterSensor
    • SqlSensor
  • many more are available in the apache-airflow-providers-* packages

Variables

  • A DAG script may need sensitive information (passwords, API keys, etc.).
  • Instead of hard-coding it, we can provide such info as a Variable created in Airflow and reference it from the DAG script.
  • For that:
    • Go to Admin
    • Click on Variables
    • And fill in the key:value pair.
    • Then reference it in the DAG script as: "{{ var.value.get('key_name') }}"
  • Helps in:
    • making dags dynamic
    • making them environment specific
    • security
    • the ease of implementing changes
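
A minimal sketch of the two common ways to read a Variable (the variable name key_name and the task are illustrative): Variable.get() resolves every time the DAG file is parsed, while the Jinja template resolves only at run time.

from airflow.models import Variable
from airflow.operators.bash import BashOperator

# Resolved at DAG-parse time:
api_key = Variable.get("key_name", default_var=None)

# Resolved at run time via templating (keeps parsing cheap and keeps the value out of the DAG file):
print_key = BashOperator(
    task_id="print_key",
    bash_command="echo {{ var.value.get('key_name') }}",
    dag=dag,   # assumes a `dag` object defined elsewhere
)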

Connections

  • they are configuration objects
  • store the details required to connect to external systems
  • For a new one:
    • Go to Admin
    • Go to Connections
    • Add a new one.
    • Then use the connection_id in the dag script.
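
A minimal sketch (the connection id aws_conn is illustrative): usually you just pass the connection_id to an operator or hook, but a stored Connection can also be read directly.

from airflow.hooks.base import BaseHook

# Fetch the Connection object created under Admin > Connections
conn = BaseHook.get_connection("aws_conn")
print(conn.conn_type, conn.host, conn.login)   # password and extras are available too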

Sensors

  • waits for a condition or event; once it is sensed, the workflow (downstream tasks) is triggered
  • for example, waiting for an object to land in S3, which we can do with S3KeySensor like this:
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    wait_for_file = S3KeySensor(
        task_id='xxx',
        bucket_name='xxx',
        bucket_key='xxx',
        aws_conn_id='xxx',
        mode='poke',       # 'poke' or 'reschedule'
        timeout=60*60*5,   # give up after 5 hours
        soft_fail=True,    # if True and the sensor times out, the task is marked as skipped
                           # (downstream tasks are skipped, the DAG run can still succeed);
                           # if False, the timeout fails the sensor and the DAG run
        dag=dag
    )
    
  • Under Admin > Pools, Running Slots shows how many worker slots are currently occupied. If a sensor task spends most of its time sleeping, it is better to switch its mode from poke to reschedule.
  • In reschedule mode, a worker slot is occupied only while the sensor is actually checking; between checks the slot is released.

Deferrable & Triggers

  • Say we have an S3 sensor waiting for an S3 event. Even in reschedule mode, a worker slot is still taken up from time to time for each check. Making the task deferrable creates a trigger instead.
  • A trigger is a highly optimized async function, run by the triggerer process, that fires when the awaited S3 event occurs.
  • The trigger then hands the task back to a worker, which performs the allocated job.
  • Hence worker slots are only used when actually required; in the meantime the waiting is deferred to the triggers.
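
A minimal sketch of making the S3 wait deferrable, assuming a recent apache-airflow-providers-amazon release in which S3KeySensor accepts a deferrable flag (bucket, key, and connection names are illustrative). Note that deferrable tasks require the triggerer process (airflow triggerer) to be running alongside the scheduler.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_file_deferrable",
    bucket_name="sleek-data",
    bucket_key="oms/xrate.json",
    aws_conn_id="aws_conn",
    deferrable=True,   # hand the waiting to a trigger; no worker slot is held while idle
    dag=dag,           # assumes a `dag` object defined elsewhere
)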

XComs

  • pass data between tasks
  • can only pass small amounts of data, since XComs are stored in the metadata database; the limit depends on the backend:
    • Postgres: 1 GB
    • SQLite: 2 GB
    • MySQL: 64 KB
  • can also configure XCom to use external storage such as S3 or Azure Blob Storage (a custom XCom backend)
  • In the following code the S3 object is not overwritten (replace defaults to False); instead, each distinct filename pushed via XCom creates a new object:
    xcom.py
    from airflow.operators.python import PythonOperator
    from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

    def fetch_fn(ti):
        filename = 'xxx'
        # push the generated filename so downstream tasks can reuse it
        ti.xcom_push(key='file', value=filename)


    fetch_xrate = PythonOperator(
        task_id='fetch',
        python_callable=fetch_fn,   # the task instance (ti) is passed automatically in Airflow 2
        dag=dag
    )

    upload_to_s3 = LocalFilesystemToS3Operator(
        task_id='upload_to_s3',
        filename="/tmp/{{ ti.xcom_pull(task_ids='fetch', key='file') }}",
        dest_bucket='sleekdata',
        dest_key='oms/{{ ti.xcom_pull(task_ids="fetch", key="file") }}',
        aws_conn_id='aws_conn',
        dag=dag
    )

    # Define Dependencies
    fetch_xrate >> upload_to_s3
    
Info

View stored XComs under the Admin > XComs menu in the UI.

Hooks

  • both operators and hooks are python classes, the difference is in the level of abstraction
  • Level of abstraction:
    • Operators -> Hooks -> Low-level code
  • many hooks are built into Airflow and its provider packages. For example, S3Hook offers various degrees of control over an S3 bucket.
  • In simple words: if an operator is not available, the next stop is to look for a hook, and if a hook is not available either, the next stop is to write our own low-level code.
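
A minimal sketch of dropping down to S3Hook inside a Python callable when no ready-made operator fits (bucket, keys, and connection id are illustrative):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_if_missing():
    hook = S3Hook(aws_conn_id="aws_conn")
    # list existing keys, then upload only if the target key is not there yet
    existing = hook.list_keys(bucket_name="sleek-data", prefix="oms/") or []
    if "oms/xrate.json" not in existing:
        hook.load_file(
            filename="/tmp/xrate.json",
            key="oms/xrate.json",
            bucket_name="sleek-data",
            replace=False,
        )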

Datasets

  • logical representation of underlying data

Key benefits of Datasets

  • Producer-Consumer Use Cases
  • Cross DAG Dependencies (by just defining dummy Dataset("some_name"))
  • Cost Reduction (Datasets work without using worker slots)
  • Scheduling can be Dataset-based (data-driven) instead of time-based
  • Common use case:
    • Say we create a producer.py which produces or loads the data into say S3
      from airflow import DAG
      from airflow.datasets import Dataset
      from airflow.models import Variable
      from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
      from airflow.utils.dates import days_ago

      dag = DAG(
          "data_producer_dag",
          default_args={"start_date": days_ago(1)},
          schedule_interval="0 23 * * *",
      )

      http_to_s3_task = HttpToS3Operator(
          task_id="data_producer_task",
          endpoint=Variable.get("web_api_key"),
          s3_bucket="sleek-data",
          s3_key="oms/xrate.json",
          aws_conn_id="aws_conn",
          http_conn_id=None,
          replace=True,
          dag=dag,
          outlets=[Dataset("s3://sleek-data/oms/xrate.json")]  # outlets marks this task as a producer of the dataset
      )
      
    • Then, we create consumer.py, which loads the data into Snowflake so analysts can query it.
      from airflow import DAG
      from airflow.datasets import Dataset
      from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
      from airflow.utils.dates import days_ago

      # Define the DAG
      dag = DAG(
          'data_consumer_dag',
          default_args={'start_date': days_ago(1)},
          schedule=[Dataset("s3://sleek-data/oms/xrate.json")],  # scheduling on a Dataset makes this DAG a consumer

      #   A cron-based (time-based) schedule would instead look like:
      #   schedule_interval="0 23 * * *",
      )

      # Define the Task
      load_table = SnowflakeOperator(
          task_id='data_consumer_task',
          sql='./sqls/xrate_sf.sql',  # a SQL query file
          snowflake_conn_id='snowflake_conn',
          dag=dag
      )
      
  • we can check DAG dependencies under the Browse menu (DAG Dependencies)
  • we can also have multi-dataset dependency
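
A minimal sketch of a multi-dataset dependency (the second dataset URI is hypothetical): the consumer DAG runs only after every listed dataset has been updated by a producer.

from airflow import DAG
from airflow.datasets import Dataset
from airflow.utils.dates import days_ago

dag = DAG(
    "multi_dataset_consumer",
    default_args={"start_date": days_ago(1)},
    schedule=[
        Dataset("s3://sleek-data/oms/xrate.json"),
        Dataset("s3://sleek-data/oms/orders.json"),   # hypothetical second dataset
    ],
)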

Trigger Rules & Conditional Branching

  • trigger rules control when a task runs based on the states of its upstream tasks; branching operators choose which downstream path to follow at run time
  • a good start is here: Video
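
As a starting point, here is a minimal sketch (all names are illustrative, not from the linked video): BranchPythonOperator picks one downstream path at run time, and the join task uses a non-default trigger rule so it still runs even though the other branch was skipped.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago

def choose_path(**context):
    # decide which branch to follow, e.g. based on a Variable or the logical date
    return "small_load"   # must return the task_id (or list of task_ids) to run

with DAG("branching_demo", default_args={"start_date": days_ago(1)},
         schedule_interval=None, catchup=False):
    branch = BranchPythonOperator(task_id="branch", python_callable=choose_path)
    small_load = BashOperator(task_id="small_load", bash_command="echo small")
    full_load = BashOperator(task_id="full_load", bash_command="echo full")
    join = BashOperator(
        task_id="join",
        bash_command="echo done",
        trigger_rule="none_failed",   # default 'all_success' would skip the join after branching
    )
    branch >> [small_load, full_load] >> join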

TaskFlow API

  • offers a significant reduction in boilerplate code
  • provides Python decorators (such as @task) for defining the same tasks. Example:
    Without API
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago

    def extract(ti=None, **kwargs):
        # Extract logic here
        raw_data = "Raw order data"
        ti.xcom_push(key="raw_data", value=raw_data)
    
    def transform(ti=None, **kwargs):
        # Transformation logic here
        raw_data = ti.xcom_pull(task_ids="extract", key="raw_data")
        processed_data = f"Processed: {raw_data}"
        ti.xcom_push(key="processed_data", value=processed_data)
    
    def validate(ti=None, **kwargs):
        # Validation logic here
        processed_data = ti.xcom_pull(task_ids="transform", key="processed_data")
        validated_data = f"Validated: {processed_data}"
        ti.xcom_push(key="validated_data", value=validated_data)
    
    def load(ti=None, **kwargs):
        # Load logic here
        validated_data = ti.xcom_pull(task_ids="validate", key="validated_data")
        print(f"Data loaded successfully: {validated_data}")
    
    dag = DAG(
        'traditional_dag',
        default_args={'start_date': days_ago(1)},
        schedule_interval='0 21 * * *',
        catchup=False
    )
    
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        dag=dag,
    )
    
    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
        dag=dag,
    )
    
    validate_task = PythonOperator(
        task_id="validate",
        python_callable=validate,
        dag=dag,
    )
    
    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
        dag=dag,
    )
    
    # Set Dependencies
    extract_task >> transform_task >> validate_task >> load_task
    

Example

With API
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task
def extract():
    # Extract logic here
    return "Raw order data"
@task
def transform(raw_data):
    # Transform logic here
    return f"Processed: {raw_data}"
@task
def validate(processed_data):
    # Validate logic here
    return f"Validated: {processed_data}"
@task
def load(validated_data):
    # Load logic here
    print(f"Data loaded successfully: {validated_data}")

dag = DAG(
    'taskflow_api',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

with dag:
    load_task = load(validate(transform(extract())))
  • we can also have a hybrid approach:
Hybrid Approach
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

@task
def extract():
    # Extract logic here
    return "Raw order data"

@task
def transform(raw_data):
    # Transform logic here
    return f"Processed: {raw_data}"

@task
def validate(processed_data):
    # Validate logic here
    return f"Validated: {processed_data}"

@task
def load(validated_data):
    # Load logic here
    print(f"Data loaded successfully: {validated_data}")

dag = DAG(
    'hybrid_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

with dag:
    load_task = load(validate(transform(extract())))

    snowflake_task = SnowflakeOperator(
        task_id='Snowflake_task',
        sql='select 1',
        snowflake_conn_id='snowflake_conn',
    )

    load_task >> snowflake_task