导图社区 FlinkApi
Flink 的 datatream api 和 table api / sql api的一些简单介绍
编辑于2020-07-21 19:56:53Flink
DataStreamApi(streams,windows)
DataStream基本转换
单条记录操作
filter
map
多条记录操作
windows
Tumbling Window(翻滚窗口,没有重叠)
Sliding Window(滚动窗口,有重叠)
Session WIndow(会话窗口,有活动间隙)
多个流操作并转换为单个流
union
join
connect
合并对称操作
split
类型系统
基本类型
Java基本类型(包装类)、void、String、Date、BigDecimal、BigInteger
复合类型
Scala case Class(不支持null)、Row、POJO
辅助集合类型
Option、Either、list、map
其他
Table/SQL API
Table Api
获取table
1.注册对应的TableSource
Table descriptor
connect
withFormat
withSchema
field("",DataTypes)
registerTableSource
自定义 source
Data Stream
2.调用tableEnv的scan方法,获取Table对象
操作table
和sql对齐的操作
提升易用性的操作
addColumns
dropColumns
addOrReplaceColumns
renameColumns
withColumns
withoutColumns
增强table Api功能性的操作
1:1 ScalarFunction
1:n AggregateFunction
n:1 TableFunction
n:n TableAggregateFunction
Accumulator 存取状态
实现accumnlate 方法,处理输入的数据
实现emitValue方法,输出结果
输出table
Table descriptor
自定义 sink
Data Stream
SQL Api
语法介绍
create table
CREATE TABLE tableName (columnName dataType [, columnName dataType ]*) [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
参数配置:connector.type connector的类型 connector.xxx connector需要的配置信息 update-mode sink的类型(append, upsert) format.xxx 数据的格式信息(thrift, json, csv), 一般作用于不带有schema的系统(talos,kafka,es)
connector
talos
context
参数名 值 update-mode append connector.type talos
properties
参数名 默认值 可选值 说明 connector.topic xxx talos的topic名 connector.secret-key xxx xxx 认证所需要的secretKey connector.secret-key-id xxx xxx 认证需要的secretKeyId connector.startup-mod earliest-offset earliest-offset,latest-offset 消费的启始位置,默认从最早的offset消费 connector.properties.{galaxy.talos.xxx} xxx xxx talos的sdk需要的properties配置
example
create table TalosSource ( `timestamp` BIGINT, metricName VARCHAR, clusterName VARCHAR, topicName VARCHAR ) with ( 'connector.properties.galaxy.talos.service.endpoint'='http://staging-cnbj2-talos.api.xiaomi.net', 'connector.version'='universal', 'connector.secret-key'='xxxxxxx', 'connector.secret-key-id'='xxxxxxx', 'connector.topic'='TalosSource', 'connector.type'='talos', 'connector.startup-mode'='latest-offset', 'update-mode'='append', 'format.type'='thrift', 'format.thrift-class'='com.xiaomi.data.spec.log.micloud.LcsMetricTopicV2', 'format.thrift-package'='com.xiaomi.data:data-platform-spec-micloud:0.0.1-SNAPSHOT', 'format.derive-schema'='true' );
hdfs
context
参数名 值 update-mode append connector.type hdfs
properties
参数名 默认值 可选值 说明 connector.path xxx hdfs:// hdfs文件路径 connector.bucket-assigner datetime-assigner datatime-assigner 默认按时间来进行切分 connector.datetime-assigner-format ‘year’=yyyy/‘month’=MM/‘day’=dd/ 目录切分规则 connector.datetime-assigner-type proctime proctime/eventtime 默认使用proctime时间来作为划分目录 connector.format-type bulk row/bulk 默认使用bulk方式写 connector.bulk-type sequence sequence/parquet 默认使用sequence来写 connector.sequence-compression-type BLOCK 默认sequence的compression type为BLOCK connector.sequence-codec-name default 默认的codecname为 default connector.rolling-policy default default/oncheckpoint 默认文件切分的规则为default connector.default-rolling-size 1024L * 1024L * 1024L * 1024L 默认的切分的文件大小为1T connector.default-rolling-interval 24L * 60L * 60L * 1000L 默认的文件切分时间为24小时 connector.default-rolling-inactive-interval 6L * 60L * 60L * 1000L 默认文件未写入超时时间为6小时
example
create table HdfsTable( topicName VARCHAR, serviceName VARCHAR ) with ( 'connector.path'='hdfs//tjwqstaging-hdd/user/s_lcs/flink/flink-test', 'connector.type'='hdfs', 'update-mode'='append', 'format.type'='json', 'format.derive-schema'='true' )