baby sword‘s blog baby sword‘s blog
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)

zhengjian

不敢承担失去的风险,是不可能抓住梦想的
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)
  • 消息队列

  • kafka

    • 如何安装kafka
    • kafka基础
      • kafka的三高设计
        • 高可用
        • ①控制器的选举
        • ②什么是leader选举
        • ③ack机制
    • 常见命令行指令
    • kafka可视化界面
    • 使用kafka时遇到的一些问题
    • ProducerConfig
    • kafka之生产者
    • kafka消息丢失
    • kafka之broker
    • kafka之消费者
    • 消费者源码
    • 消费者的多线程方式
    • kafka之缓冲区
    • kafka源码
    • kafka个人技术分享
  • zookeeper

  • rocketMq

  • 中间件
  • kafka
xugaoyi
2023-06-09
目录

kafka基础

Kafka 本质上是一个 MQ(Message Queue),使用消息队列的好处?

  1. 解耦:允许我们独立修改队列两边的处理过程而互不影响。
  2. 冗余:有些情况下,我们在处理数据的过程会失败造成数据丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险, 确保你的数据被安全的保存直到你使用完毕
  3. 峰值处理能力:不会因为突发的流量请求导致系统崩溃,消息队列能够使服务顶住突发的访问压力, 有助于解决生产消息和消费消息的处理速度不一致的情况
  4. 异步通信:消息队列允许用户把消息放入队列但不立即处理它, 等待后续进行消费处理。

术语:

  1. Producer:即消息生产者,向 Kafka Broker 发消息的客户端。
  2. Consumer:即消息消费者,从 Kafka Broker 读消息的客户端。
  3. Consumer Group:即消费者组,消费者组内每个消费者负责消费不同分区的数据,以提高消费能力。一个分区只能由组内一个消费者消费,不同消费者组之间互不影响。
  4. Broker:一台 Kafka 机器就是一个 Broker。一个集群是由多个 Broker 组成的且一个 Broker 可以容纳多个 Topic。
  5. Topic:可以简单理解为队列,Topic 将消息分类,生产者和消费者面向的都是同一个 Topic。
  6. Partition:为了实现Topic扩展性,提高并发能力,一个非常大的 Topic 可以分布到多个 Broker 上,一个 Topic 可以分为多个 Partition 进行存储,每个 Partition 是一个有序的队列。
  7. Replica:即副本,为实现数据备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,为此Kafka提供了副本机制,一个 Topic 的每个 Partition 都有若干个副本,一个 Leader 副本和若干个 Follower 副本。
  8. Leader:即每个分区多个副本的主副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。
  9. Follower:即每个分区多个副本的从副本,会实时从 Leader 副本中同步数据,并保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会被选举并成为新的 Leader , 且不能跟 Leader 在同一个broker上, 防止崩溃数据可恢复。
  10. Offset:消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。
  11. ZooKeeper服务:Kafka 集群能够正常工作,需要依赖于 ZooKeeper,ZooKeeper 帮助 Kafka 存储和管理集群元数据信息。在最新版本中, 已经慢慢要脱离 ZooKeeper。

每个topic的partition数量不同,每个partition的副本数量也不同都是可以在创建的时候指定的。

快速进行安装:

https://colobu.com/2019/09/27/install-Kafka-on-Mac/

java中使用kafka

# kafka的三高设计

# 高可用

kafka中的选举:控制器选举、leader选举、消费者的选举

# ①控制器的选举

什么是控制器

在Kafka集群中,控制器是一个特殊的Broker节点,它负责监管整个集群的状态,并执行一些重要的管理任务。

控制器的主要功能包括:

  1. 监控Broker是否在线。
  2. 监控Partition的分配和副本备份情况。
  3. 处理新的Topic和Partition的创建请求。
  4. 处理Broker故障和副本迁移。
  5. 管理ISR(in-sync replicas)列表。

控制器是Kafka集群的核心组件之一,它的稳定性对整个集群的可靠性和高可用性都非常重要。

那控制器的选举是怎么进行的呢?

  1. 当第一个控制器启动时,会在zookeeper的controller路径下创建相应的临时节点,并写入相应的注册信息,该节点便是控制器。
  2. 当其他节点也进行启动的时候,也会尝试在zk上建立临时节点,但是因为/controller目录上已经存在了节点,所以创建节点会失败,此时就认为最开始的节点就是一个唯一一个控制器
  3. 其他的节点,也会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理

# ②什么是leader选举

首先在topic中,一个partition是有多个副本的,那么如果记录一个leader有多少个副本呢?kafka采用了ISR机制。

什么是ISR:指的是那些已经同步到最新Leader副本状态的所有副本集合。每个领导者都有一个ISR,它由一组与领导者保持同步的副本组成。在分区的ISR中,如果某个副本因为某种原因无法和领导者进行通信,则该副本会被视为“落后”,并且从ISR中移除。当ISR缩小时,Kafka会触发重新平衡操作,以确保数据可用性和负载均衡。

总的来说,分区的ISR是指当前保持与领导者同步的副本集合,它反映了当前可靠地接收消息的副本集合。

必须同时满足两个条件才认为是可靠的:必须可靠的向zookeeper发送心跳;在规定时间内从Leader副本 低延迟 地获取过消息。

