[TOC]
ETL
概述
ETL 的英文全称是 Extract-Transform-Load 的缩写,用来描述将数据从来源迁移到目标的几个过程:
- 1.Extract 数据抽取,也就是把数据从数据源读出来。
- 2.Transform 数据转换,把原始数据转换成期望的格式和维度。如果用在数据仓库的场景下,Transform也包含数据清洗,清洗掉噪音数据。
- 3.Load 数据加载,把处理后的数据加载到目标处,比如数据仓库。
常用的 ETL 工具优缺点
工具 | 开发者 | 优点 | 缺点 |
---|---|---|---|
Datax | 阿里巴巴(开源) | 支持所有常见数据源之间的数据传输; 配置简单,日志好看 |
单机运行; 社区相对没有那么活跃 |
Sqoop | Apache(开源) | 分布式,资源调度 on yarn | 仅支持 hadoop 集群数据的导入导出; 报错隐晦,排查麻烦 |
Kettle | 国外(开源) | 支持所有常见数据源之间的数据传输; 可视化拖动配置,操作简单 |
单机运行; 似乎比datax慢 |
1. Datax
Datax 概述
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
文档说明: https://github.com/alibaba/DataX
优点: 1、几乎支持所有常见数据源之间的数据传输 2、配置简单,日志信息输出和报错明显,易于使用
缺点: 1、单机运行 2、社区相对没有那么活跃
异常: win环境日志中文乱码,shell终端执行 chcp 65001 hdfsreader 插件有 bug,读取空文件报错,需要改源码重新编译
安装
# 下载软件包
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
# 解压
tar -zxvf datax.tar.gz
# 测试
cd /opt/datax/bin
python datax.py ../job/job.json
源码安装(解决 hdfs 插件 bug)
(1)、下载DataX源码:
git clone https://github.com/TurboWay/DataX.git
(2)、通过maven打包:
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志显示如下:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
打包成功后的DataX包位于 DataX/target/datax/datax/
常用命令和模板
# 可以配置传参, 查看用法
python datax.py --help
# 执行
python ~/datax/bin/datax.py -p "-Dhost=** -Duser=root -Dpwd=utopia2020" pg2mysql.json
调优
# 执行
python ~/datax/bin/datax.py --jvm="-Xms4G -Xmx4G" xxx.json
hdfs2pg
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/edw.db/f_gf_area_info/*",
"defaultFS": "hdfs://nameservices1",
"hadoopConfig":{
"dfs.nameservices": "nameservices1",
"dfs.ha.namenodes.nameservices1": "nn1,nn2",
"dfs.namenode.rpc-address.nameservices1.nn1": "172.16.122.21:8020",
"dfs.namenode.rpc-address.nameservices1.nn2": "172.16.122.24:8020",
"dfs.client.failover.proxy.provider.nameservices1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 7,
"type": "string"
},
{
"index": 8,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\u0001"
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "spider",
"password": "spider",
"column": [
"area_code","area_name","area_code_lvl1","area_name_lvl1","area_code_lvl2","area_name_lvl2","area_code_lvl3","area_name_lvl3","data_dt"
],
"preSql": [
"truncate table public.f_gf_area_info;"
],
"postSql":[
"insert into public.area (area_code,area_name,area_code_lvl1,area_name_lvl1,area_code_lvl2,area_name_lvl2,area_code_lvl3,area_name_lvl3,data_dt,sign) select a.area_code,a.area_name,a.area_code_lvl1,a.area_name_lvl1,a.area_code_lvl2,a.area_name_lvl2,a.area_code_lvl3,a.area_name_lvl3,a.data_dt, case when coalesce(a.area_code_lvl3, '') > '' then 3 when coalesce(a.area_code_lvl2, '') > '' then 2 else 1 end as sign from public.f_gf_area_info a left join public.area b on a.area_code = b.area_code where b.area_code is null;"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://172.16.122.19:5432/spider_db",
"table": [
"public.f_gf_area_info"
]
}
]
}
}
}
]
}
}
pg2pg
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "spider",
"password": "spider",
"connection": [
{
"querySql": [
"select * from public.area;"
],
"jdbcUrl": [
"jdbc:postgresql://172.16.122.19:5432/spider_db"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "gfecp_dev",
"password": "gfecp_dev",
"column": [ "*" ],
"preSql": [
"truncate table public.area;"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://172.16.122.19:5432/gfecp_dev",
"table": [
"public.area"
]
}
]
}
}
}
]
}
}
pg2mysql
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "spider",
"password": "spider",
"connection": [
{
"querySql": [
"select * from public.area;"
],
"jdbcUrl": [
"jdbc:postgresql://172.16.122.19:5432/spider_db"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "${user}",
"password": "${pwd}",
"column": ["*"],
"preSql": [
"truncate table area;"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://${host}:3306/spider?useUnicode=true&characterEncoding=utf-8",
"table": [
"area"
]
}
]
}
}
}
]
}
}
2. Sqoop
Sqoop 概述
Sqoop 是一款开源的工具,主要用于在 Hadoop(Hive)与传统的数据库(mysql、postgresql...) 间进行数据的传递。独立的Apache项目,可以在CDH中快速部署
优点: 1、支持 (HDFS,HIVE,HBASE) <==> RDBMS 2、分布式, 资源调度 on yarn
缺点: 1、不支持 RDBMS 之间的传输,比如 mysql <> postgre 2、报错隐晦,排查困难
注意点: 1、Sqoop1 和 Sqoop2 不兼容,Sqoop2 不打算用于为生产部署,所以一般都用Sqoop1 2、RDBMS ==> (HIVE, HBASE) 时,需要sqoop机器上有对应的集群服务
import 导入 RDBMS ===> (HDFS,HIVE,HBASE)
# 详细传参
sqoop import --help
pg2hdfs
sqoop import --connect jdbc:postgresql://172.16.122.19:5432/spider_db \
--username spider \
--password spider \
--table area \
--outdir /tmp \
--target-dir "/user/area_test" \
--num-mappers 3 \
--fields-terminated-by "\t"
pg2hive
sqoop import --connect jdbc:postgresql://172.16.122.19:5432/spider_db \
--username spider \
--password spider \
--table area \
--outdir /tmp \
--num-mappers 3 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-database edw \
--hive-table area \
--null-string '\\N' \
--null-non-string '\\N'
pg2hbase
sqoop import --connect jdbc:postgresql://172.16.122.19:5432/spider_db \
--username spider \
--password spider \
--query 'select area_code,area_name,sign from area where sign=1 and $CONDITIONS;' \
--outdir /tmp \
--num-mappers 3 \
--columns "area_code,area_name,sign" \
--column-family "cf" \
--hbase-create-table \
--hbase-row-key "area_code" \
--hbase-table "area" \
--split-by area_code
export 导出 (HDFS,HIVE,HBASE) ===> RDBMS
# 详细传参
sqoop export --help
sqoop export --connect jdbc:postgresql://172.16.122.19:5432/spider_db \
--username spider \
--password spider \
--export-dir "/user/hive/warehouse/edw.db/f_gf_area_info" \
--table f_gf_area_info \
--outdir /tmp \
--input-fields-terminated-by '\001' \
--input-lines-terminated-by '\n'
3. Kettle
Kettle 概述
Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行, 数据抽取高效稳定。
优点: 1、几乎支持所有常见数据源之间的数据传输 2、可视化拖动配置,操作简单
缺点: 1、单机运行 2、似乎比datax慢
安装使用
# 免安装压缩包 ,国内镜像下载
http://mirror.bit.edu.cn/pentaho/Data%20Integration/
# 启动入口
Spoon.bat:在 Windows 平台上运行 spoon
Spoon.sh:在 Linux、AppleOSX、Solaris 平台上运行Spoon
# 连接数据库需要对应的驱动,放到 data-integration\lib
# 教程笔记 ,基本上和 SSIS 一样
https://www.cnblogs.com/goingforward/p/6419154.html