安装Airflow

pip install "apache-airflow[celery]==3.0.6" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.6/constraints-3.9.txt"

初始化

# 初始化数据库
airflow db init

# 创建用户
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email [email protected]

# 启动web服务
airflow webserver --port 8080

# 启动调度器
airflow scheduler

# 单点模式启动
airflow standalone

第一个DAG命令

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# 默认参数,会传递给每个 Operator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 定义 DAG
with DAG(
    'my_first_dag',           # DAG 的唯一 ID
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1), # 每天执行一次
    start_date=datetime(2023, 1, 1),     # 开始日期
    catchup=False,            # 是否补跑过去错过的任务,False 表示不补跑
    tags=['example'],
) as dag:

    # 任务 1:使用 BashOperator 执行一个 echo 命令
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # 任务 2:使用 PythonOperator 调用一个 Python 函数
    def print_hello():
        print("Hello from Airflow!")

    t2 = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
    )

    # 任务 3:再执行一个 Bash 命令
    t3 = BashOperator(
        task_id='echo_done',
        bash_command='echo "All done!"',
    )

    # 定义任务之间的依赖关系
    # 这意味着 t1 先执行,然后是 t2,最后是 t3
    t1 >> t2 >> t3

    # 另一种写法:
    # t1.set_downstream(t2)
    # t2.set_downstream(t3)