kafka中的默认配置:

Kafka 判断 Follower 是否与 Leader 同步的条件就是 Broker 端参数 replica.lag.time.max.ms 参数值。这个参数的含义就是 Follower 副本能够落后 Leader 副本的最长时间间隔, 当前默认值为10秒, 也就是说, 只要一个Follower 副本落后 Leader 副本的时间不连续超过10秒, Kafka 就认为两者是同步的, 即使 Follower 副本中保持的消息要少于 Leader 副本中的消息。

ISR的维护

1) Controller来维护:Kafka 集群中的其中一个 Broker 会被选举为Controller,主要负责 Partition 管理和副本状态管理,也会执行重分配 Partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 Leader,ISR 和新的 leader_epoch 及controller_epoch 写入 Zookeeper 的相关节点中。同时发起 leaderAndIsrRequest 通知所有的 Replicas。

2) Leader来维护:Leader 有单独的线程定期检测 ISR 中 Follower 是否脱离 ISR , 如果发现 ISR 变化,则会将新的 ISR 信息返回到 Zookeeper 的相关节点中。

综上可以看出:

​ 由于ISR中的备份副本在所有方面(包括数据)都与主副本保持同步,因此在主副本故障时,任何一个ISR副本都可以被提升为新的主副本,从而实现高可用性。

# ③ack机制

ack机制非常重要,关系到了ISR机制、选举机制等。

  • ack=0

如果acks设置为0,那么 Producer 是不会等待 Broker 的反馈。该消息会被立刻添加到 Socket Buffer 中就认为已经发送完成。在这种情况下,服务器端是否收到请求是无法保证的,并且参数 Retries 也不会生效(因为客户端无法获得失败信息)。

这个时候每个记录返回的 Offset 总是被设置为-1。这个模式下 Kafka 的吞吐量最大,并发最高,但是数据非常容易丢失,通常适用在一些记录应用日志,对数据要求不高的业务场景。

  • ack=1

​ 如果acks设置为1,这个时候 Leader 节点会将记录先写入本地日志,并且在所有 Follower 节点反馈之前就先确认成功。在这种情况下,如果 Leader 节点在接收记录之后,并且在 Follower 节点复制数据完成之前发生错误,那么这条记录会丢失。这个模式和 Mysql 的主从异步复制一样,主从之间会有数据差异,此配置为 Kafka 默认配置。它平衡了数据安全和性能。

  • acks = all || acks >=2

如果acks设置为all,这个时候 Leader 节点会等待所有同步中的LSR副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。

如果说 Leader 这时候刚接收到了消息,但是 Follower 没有收到消息,此时 Leader 宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。

其中Broker有个配置项min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机

这种方式是牺牲了性能为代价,适合对数据要求比较高的业务场景。

从上面可以看出:随着ack的增大,其对数据的可用性保证是越来越高的

producer缓存

https://juejin.cn/post/6983614310224232462

[深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】 - 腾讯云开发者社区-腾讯云 (tencent.com)](https://cloud.tencent.com/developer/article/1698563#:~:text=在新版的 Kafka Producer 中,设计了一个消息缓冲池,在创建 Producer 时会默认创建一个大小为 32M 的缓冲池,也可以通过,时传的 batch.size 大小,默认大小 16384,而每个 Batch 都会包含一个 batch.size 大小的内存块,消息就是存放在内存块当中。)

Kafka中的内存池设计主要用于提高内存分配与释放的效率。Kafka使用了一种基于slab的内存池管理方式,这种方式可以有效地减少内存碎片,提高内存的利用率。

在Kafka源码中,内存池的实现主要分为三个部分:SlabAllocator、Slab和ByteBuffer。

SlabAllocator是整个内存池的管理者,它负责对所有的Slab进行管理和分配。Slab是内存池中的一个固定大小的内存块,被划分为多个大小相等的ByteBuffer,每个ByteBuffer可以被用来存储消息等数据。ByteBuffer则是Kafka内存池中真正被使用的内存单元,它是由Slab中的部分内存组成,用来存储具体的数据。

SlabAllocator会预先申请一定数量的Slab,并将它们组织成链表结构,以便进行快速的分配和回收。在SlabAllocator中,通过一个数组记录每个ByteBuffer的状态,如是否被分配、所属的Slab等信息。

当需要分配内存时,SlabAllocator会从空闲的Slab中查找可以满足需求的ByteBuffer,并将其标记为已分配。同样地,在释放内存时,SlabAllocator会根据要释放的ByteBuffer确定其所属的Slab,并将其标记为未分配。如果某个Slab中的所有ByteBuffer都已经被分配,则该Slab会被释放,以便回收内存。

通过这种方式,Kafka内存池可以大大减少内存碎片,提高内存使用效率。此外,由于预先申请了一定数量的Slab,因此在消息发送等高频操作中,内存的分配和释放也可以更加快速和稳定。

推荐:

个人中心 - 腾讯云开发者社区-腾讯云 (tencent.com) (opens new window)

编辑 (opens new window)
上次更新: 2024/02/22, 14:03:19
如何安装kafka
常见命令行指令

← 如何安装kafka 常见命令行指令→

最近更新
01
spark基础
02-22
02
mysql读写分离和分库分表
02-22
03
数据库迁移
02-22
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式