baby sword‘s blog baby sword‘s blog
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)

zhengjian

不敢承担失去的风险,是不可能抓住梦想的
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)
  • 消息队列

  • kafka

    • 如何安装kafka
    • kafka基础
    • 常见命令行指令
    • kafka可视化界面
    • 使用kafka时遇到的一些问题
    • ProducerConfig
    • kafka之生产者
    • kafka消息丢失
    • kafka之broker
    • kafka之消费者
    • 消费者源码
    • 消费者的多线程方式
    • kafka之缓冲区
      • 没有缓冲区的缺点
      • 加上缓冲区的优点
      • 缓冲区在整体流程中的位置
      • 缓冲区设计 结合源码
        • 成员变量
        • 成员变量
      • 缓冲区设计举一反三
    • kafka源码
    • kafka个人技术分享
  • zookeeper

  • rocketMq

  • 中间件
  • kafka
xugaoyi
2023-06-09
目录

kafka之缓冲区

# 没有缓冲区的缺点

在没有缓冲区之前,生产者生产一条消息,就向broker发送一条消息,在数据量较小的情况下,看起来没有什么缺点,但是当数据量非常大时,每条信息都要进行一次网络io,这是非常消耗性能的。所以kafka相当将这些数据先缓存起来,当达到一定的量时,在以batch的方式发送出去。

每次数据发送出去,确认发生成功后,相应的数据就应该被垃圾回收,但是当数据量非常大了后,频繁地进行垃圾回收,非常影响用户线程的性能。

  1. 频繁的网络请求:如果没有缓冲区,每次发送消息都需要立即向 Kafka 服务器发送请求。这可能会导致频繁的网络请求,降低了整个系统的性能。
  2. 性能下降:如果没有缓冲区,生产者必须等待每个消息被写入 Kafka 服务器之后,才能继续发送下一个消息。这样可能会导致生产者的性能下降。
  3. 数据丢失:如果生产者发生错误或崩溃,那么它所发送的消息将会丢失。如果有缓冲区,则可以在生产者重新启动后重新发送未发送成功的消息。
  4. 资源浪费:如果没有缓冲区,生产者可能会不断发送大量的小数据包,这会浪费网络带宽和服务器资源。

# 加上缓冲区的优点

  1. 提高性能:生产者将消息写入缓冲区,而不是立即发送到 Kafka 服务器。这意味着生产者可以更有效地利用网络 I/O 和处理资源,提高整体系统性能。
  2. 减少网络流量:通过使用缓冲区,生产者可以批量发送多个消息,从而减少了网络流量。这降低了网络拥塞和带宽问题的风险。
  3. 提高可靠性:缓冲区可以存储未发送的消息,因此即使在发生一些故障的情况下,也可以保证消息不会丢失。如果某些消息发送失败,生产者会尝试重新发送它们,以确保所有消息都被正确处理。
  4. 提高吞吐量:通过缓冲区,生产者可以批量发送多个消息。这将极大地增加 Kafka 生产者的吞吐量,从而提高整个 Kafka 系统的性能。
  5. 提高灵活性:使用缓冲区还允许生产者控制消息发送的速率。生产者可以根据需要调整缓冲区大小、发送批处理大小等参数,以满足特定的需求。

# 缓冲区在整体流程中的位置

image-20230411204853162

如果所示,生产者发送信息时,经历消息封装、序列化、分区器路由后,就会进入缓冲区进行累计缓冲。

# 缓冲区设计 结合源码

RecordAccumulator整体设计

# 成员变量

public final class RecordAccumulator {
    private final Logger log;
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    private final Set<TopicPartition> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

①重点1 :batches

batches是一个CopyOnWriteMap类型的map,通过ConcurrentMap<TopicPartition, Deque<ProducerBatch>>可以得知java堆中存在一个map,其中key为主题分区,value是对应的producerBatch队列,当producer发送消息时,就会根据producerRecord对应的主题分区找到对应的deque队列,并加到deque队列的尾部batch中。

疑问:为什么使用的是CopyOnWriteMap呢?

copyOnWrite即写时复制技术。

它内部的集合其实就是一个非线程安全的map,通过对这个map做一系列的包装按CopyOnWrite的思想实现了线程安全。

