Airflow [1]

An Introduction to Airflow

Posted by ZYT on November 21, 2018

Airflow

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

DAG

In mathematics and computer science, a directed acyclic graph (DAG, /ˈdæɡ/) is a finite directed graph with no directed cycles. That is, it consists of finitely many vertices and edges, with each edge directed from one vertex to another, such that there is no way to start at any vertex v and follow a consistently-directed sequence of edges that eventually loops back to v again. Equivalently, a DAG is a directed graph that has a topological ordering, a sequence of the vertices such that every edge is directed from earlier to later in the sequence.
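The equivalence between "no directed cycles" and "has a topological ordering" can be made concrete with a small sketch. The code below is an illustration only, not how Airflow itself orders tasks: it uses Kahn's algorithm, and the function name `topological_order` and the task names are assumptions chosen for the example.

```python
from collections import deque


def topological_order(edges):
    """Return a topological ordering of the vertices, or None if the graph has a cycle."""
    # Build adjacency lists and in-degree counts from (source, target) pairs.
    successors = {}
    in_degree = {}
    for src, dst in edges:
        successors.setdefault(src, []).append(dst)
        successors.setdefault(dst, [])
        in_degree[dst] = in_degree.get(dst, 0) + 1
        in_degree.setdefault(src, 0)

    # Start from the vertices that have no incoming edges.
    ready = deque(sorted(v for v, d in in_degree.items() if d == 0))
    order = []
    while ready:
        v = ready.popleft()
        order.append(v)
        for nxt in successors[v]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Any vertex that was never emitted sits on a directed cycle.
    return order if len(order) == len(in_degree) else None


print(topological_order([("extract", "transform"), ("transform", "load")]))
# ['extract', 'transform', 'load']
print(topological_order([("a", "b"), ("b", "a")]))
# None
```

A graph with a cycle has no valid ordering, which is why a workflow whose tasks depend on each other in a loop cannot be scheduled.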

In Airflow, every workflow is a DAG. A DAG is made up of a series of Operators (the following are some of Airflow's built-in Operators):

  • BashOperator - executes a bash command
  • PythonOperator - calls an arbitrary Python function
  • EmailOperator - sends an email
  • SimpleHttpOperator - sends an HTTP request
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command

Users can also define their own custom Operators.

Installation

pip

# airflow needs a home; ~/airflow is the default.
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server
airflow webserver -p 8080 -D

# the web UI shows many example DAGs; they can be turned off at startup
# by setting load_examples to False in airflow.cfg

# start the scheduler
airflow scheduler

docker [installing with docker is recommended]

# this image is built on puckel/docker-airflow, with vim installed and the config.cfg file modified to disable loading the examples
# pull image
docker pull zyt312074545/docker-airflow

# start the web server
docker run -d -p 8080:8080 zyt312074545/docker-airflow webserver

# bash
docker exec -it <container-id> bash

Example

Create a DAG file. The folder to put it in is given by the dags_folder parameter in the [core] section of airflow.cfg:

cat airflow.cfg | grep dags_folder

cd <dags_folder>

touch test_dag.py
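As an alternative to grep, the same setting can be read programmatically, since airflow.cfg is an INI-style file. The following is a minimal sketch using Python's standard configparser module. The `SAMPLE_CFG` text and the helper name `read_core_settings` are hypothetical and exist only for illustration; a real airflow.cfg contains many more settings.

```python
import configparser

# Hypothetical excerpt of an airflow.cfg; a real file has many more settings.
SAMPLE_CFG = """
[core]
dags_folder = /usr/local/airflow/dags
load_examples = True
"""


def read_core_settings(cfg_text):
    """Extract dags_folder and load_examples from the [core] section."""
    # interpolation=None keeps literal '%' characters in values from being
    # treated as placeholders by configparser.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        "dags_folder": parser.get("core", "dags_folder"),
        "load_examples": parser.getboolean("core", "load_examples"),
    }


settings = read_core_settings(SAMPLE_CFG)
print(settings["dags_folder"])    # /usr/local/airflow/dags
print(settings["load_examples"])  # True
```

To read an actual file instead of the sample string, `parser.read(path)` can be used with the path to airflow.cfg under AIRFLOW_HOME.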

The test_dag.py file:

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def greet():
    print('Writing in file')
    with open('greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'

def respond():
    return 'Greet Responded Again'

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 11, 18, 0, 0, 0),
    'retries': 0
}

# If start_date is in the past, catchup=False prevents the scheduler from backfilling missed runs.
# concurrency is a DAG-level argument, so it is passed to DAG() rather than through default_args.
with DAG('test_dag', catchup=False, concurrency=1, default_args=default_args, schedule_interval='*/10 * * * *') as dag:
    opr_hello = BashOperator(task_id='say_Hi', bash_command='echo "Hi!!"')
    opr_greet = PythonOperator(task_id='greet', python_callable=greet)
    opr_respond = PythonOperator(task_id='respond', python_callable=respond)
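To get a feel for why catchup=False matters here, the sketch below estimates how many schedule intervals have already elapsed when a DAG with a past start_date is first enabled. It is a simplified model and not Airflow code: it assumes one run per fully completed 10-minute interval, and the helper name `completed_intervals` and the value of `now` are hypothetical.

```python
import datetime as dt


def completed_intervals(start_date, now, interval):
    """Count the schedule intervals that have fully elapsed between start_date and now."""
    if now <= start_date:
        return 0
    # Dividing one timedelta by another with // yields an integer count.
    return (now - start_date) // interval


start_date = dt.datetime(2018, 11, 18, 0, 0, 0)
interval = dt.timedelta(minutes=10)        # matches the cron expression '*/10 * * * *'
now = dt.datetime(2018, 11, 18, 8, 30, 0)  # hypothetical time the DAG is first enabled

backlog = completed_intervals(start_date, now, interval)
print(backlog)  # 51
```

Under this model, enabling the DAG at 08:30 with catchup left on would queue roughly 51 backfill runs, whereas catchup=False limits the scheduler to the most recent interval.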

Test the code (running the file with Python confirms that it parses and imports without errors):

python test_dag.py

List all the tasks in this DAG:

$ airflow list_tasks test_dag
[2018-11-18 08:31:31,531]  INFO - Using executor SequentialExecutor
[2018-11-18 08:31:31,628]  INFO - Filling up the DagBag from /usr/local/airflow/dags
greet
respond
say_Hi

Run the DAG

airflow scheduler

The tasks can now be seen running in the web UI.