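ETL tools are an important part of any data processing stack. Besides Airflow, which was covered earlier, there are plenty of other tools to choose from, so let's put a few of them head to head.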
Bonobo
The Bonobo website states its positioning clearly:
Bonobo is an Extract Transform Load (or ETL) framework for the Python (3.5+) language. It is targeting small scale data.
Installation
pip install bonobo
demo
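The pipeline chains three steps: generate_date produces the rows, uppercase transforms them, and output prints them.
Written with Bonobo: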
import bonobo

def generate_date():
    yield 'foo'
    yield 'bar'

def uppercase(x: str):
    return x.upper()

def output(x: str):
    print(x)

graph = bonobo.Graph(
    generate_date,
    uppercase,
    output
)

if __name__ == '__main__':
    bonobo.run(graph)
Output:
FOO
BAR
- generate_date in=1 out=2 [done]
- uppercase in=2 out=2 [done]
- output in=2 [done]
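To make the in/out counts above easier to read, here is a minimal plain-Python sketch of what the chain does conceptually: the source yields two rows, and each later step is called once per row. The helper run_chain is hypothetical and only illustrates the data flow; it makes no attempt to reproduce Bonobo's actual execution strategy.

```python
import inspect
from typing import Callable, Iterable, Iterator, List


def generate_data() -> Iterator[str]:
    # Extract: a source node takes no input and yields rows.
    yield 'foo'
    yield 'bar'


def uppercase(x: str) -> str:
    # Transform: called once per incoming row.
    return x.upper()


def run_chain(source: Callable[[], Iterable], *steps: Callable) -> List:
    """Hypothetical helper: feed every row from `source` through `steps` in order."""
    rows = list(source())
    for step in steps:
        next_rows = []
        for row in rows:
            result = step(row)
            if inspect.isgenerator(result):
                next_rows.extend(result)   # a generator step may emit several rows
            elif result is not None:
                next_rows.append(result)   # a plain return emits exactly one row
        rows = next_rows
    return rows


if __name__ == '__main__':
    print(run_chain(generate_data, uppercase))  # ['FOO', 'BAR']
```

Two rows leave the source and two rows leave uppercase, which matches the out=2 and in=2 out=2 counters in the run summary.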
Bonobo is enough for a simple ETL pipeline, but it falls short once the tasks become complex.
luigi
Installation
pip install luigi
demo
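luigi has two key concepts: Tasks and Targets. A Task declares the Tasks it depends on in requires(), does its work in run(), and returns the Target it produces from output().
Written with luigi: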
import luigi

class WritePipelineTask(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/output_one.txt")

    def run(self):
        with self.output().open("w") as output_file:
            output_file.write("pipeline")

class AddMyTask(luigi.Task):

    def output(self):
        return luigi.LocalTarget("data/output_two.txt")

    def requires(self):
        return WritePipelineTask()

    def run(self):
        with self.input().open("r") as input_file:
            line = input_file.read()

        with self.output().open("w") as output_file:
            decorated_line = "My "+line
            output_file.write(decorated_line)
Run the task:
# --local-scheduler tells luigi not to connect to a scheduler service; recommended only for test environments
$ PYTHONPATH='.' luigi --module luigi_demo AddMyTask --local-scheduler
DEBUG: Checking if AddMyTask() is complete
DEBUG: Checking if WritePipelineTask() is complete
INFO: Informed scheduler that task AddMyTask__99914b932b has status PENDING
INFO: Informed scheduler that task WritePipelineTask__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) running WritePipelineTask()
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) done WritePipelineTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task WritePipelineTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) running AddMyTask()
INFO: [pid 1288] Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) done AddMyTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task AddMyTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=951591659, workers=1, host=zyt.local, username=zyt, pid=1288) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
- 1 AddMyTask()
- 1 WritePipelineTask()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
# A data directory now exists in the current directory
$ ls
data luigi_demo.py
# Look inside the data directory
$ cd data
$ ls
output_one.txt output_two.txt
# View the contents of each file
$ cat output_one.txt
pipeline
$ cat output_two.txt
My pipeline
Luigi's characteristics
- Target-based
- No distributed execution
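What "Target-based" means in practice can be sketched without luigi at all: a task counts as complete when its output target exists, so a second run skips work that is already done. The classes FileTarget and SketchTask and the function build below are hypothetical stand-ins written for illustration; they are not luigi's API, only a rough model of the idea under that assumption.

```python
import os
import tempfile


class FileTarget:
    """Hypothetical stand-in for a file-backed target."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class SketchTask:
    """Hypothetical base class: a task is complete when its output exists."""

    def __init__(self, workdir):
        self.workdir = workdir

    def requires(self):
        return []

    def complete(self):
        return self.output().exists()


class WritePipeline(SketchTask):
    def output(self):
        return FileTarget(os.path.join(self.workdir, "output_one.txt"))

    def run(self):
        with open(self.output().path, "w") as f:
            f.write("pipeline")


class AddMy(SketchTask):
    def requires(self):
        return [WritePipeline(self.workdir)]

    def output(self):
        return FileTarget(os.path.join(self.workdir, "output_two.txt"))

    def run(self):
        with open(self.requires()[0].output().path) as f:
            line = f.read()
        with open(self.output().path, "w") as f:
            f.write("My " + line)


def build(task, log):
    """Run dependencies first and skip any task whose target already exists."""
    if task.complete():
        log.append(f"skip {type(task).__name__}")
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(f"run {type(task).__name__}")


def demo():
    with tempfile.TemporaryDirectory() as workdir:
        first, second = [], []
        build(AddMy(workdir), first)    # nothing exists yet: both tasks run
        build(AddMy(workdir), second)   # output_two.txt exists: nothing runs
        return first, second


if __name__ == "__main__":
    print(demo())
```

The first call runs both tasks in dependency order; the second finds output_two.txt already present and does nothing. Deleting a target file is therefore the simplest way to force a task to run again in this model.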
Airflow
For details on Airflow, see the previous article, Airflow 简介 (Introduction to Airflow).
Characteristics
- DAG-based
- Supports distributed execution
- Powerful UI
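To illustrate why a DAG is a convenient basis for scheduling, the sketch below uses Python's standard graphlib module (3.9+) to group tasks into batches. Tasks in the same batch do not depend on each other, so a scheduler could hand them to different workers. The task names and the execution_batches helper are hypothetical, and this is not how Airflow's scheduler is implemented; it only demonstrates the ordering that a dependency graph implies.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}


def execution_batches(graph):
    """Group tasks into batches whose members have no unmet dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make output stable
        batches.append(ready)
        sorter.done(*ready)                 # mark the batch finished, unlocking successors
    return batches


if __name__ == "__main__":
    for batch in execution_batches(dag):
        print(batch)
```

Here load and report land in the same batch because both depend only on transform, which is the kind of independence that makes parallel or distributed execution possible.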
Related links
Someone has put together a detailed table comparing Airflow, Luigi, and Pinball, and recommends Airflow. For the full comparison, see luigi-airflow-pinball.