airflow
安装Airflow
pip install "apache-airflow[celery]==3.0.6" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.6/constraints-3.9.txt"
初始化
# 初始化数据库
airflow db init
# 创建用户
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected]
# 启动web服务
airflow webserver --port 8080
# 启动调度器
airflow scheduler
# 单点模式启动
airflow standalone
配置文件修改
# airflow.cfg
# 修改为你的自定义路径
dags_folder = /path/to/your/dags/directory
常用任务命令
# 查看任务队列
airflow dags list-runs my_first_dag
# 查看当前dags列表
airflow dags list | grep my_first
# 查看当前运行的任务实例
airflow tasks list daily_datacheck
# 查看特定运行的任务状态
airflow dags state daily_datacheck
# 检查调度器
airflow jobs check --job-type SchedulerJob
# 查看任务
airflow tasks list my_first_dag
# 启动调度器
airflow scheduler --daemon
# 检查引用错误
airflow dags list-import-errors
# 暂停任务
airflow dags pause daily_datacheck
测试
# 语法测试
python -m py_compile datacheck/mincheck_smk_dag.py
# 查看DAG详情
airflow dags show daily_mincheck_smk
# 查看任务列表
airflow tasks list daily_mincheck_smk
# 测试特定任务(不实际运行)
airflow tasks test daily_datacheck check_prerequisites 2025-09-30
# 完整测试运行
airflow dags test daily_datacheck 2025-09-30
# 手动触发运行
airflow dags trigger daily_mincheck_smk
# 查看运行状态
airflow dags list-runs daily_mincheck_smk
第一个DAG命令
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# 默认参数,会传递给每个 Operator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# 定义 DAG
with DAG(
'my_first_dag', # DAG 的唯一 ID
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1), # 每天执行一次
start_date=datetime(2023, 1, 1), # 开始日期
catchup=False, # 是否补跑过去错过的任务,False 表示不补跑
tags=['example'],
) as dag:
# 任务 1:使用 BashOperator 执行一个 echo 命令
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
# 任务 2:使用 PythonOperator 调用一个 Python 函数
def print_hello():
print("Hello from Airflow!")
t2 = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
)
# 任务 3:再执行一个 Bash 命令
t3 = BashOperator(
task_id='echo_done',
bash_command='echo "All done!"',
)
# 定义任务之间的依赖关系
# 这意味着 t1 先执行,然后是 t2,最后是 t3
t1 >> t2 >> t3
# 另一种写法:
# t1.set_downstream(t2)
# t2.set_downstream(t3)