导图社区 DataX开发使用案例讲解
当用户配置了querySql这一项之后，DataX系统就会忽略table、column这些配置项，直接使用querySql的内容对数据进行筛选。例如需要进行多表join后同步数据时，可配置 select a,b from table_a join table_b on table_a.id = table_b.id
编辑于2022-11-14 13:57:50 广东DataX开发使用案例讲解
DataX概述
定义
异构数据源离线同步工具
致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能
设计思想
网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源
图示
框架设计
架构说明
Reader
数据采集模块
Framework
数据传输通道,负责处理缓冲,流控,并发,数据转换
Writer
数据写入模块
图示
运行原理
架构说明
Job
单个作业的管理节点,负责数据清理,子任务划分,TaskGroup监控管理
Task
由Job切分而来,DataX的最小作业单元
Schedule
将Task组成TaskGroup,单个TaskGroup并发数为5
TaskGroup
启动Task
图示
DataX使用案例
从stream流读取数据并打印到控制台
查看配置模板
DataX (DATAX-OPENSOURCE-3.0), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document: https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job. { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [], "sliceRecordCount": "" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "", "print": true } } } ], "setting": { "speed": { "channel": "" } } } }
python datax.py -r streamreader -w streamwriter
根据模板编写配置文件
vim /opt/module/datax/job/stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
运行
/opt/module/datax/bin/datax.py /opt/module/datax/job/stream2stream.json
MySQL
读取MySQL中的数据存放到HDFS
查看配置模板
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [], "connection": [ { "jdbcUrl": [], "table": [] } ], "password": "", "username": "", "where": "" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [], "compress": "", "defaultFS": "", "fieldDelimiter": "", "fileName": "", "fileType": "", "path": "", "writeMode": "" } } } ], "setting": { "speed": { "channel": "" } } } }
python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter
MySQL建表
create database datax;
use datax;
create table student(id int,name varchar(20));
insert into student values(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu');
编写配置文件
vim /opt/module/datax/job/mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://hadoop102:3306/datax"
],
"table": [
"student"
]
}
],
"username": "root",
"password": "000000"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "student.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行任务
bin/datax.py job/mysql2hdfs.json
读取HDFS数据写入MySQL
将上个案例上传到HDFS的文件改名（hdfswriter写入时会在文件名后追加随机后缀，需先统一文件名）
hadoop fs -mv /student.txt* /student.txt
查看配置模板
{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "column": [], "defaultFS": "", "encoding": "UTF-8", "fieldDelimiter": ",", "fileType": "orc", "path": "" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [], "connection": [ { "jdbcUrl": "", "table": [] } ], "password": "", "preSql": [], "session": [], "username": "", "writeMode": "" } } } ], "setting": { "speed": { "channel": "" } } } }
python bin/datax.py -r hdfsreader -w mysqlwriter
创建配置文件
vim job/hdfs2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": ["*"],
"defaultFS": "hdfs://hadoop102:9000",
"encoding": "UTF-8",
"fieldDelimiter": "\t",
"fileType": "text",
"path": "/student.txt"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/datax",
"table": ["student2"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
MySQL建表
use datax;
create table student2(id int,name varchar(20));
执行任务
bin/datax.py job/hdfs2mysql.json
Oracle
从Oracle中读取数据存到MySQL
MySQL建表
create database oracle;
use oracle;
create table student(id int,name varchar(20));
编写配置文件
vim /opt/module/datax/job/oracle2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:oracle:thin:@hadoop102:1521:orcl"],
"table": ["student"]
}
],
"password": "000000",
"username": "atguigu"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/oracle",
"table": ["student"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行命令
/opt/module/datax/bin/datax.py /opt/module/datax/job/oracle2mysql.json
读取Oracle的数据存入HDFS中
编写配置文件
vim job/oracle2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:oracle:thin:@hadoop102:1521:orcl"],
"table": ["student"]
}
],
"password": "000000",
"username": "atguigu"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "oracle.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行
bin/datax.py job/oracle2hdfs.json
MongoDB
读取MongoDB的数据导入到HDFS
编写配置文件
vim job/mongdb2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"collectionName": "atguigu",
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"dbName": "test"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"defaultFS": "hdfs://hadoop102:9000",
"fieldDelimiter": "\t",
"fileName": "mongo.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行
bin/datax.py job/mongdb2hdfs.json
读取MongoDB的数据导入MySQL
MySQL建表
create table atguigu(name varchar(20),url varchar(20));
编写配置文件
vim job/mongodb2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["127.0.0.1:27017"],
"collectionName": "atguigu",
"column": [
{
"name":"name",
"type":"string"
},
{
"name":"url",
"type":"string"
}
],
"dbName": "test"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop102:3306/test",
"table": ["atguigu"]
}
],
"password": "000000",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行
bin/datax.py job/mongodb2mysql.json