导图社区 Kafka
kafka框架的学习笔记整理,其中包括了框架搭建步骤,常用命令、API的使用及框架原理内容总结。
编辑于2021-01-15 14:01:23Kafka
概述
消息队列MQ
两种处理
同步
消息及时,反馈速度慢
异步
反馈速度快,消息不及时
两种模式
点对点模式
一对一,消费者主动拉取数据,消息收到后消息清除
发布/订阅模式
一对多,消费者消费数据之后不会清除消息
定义
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。也会和flume结合,用于离线缓冲
基础架构
笔记
为什么要分区:1 分区后上传HDFS建立分布式 2 提高吞吐量
一个分区只能被一个消费者组中的一个消费者所消费
Zookeeper用于监控broker是否正常
多个分区分区时,在每个分区存储的数据顺序是无序的,consumer取出的数据也是无序的
组成
Producer
消息生产者
Broker
通常一个节点有一个broker,一个broker可以容纳多个topic
Topic队列
是逻辑上的概念,一般一个业务一个主题
Partition
一个topic可以分为多个partition,每个partition对应于一个log文件,存储producer生产的数据
Replica
副本,保障kafka在故障过后也能继续工作,默认副本数为 1
leader
生产者发送数据的对象,以及消费者消费数据的对象
follower
实时从leader中同步数据,保持和leader数据的同步,上位leader
Consumer
消息消费者
Consumer Group
消费者组,由多个consumer组成,消费者组内每个消费者负责消费不同分区的数据,并行消费。
入门
安装部署
1 安装、改名
2 修改配置文件
3 为各个节点分发安装包
4 编写群起脚本
命令行操作
笔记
nohup .. & (后台启动)
不能修改对应副本数
为什么不能减少分区数
减小分区数会导致数据迁移,导致原本时间写入有序的标记混乱,进而导致fink产生误差,也会影响生产者写入数据
consumer拉取数据默认拉取consumer启动后的数据,用beginning拉取以前的无序的(offset)
加--group 组名 后会记录拉取数据的位置,不会重复拉取数据,不写的话每次启动都是一个新消费者组
logs中的consumer_offset在第一次消费数据后出现--用来记录上次消费的位置
命令
kafka-topics.sh
创建topics
--create
kafka-topics.sh --create --topic 名字 --bootstrap-server hadoop104:9092,备份节点地址 [--partitions 分区数] [--replication-factor 副本数]
删除topic
--delete
kafka-topics.sh --delete --topic 名字 --bootstrap-server hadoop104:9092,备份节点地址
查看详细信息
--describe
kafka-topics.sh --describe --topic 名字 --bootstrap-server hadoop104:9092,备份节点地址
修改topic分区数量
--alter
kafka-topics.sh --alter --topic 名字 --bootstrap-server hadoop104:9092,备份节点地址 --partitions 只能增大不能减小的数量
列出所有topics
--list
kafka-topics.sh --list --bootstrap-server hadoop104:9092:hadoop105:9092
现在不用zookeeper的2181,而是用bootstrap的9092,究其原因是因为前者是老版本的用法,0.8以前的kafka,消费的进度(offset)是写在zk中的,所以consumer需要知道zk的地址。这个方案有性能问题,0.9 的时候整体大改了一次,brokers 接管了消费进度,consumer 不再需要和 zookeeper 通信了,所以就用bootstrap-server了。 新版的 Kafka 使用一个选举出来的 controller 来监听 zookeeper,其他 node 再去和 controller 通信,这么做的目的是为了减少 zookeeper 的压力。bootstrap-servers 会自动发现其他 broker,这也是 bootstrap 的含义
-bootstrap需要告诉是哪个集群
往topic写数据
kafka-console-producer.sh --topic 名字 --broker-list hadoop104:9092,备份节点地址(告知哪个集群)
往topic取数据
kafka-console-consumer.sh --topic 名字 ----bootstrap-server hadoop104:9092,备份节点地址
从每一个分区最后一个offset+1开始拉
查看group组消费记录
kafka-consumer-group.sh --all-gourp --all-topics --describe --bootstrap-server hadoop104:9092
LAG-还剩几条没消费
查看分区log数据
kafka-dump-log.sh --files 数据文件路径 --print-data-log
Kafka API
Producer API
消息发送流程
发送条件
满批次大小 batch.size
满规定时间 linger.ms
流程
1 将数据封装成ProducerRecords对象
2 拦截器筛选数据
3 序列化
4 确定数据发送到那个分区
在mian方法中执行
5 发送到RecordAccumulator共享变量中(相当于一个Map集合)
Map<topic-分区号,List<ProduceRecord....>>
依照各个分区储存
6 满一个批次发送给sender线程,由其发送给kafka
异步发送API
package com.atguigu.producer; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class ProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { //1、创建生产者客户端 Properties props = new Properties(); //指定key的序列化器 props.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //指定value的序列化器 props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //设置ack props.setProperty("acks","1"); //设置批次大小 props.setProperty("batch.size","1024"); //指定kafka集群 props.setProperty("bootstrap.servers","hadoop102:9092,hadoop103:9092"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); //2、封装数据 for(int i=401;i<=500;i++){ final int x = i; //3、发送 ProducerRecord<String, String> record = new ProducerRecord<String, String>("first1", "send " + i + " message"); //异步发送 //producer.send(record); System.out.println("开始发送第"+i+"条消息"); /* producer.send(record, new Callback() { //kafka返回确认消息的时候调用 public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("kafka已经收到第"+x+"条消息"); long offset = metadata.offset(); long partition = metadata.partition(); String topic = metadata.topic(); System.out.println("topic="+topic+",partition="+partition+",offset="+offset); } });*/ //同步发送 producer.send(record, new Callback() { //kafka返回确认消息的时候调用 public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("kafka已经收到第"+x+"条消息"); long offset = metadata.offset(); long partition = metadata.partition(); String topic = metadata.topic(); System.out.println("topic="+topic+",partition="+partition+",offset="+offset); } }) //阻塞 .get(); System.out.println("第"+i+"条消息已经发送完成"); } //4、关闭客户端 producer.close(); } }
同步发送API
只需在调用Future对象的get方发即可
Kafka储存数据的是ConsumerRecords<K,V>值
Consumer API
自动提交offset
package com.atguigu.consumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; public class ConsumerDemo { public static void main(String[] args) { //1、创建消费者客户端 Properties props = new Properties(); //设置key的反序列化器 props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //设置key的反序列化器 props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //设置kafka集群 props.setProperty("bootstrap.servers","hadoop102:9092"); //设置消费者组的名称 props.setProperty("group.id","g6"); //设置消费者组第一次消费topic的时候应该从哪里开始消费 props.setProperty("auto.offset.reset","earliest"); //是否自动提交offset props.setProperty("enable.auto.commit","false"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //2、消费数据 while (true){ //设置消费的topic名称 List<String> topics = new ArrayList<String>(); topics.add("first1"); consumer.subscribe(topics); //开始拉取数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); Iterator<ConsumerRecord<String, String>> it = records.iterator(); while (it.hasNext()){ ConsumerRecord<String, String> record = it.next(); String topic = record.topic(); int partition = record.partition(); String message = record.value(); long offset = record.offset(); System.out.println("消费的:topic="+topic+",partition="+partition+",offset="+offset+",message="+message); //手动提交offset consumer.commitAsync(new OffsetCommitCallback() { //提交完成之后调用 @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { for(TopicPartition topicPartition: offsets.keySet()){ OffsetAndMetadata offsetAndMetadata = offsets.get(topicPartition); int partition1 = topicPartition.partition(); long offset1 = offsetAndMetadata.offset(); String topic1 = topicPartition.topic(); System.out.println("提交的:topic="+topic1+",partition="+partition1+",offset="+offset1); } } }); } } } }
手动提交offset
同步提交offset
异步提交offset
数据漏消费和重复消费分析
手动提交Offset可以控制提交时间,达到重头再消费一边的目的
Kafka Eagle(监控)
安装部署
页面查看
查看broker、topic信息
1 生产者产生数据的速度
2 消费者消费数据的速度
2 < 1 造成 1 topic消息积压 2 信息处理不及时
解决方法
增大分区数
batch.size,增大消费者拉取数据速率
架构深入
Kafka工作流程
offset
1 消费者实时记录自己消费到了哪个offset
2 每一条数据的唯一标识
1 producer往broker的topic写数据
2 topic的不同分区会在不同的broker上备份
3 consumer开始拉数据消费
文件存储机制
topic
逻辑上的概念
Partition(物理上真实存在,以目录形式存在)
Segment切分文件夹(多个同样大小) 该文件夹的命名规则为:topic名称+分区序号 好处:可以快速定位offset处于哪个segment
由log.segment.bytes 决定,控制每个segment的大小 默认是超过7天,或者是1GB大小就分文件
为什么要切分:避免数据的膨大导致的查找缓慢
index -- log文件中数据的索引
log中的offset对应的文件字节大小每达到4kb建立一个索引 对应这个offset之前所有字节数的总和(从这里开始查找这个offset)
offset | 字节数
log -- 数据存储文件
offset | 数据
以当前segment的第一条消息的offset命名
timestampIndex -- log文件中数据的时间索引
查询某个时间段的文件
作用: 防止log文件过大导致数据定位效率低下
每个分区 第一个segment文件名 都是00000000000000000000 后续 第二个segment文件名 = 第一个segment最后一个offset+1 后续 第N个segment文件名 = 第(N-1)个segment最后一个offset+1
查询机制的步骤
1 先通过Segment的分区号确定offset的大概位置(二分查找法), 2 再通过index二次确认位置, 3 在log中从所记录的offset物理地址扫到下一个物理地址找少量数据
Kafka生产者
分区策略
为什么要分区的原因
原则
将producer发送的数据封装成一个ProducerRecord对象
1、直接指定分区号,发到指定分区
2、如果有key,数据发到 key.hashCode % 分数区
3、没有分区号,也没有key
新版本 [0,1,2]
1、第一次发送数据的时候,会生成一个随机数,数据发到 随机数%分区数 = 分区号 [ 5 % 3 = 2]
2、第二次发送数据的时候,会排除掉上一次发送的分区号,从剩余的分区中随机选择一个发送 1
3、第N次发送数据的时候,会排除掉(N-1)次发送的分区号,从剩余的分区中随机选择一个发送 2
会产生数据倾斜
旧版本: 采用轮询
1、第一次发送数据的时候,会生成一个随机数,数据发到 随机数%分区数 = 分区号
2、第二次发送的时候,会将 (第一次生成的随机数+1) % 分区数
3、第N次发送数据的时候,会将 (第一次生成的随机数+(N-1)) % 分区数
企业多用
数据可靠性保证
生成发送的消息是否真实到达Kafka
笔记
什么是ack
每个partition在收到数据后,都要向producer发送ack,否则重新发送数据
因为要选leader所以要全部同步,才发生ack
producer没有收到partition的回应后,会再重新发送一次
zookeeper: 负责大于breker上下线及leader选举
副本同步策略
全部follower完成同步,才发送ack
1 冗余度低
2 Kafka对网络延迟敏感性差
为什么要全部同步的原因
ISR
定义
与leader同步[当前副本LEO>=当前分区的HW]到了一定程度的follower
作用
避免Follower在同步数据时时间过长问题
Leader发生故障之后,从ISR中选举新的leader
ack应答机制
0
broker一接收到还没有写入磁盘就已经返回
丢失数据
1
partition的leader落盘成功后返回ack
丢失数据
-1
partition的leader和follower全部落盘成功后才返回ack
在follower成功同步后,旧leader挂了,此刻producer没有收到回应,会向新leader重新发送数据,造成数据重复
数据重复
partition故障处理
follower故障
1 follower发生故障后会被临时踢出ISR
2 待该follower恢复后,follower会读取本地磁盘记录的上次的HW
3 将log文件高于HW的部分截取掉
为什么要把HW之后的数据清掉? 答:因为HW之前的数据全部partition备份的节点都有,是已经向producer发送OK的数据,是没问题的,而HW之后的数据就不确定了
4 从HW开始向leader进行同步。等该follower的LEO大于等于该集群的HW
5 重新加入ISR
leader故障
负责数据的读写
1 从ISR中选出一个新的leader
2 其余的follower会先将各自的log文件高于HW的部分截掉
为了保证foolower之间数据的一致性 leader不能同步删HW之后的数据,因为不能一边删一边读
3 从新的leader同步数据
只能保证数据的一致性,会导致数据重复
LEO
当前副本最大的offset
HW
副本间最小的LEO
HW之前的数据才对Consumer可见
Exactly Once
如何开启?
将Producer的参数中enable.idompotence设置为true
三种容错语句
at-lest-once
数据最少一条【数据重复】
at-most-once
数据最多一条【数据丢失】
exactly-once
数据有且仅有一条
作用: 防止数据重复
开启幂等性 + ack=-1 才能保证数据有且仅有一次
前提是生产者不挂,一旦挂了,分区随机,生产者唯一标识也迭代更新了
broker会将主键缓存起来,并且每次往里存数据都会调用对比
主键: producerid + partitionid +sequenceNumber
producerid: 生产者唯一标识,是生产者在启动的时候生成的
partitionid: 分区号
sequenceNumber: 写到分区的第几条数据
Kafka消费者
消费方式
采用pull(拉)模式
为什么不采用push(推)模式? 答:因为不同的消费者有着不同的消费速率,如果采用由broker决定速率的Push模式,则会导致消费者来不及处理消息(拒绝服务及网络堵塞)。 pull有什么不足之处吗? 答: 有,如果kafka没有数据,消费者会一直返回空数据,可以通过添加timeout时长解决
分区分配策略
判断消费者究竟消费哪个分区的数据
roundrobin
轮询: 每个分区轮流发给各个消费者
range
1 确定每个消费者大概消费几个分区的数据: 分区数/消费者个数
2 确定前多少个消费者多消费一个分区的数据: 分区数%消费者个数
offset的维护
记录消费位置的topic为__consumer_offsets
Kafka高效读写数据
1 Producer生成数据写入到log的数据是顺序写
减少磁头寻址时间
2 broker中开辟了pagecache内存快存
1 由producer直接写到pagecache内存,到一定大小再写到硬盘,减少了producer与磁盘交互的次数
2 每个批次数据在pagecache中对磁盘地址进行排序,减少磁头移动的时间
3 不在JVM中,不增加垃圾回收机制的负担
4 有副本机制同步,不必担心宕机丢失
5 生产和消费速率相当时,可直接由pagecache写到消费者
3 零复制技术
一般流程
1 文件通过IO流读到内核区
2 如果要做操作,需要从内核区拷到用户区
3 做完操作通过IO流写到socket缓冲区
kafka不要2 3
4 再写出到网卡
不用写到用户区中改写
不用写到sokect缓冲区
Zookeeper在Kafka中的作用
Controller
节点先到先得注册
作用
1 负责管理集群broker的上下线
监听
2 分区副本分配
3 leader选举
Partition-Leader的选举过程
过程
1 broker在启动的时候会选举出一个Controller,同时会在[brokers/ids/]下注册
2 Controller监听获取broker挂掉信息
3 Controller从对应的聚合信息获取ISR
4 随机选择Leader,但要考虑负载均衡
5 更新聚合信息
负载均衡:优先考虑leader少的broker中的partition
在对应的broker节点下面,会聚合全部Leader/Follower的状态 包括topic名字、partition区号、Leader所在节点、isr的信息
Kafka事务
Producer事务
作用: 避免重复的流程
过程
1 在生产者端代码中写死transactionID
2 将Producer获得的PID和Transaction ID绑定
3 将这个映射及事务状态保存到kafka内部topic中:__transaction_state
4 producer重启后不会直接创建新ID,而是从kafka内部topic中获得事务状态及旧ID
如果事务状态是uncommit,那么事务会回滚
效率低,一般不开启:会影响效率,在下游spark处理
Consumer事务
Kafka对消费者事务无法控制