Airflow [2]

ETL Showdown

Posted by ZYT on November 21, 2018

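ETL tools are an important part of data processing. Besides Airflow, covered in the previous post, there are many other options, so let's put them head to head.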

Bonobo

The Bonobo website states its positioning clearly:

Bonobo is an Extract Transform Load (or ETL) framework for the Python (3.5+) language. It is targeting small scale data.

Installation

pip install bonobo

Demo

The flowchart is shown below:

[Figure: flowchart]

The pipeline written with Bonobo:

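import bonobo

# Extract: a node with no input; each yielded value becomes one row.
def generate_date():
    yield 'foo'
    yield 'bar'

# Transform: called once for every row coming from the previous node.
def uppercase(x: str):
    return x.upper()

# Load: prints each row it receives.
def output(x: str):
    print(x)

# Nodes passed to Graph are chained in the order given.
graph = bonobo.Graph(
    generate_date,
    uppercase,
    output
)

if __name__ == '__main__':
    bonobo.run(graph)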

# Output
FOO
BAR
 - generate_date in=1 out=2 [done]
 - uppercase in=2 out=2 [done]
 - output in=2 [done]
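
To make the data flow through the three nodes concrete, here is a minimal plain-Python sketch of the same chain. It is not how Bonobo is implemented; `run_chain` is a hypothetical helper introduced only for illustration, and the node names are copied from the demo above.

```python
def generate_date():
    # Source node: takes no input and emits two rows.
    yield "foo"
    yield "bar"


def uppercase(x: str) -> str:
    # Transform node: one row in, one row out.
    return x.upper()


def run_chain(source, *transforms):
    """Hypothetical helper: pass every row from `source` through each transform in order."""
    results = []
    for row in source():
        for transform in transforms:
            row = transform(row)
        results.append(row)
    return results


if __name__ == "__main__":
    for row in run_chain(generate_date, uppercase):
        print(row)
```

Each row travels through the transforms in the order they are listed, which mirrors the order of the arguments given to bonobo.Graph in the demo.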

Bonobo is enough for a simple ETL pipeline, but it falls short for complex jobs.

luigi

Installation

pip install luigi

Demo

luigi has two key concepts: Tasks and Targets. The relationship between them is shown below:

[Figure: luigi Tasks and Targets]

The pipeline written with luigi:

import luigi


class WritePipelineTask(luigi.Task):

    # The Target this task produces; the task counts as complete once it exists.
    def output(self):
        return luigi.LocalTarget("data/output_one.txt")

    def run(self):
        with self.output().open("w") as output_file:
            output_file.write("pipeline")


class AddMyTask(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/output_two.txt")

    # Upstream task that must be complete before this one runs.
    def requires(self):
        return WritePipelineTask()

    def run(self):
        # self.input() is the output Target of the required task.
        with self.input().open("r") as input_file:
            line = input_file.read()

        with self.output().open("w") as output_file:
            decorated_line = "My "+line
            output_file.write(decorated_line)

Run the task:

# --local-scheduler tells luigi not to connect to a scheduler service; recommended only for test environments
$ PYTHONPATH='.' luigi --module luigi_demo AddMyTask --local-scheduler
DEBUG: Checking if AddMyTask() is complete
DEBUG: Checking if WritePipelineTask() is complete
INFO: Informed scheduler that task   AddMyTask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   WritePipelineTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) running   WritePipelineTask()
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) done      WritePipelineTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   WritePipelineTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) running   AddMyTask()
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) done      AddMyTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   AddMyTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 AddMyTask()
    - 1 WritePipelineTask()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

# A data directory appears in the current directory
$ ls
data luigi_demo.py

# Inspect the data directory
$ cd data
$ ls
output_one.txt output_two.txt

# View the contents of each file
$ cat output_one.txt
pipeline

$ cat output_two.txt
My pipeline

Luigi features

  • Target-based
  • No support for distributed execution
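
"Target-based" means that whether a task needs to run is decided by whether its output Target already exists. The following standard-library sketch illustrates that idea under simplified assumptions; `MiniTask`, `WritePipeline`, `AddMy`, and `build` are hypothetical names, not part of Luigi, and a temporary directory stands in for the data folder.

```python
import tempfile
from pathlib import Path


class MiniTask:
    """Hypothetical stand-in for a task; not Luigi's API."""

    def __init__(self, workdir: Path):
        self.workdir = workdir

    def requires(self):
        return []

    def output(self) -> Path:
        raise NotImplementedError

    def run(self) -> None:
        raise NotImplementedError

    def complete(self) -> bool:
        # A task is complete as soon as its output file exists.
        return self.output().exists()


class WritePipeline(MiniTask):
    def output(self) -> Path:
        return self.workdir / "output_one.txt"

    def run(self) -> None:
        self.output().write_text("pipeline")


class AddMy(MiniTask):
    def requires(self):
        return [WritePipeline(self.workdir)]

    def output(self) -> Path:
        return self.workdir / "output_two.txt"

    def run(self) -> None:
        line = self.requires()[0].output().read_text()
        self.output().write_text("My " + line)


def build(task: MiniTask, log: list) -> None:
    """Run dependencies first, then the task, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(type(task).__name__)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        log = []
        build(AddMy(workdir), log)
        build(AddMy(workdir), log)  # second call does nothing: the output exists
        print(log)
        print((workdir / "output_two.txt").read_text())
```

Because completeness is derived from the output file, re-running a finished pipeline is a no-op, and deleting an output file is enough to make the corresponding task run again.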

Airflow

For details on Airflow, see the previous post, Introduction to Airflow.

Features

  • DAG-based
  • Supports distributed execution
  • Powerful UI
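
"DAG-based" means tasks are nodes, dependencies are edges, and any valid run order is a topological order of that graph. The sketch below shows the idea with the standard-library graphlib module (Python 3.9+); it does not use Airflow's API, and the task names and the `run_order` helper are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def run_order(deps):
    """Return one valid execution order for the dependency graph."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(run_order(dependencies))
```

If the dependencies contain a cycle, graphlib raises CycleError, which is why the graph has to be acyclic for a run order to exist.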

Related links

One author abroad has published a detailed table comparing Airflow, Luigi, and Pinball and recommends Airflow; for details, see luigi-airflow-pinball.