导图社区 ETL执行流程2
ETL执行流程2,执行ETL任务: 1.准备列映射 2.构建获取源数据sql脚本 3.构建是否有重复数据语句 4.创建临时表 5.取源表数据 6.源表数据插入临时表 7.清除目标表 8.临时表Merge到目标表 9.设置下一次执行起始点 10.其他后续动作
社区模板帮助中心,点此进入>>
项目时间管理6大步骤
项目管理的五个步骤
电商部人员工作结构
电费水费思维导图
D服务费结算
组织架构-单商户商城webAPP 思维导图。
暮尚正常运转导图
批判性思维导图
产品经理如何做好项目管理
车队管理
ETL执行流程
1.准备映射数据
获取当前最大批次号(加锁)
加载映射
根据执行类型加载所有映射
1.单个任务
2.多个任务(批量触发)
3.按包执行
4.按业务域执行(待开发)
获取任务列表
加载任务详情
初始化数据库日志类(每条任务一个)
准备存储过程参数
判断是否命中本趟执行
根据任务类型加载指定类型Python-Merge脚本
从元数据库加载Python-Merge脚本
加载增量开始时间(StartDate|StartID)
1.时间类型:StartDate前移SleepingMinute分钟
2.数值类型:StartID前移SleepingMinute
2.执行ETL任务
1.准备列映射
目标表所有列
源表所有列
非存储过程:读取元数据表获取所有列
存储过程:正则表达式解析存储过程DDL获取插入Staging表的列
Staging表所有列
1.打印源表在目标表中不存在的列
2.打印目标表在源表不存在的列
3.取源表和目标表公共字段作为临时表的列
2.构建获取源数据sql脚本
源为表或者视图
自定义SQL:直接返回CustomSql即可
非自定义SQL
源和目标是否为同源
是:直接构建insert into staging_table select xxx from source_table
否:构造获取源数据的去重sql
1.SourceTable按照MergeKeys先做一次内部去重
2.构建Select sourceColumns from sourceTableName语句
构建Where条件
1.上次执行成功时间前移5分钟
2.SQL语句Sql语句约束参数类型
时间字段
循环任务:直接拼接
单个任务:添加"增量种子"字段拼接
数值型字段
不使用约束字段(直接查询视图或者存储过程即可)
源和目标是同源 ->
追加一个SELECT COUNT(1) FROM StagingTable语句
源为脚本
1.准备脚本参数
准备@StartDate和@EndDate参数
2.判断数据库种类
数仓类型
1.从python元数据中获取DDL脚本
2.追加一个SELECT COUNT(1) FROM StagingTable语句
传统关系型数据类型
1.根据源表名获取存储过程脚本
2.附加存储过程参数
3.构建是否有重复数据语句
是否为视图->是否为数仓型数据库?
1.根据MergeKeys构建 GROUP BY HAVING语句
2.添加Where条件语句
4.创建临时表
创建临时表所在库
存在跳过,不存在创建
清空/删除之前的临时表
目标表是tasklog表,直接truncate掉
目标表非tasklog表,直接drop掉
刷新临时表元数据
创建临时表
获取目标表Mergekeys字段
对字段进行排序(某些数仓对字段排序有要求)
创建临时表(重试三次)
支持like语法:直接使用like语法创建
不支持like语法:根据准备好的创表语句创建
5.取源表数据
从Python脚本中获取
加载形参&实参
执行带参数Python脚本
获取成功标识
获取返回值
从数据库命令中获取
执行预取命令
执行取数命令
执行计算行数命令
将数据存放到内存表变量中
执行发生异常??
Order by常量异常? 替换为变量或者mergekeys重新执行一次
Partition by常量异常? 替换为变量或者mergekeys重新执行一次
MySqlException异常?
是否当前时间-ETL开始时间<5分??
等1分钟,重跑
6.源表数据插入临时表
内存表变量是否有值?? 当前源和目标为同源 或 当前为Python方式
是:
修正源表数据
如果是存储过程,重新获取临时表和源表列(之前获取源表未获取)
剔除sourceTable中多余的列,否则bulkcopy会提示列不匹配
写入临时表(buckcopy方式)
配置buckcopy参数(每批次行数、目标表、并行任务数、回调函数)
初始化
构建insert-sql批量执行脚本
构建update-sql批量执行脚本
执行insert-sql批量执行脚本
开启多线程
取批量insert脚本
执行完一个批次执行回调函数
执行update-sql批量执行脚本
返回自增主键ID
否:什么都不做
判断是否有重复数据
是
创建找出重复主键sql
执行创建找出重复主键sql
将重复数据合并到staging表
执行将重复数据合并到staging表
否
打印"源表数据无重复"
7.清除目标表
ETL执行是否成功?
总是清除
源表有数据清除
判断源表是否有数据??
有: 清除
无: 不清除
永不清除(默认)
8.临时表Merge到目标表
根据数仓表类型读取Python-Merge脚本
获取Python-Merge脚本形参 & 实参
添加Python打印日志回调函数
执行Python-Merge脚本
根据返回值判断是否成功
9.设置下一次执行起始点
表/视图类型任务
是否为自定义SQL或不用约束字段类型任务
是: 直接跳过
否: 继续执行
生成查询临时表约束条件最大值SQL
执行SQL获取临时表约束条件最大值
更新ETL任务约束条件字段
约束条件参数类型
时间类型:
循环任务:直接更新
单次任务:添加增量种子天数后,更新
数值类型:
单次任务:添加增量种子数值后,更新
二进制类型:
脚本类型任务
获取脚本参数
拼接获取临时表约束条件参数SQL
执行临时表约束条件参数SQL
更新脚本参数到数据库
10.其他后续动作
是否为历史包任务??
是:将ETL任务状态更新为禁用状态
3.更新任务状态
循环任务->未执行(0)
非循环任务->成功(1)