  • 非线程安全的Map变量用volatile去修饰,保证了线程间的可见性,只要更新了map这个引用指向的对象地址那么别的线程可以立即看到。
  • 读的时候完全不用加锁,因为读的是一个只读副本,写不会发生在只读副本上,这样读的性能就会非常高,N多线程不加锁读。
  • 写的时候会多个线程调用加锁的putIfAbsent方法,这个方法保证了线程安全,同时所有的操作都用一个锁。如果有了这个元素存在就直接返回,不会再写入写的元素。
  • 保证了KafkaProducer线程的总体线程安全。

②重点2:BufferPool

# 成员变量

public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

    private final long totalMemory;//默认大小32M
    private final int poolableSize;//池化大小16k
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free; //池化内存
    private final Deque<Condition> waiters;//阻塞线程对应的condition
    private long nonPooledAvailableMemory;//非池化可使用的内存
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
    private boolean closed;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize

bufferPool的总内存 = 所有池化的内存+未池化的内存

那么buffer pool中的内存是怎么进行申请和释放的呢?

这里的 size 即申请的内存大小,它等于 Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    //1.验证申请的内存是否大于总内存
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    //2.加锁,保证线程安全。
    this.lock.lock();
    if (this.closed) {
        this.lock.unlock();
        throw new KafkaException("Producer closed while allocating memory");
    }
    try {
        //3.申请内存的大小是否是池化的内存大小,16k
        if (size == poolableSize && !this.free.isEmpty())
            //如果是就从池里Bytebuffer
            return this.free.pollFirst();
            // 池化内存空间的大小
        int freeListSize = freeSize() * this.poolableSize;
        //4.如果非池化空间加池化内存空间大于等于要申请的空间
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
                    // 如果申请的空间大小小于池化的大小,就从free队列里拿出一个池化的大小的Bytebuffer加到nonPooledAvailableMemory中
            // 5.如果一个池化的大小的Bytebuffer不满足size,就持续释放池化内存Bytebuffer直到满足为止。
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
            //如果非池化可以空间加池化内存空间小于要申请的空间
        } else {
            int accumulated = 0;
            //创建对应的Condition
            Condition moreMemory = this.lock.newCondition();
            try {
                //线程最长阻塞时间
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                //放入waiters集合中
                this.waiters.addLast(moreMemory);
                // 没有足够的空间就一直循环
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        //空间不够就阻塞,并设置超时时间。
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        recordWaitTime(timeNs);
                    }

                    if (this.closed)
                        throw new KafkaException("Producer closed while allocating memory");
                    if (waitingTimeElapsed) {
                        this.metrics.sensor("buffer-exhausted-records").record();
                        throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }
                    remainingTimeToBlockNs -= timeNs;
                    // 当申请的空间的是池化大小且ByteBuffer池化集合里有元素
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        //尝试给nonPooledAvailableMemory扩容
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        //累计分配了多少空间
                        accumulated += got;
                    }
                }
                accumulated = 0;
            } finally {
                this.nonPooledAvailableMemory += accumulated;//把已经分配的内存还回nonPooledAvailableMemory
                this.waiters.remove(moreMemory);//删除对应的condition
            }
        }
    } finally {
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }
    if (buffer == null)
        //  返回非池化ByteBuffer分配内存
        return safeAllocateByteBuffer(size);
    else
        //  返回池化的ByteBuffer分配内存
        return buffer;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // 释放的空间是否是池化大小,如果是,free上加一个ByteBuffer对象
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // 否则增加非池化空间大小
            this.nonPooledAvailableMemory += size;
        }
        // 释放第一个wait();
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 缓冲区设计举一反三

参考:https://juejin.cn/post/7109099213111164942#heading-2 (opens new window)

编辑 (opens new window)
上次更新: 2024/02/22, 14:03:19
消费者的多线程方式
kafka源码

← 消费者的多线程方式 kafka源码→

最近更新
01
spark基础
02-22
02
mysql读写分离和分库分表
02-22
03
数据库迁移
02-22
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式