airflow
安装Airflow
pip install "apache-airflow[celery]==3.0.6" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.6/constraints-3.9.txt"
初始化
# 初始化数据库
airflow db init
# 创建用户
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected]
# 启动web服务
airflow webserver --port 8080
# 启动调度器
airflow scheduler
# 单点模式启动
airflow standalone
第一个DAG命令
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# 默认参数,会传递给每个 Operator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# 定义 DAG
with DAG(
'my_first_dag', # DAG 的唯一 ID
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1), # 每天执行一次
start_date=datetime(2023, 1, 1), # 开始日期
catchup=False, # 是否补跑过去错过的任务,False 表示不补跑
tags=['example'],
) as dag:
# 任务 1:使用 BashOperator 执行一个 echo 命令
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
# 任务 2:使用 PythonOperator 调用一个 Python 函数
def print_hello():
print("Hello from Airflow!")
t2 = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
)
# 任务 3:再执行一个 Bash 命令
t3 = BashOperator(
task_id='echo_done',
bash_command='echo "All done!"',
)
# 定义任务之间的依赖关系
# 这意味着 t1 先执行,然后是 t2,最后是 t3
t1 >> t2 >> t3
# 另一种写法:
# t1.set_downstream(t2)
# t2.set_downstream(t3)