baby sword‘s blog baby sword‘s blog
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)

zhengjian

不敢承担失去的风险,是不可能抓住梦想的
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)
  • 消息队列

  • kafka

    • 如何安装kafka
    • kafka基础
    • 常见命令行指令
    • kafka可视化界面
    • 使用kafka时遇到的一些问题
    • ProducerConfig
    • kafka之生产者
    • kafka消息丢失
    • kafka之broker
    • kafka之消费者
    • 消费者源码
    • 消费者的多线程方式
    • kafka之缓冲区
    • kafka源码
    • kafka个人技术分享
  • zookeeper

  • rocketMq

  • 中间件
  • kafka
xugaoyi
2023-06-09

kafka源码

代码质量好

版本0.10.1版本。

gradle

根据代码,分析代码的架构技巧,编程技巧

先从producer开始

从producer开始debug进行流程。源码中examples/src/java中的代码demo可以直接运行。

生产环境一般用异步发送

image-20230414145855948

环境搭建https://xie.infoq.cn/article/ece8077adf7f6e8aaca047da9 (opens new window)

基础:jdk 1.8、gradle 6.8、scala 2.13.1(kafka的broker是用scala编写的)

scala 2.13.1 (opens new window)

gradle 6.8 (opens new window)

下载好后,将上面的环境变量进行相应的配置

tar –xvf file.tar //解压 tar包 tar -xzvf file.tar.gz //解压tar.gz tar -xjvf file.tar.bz2 //解压 tar.bz2 tar –xZvf file.tar.Z //解压tar.Z unrar e file.rar //解压rar unzip file.zip //解压zip

环境配置

mac环境怎么配置环境变量

首先陪你mac环境变量的文件是当前用户的根目录中的.bash_profile文件中

我们可以使用vim去编辑这个环境变量的文件

我们需要在其中配置哪些环境变量呢?

JAVA_HOME、SCALA_HOME、GRADLE_HOME

参考下面的配置

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_351.jdk/Contents/Home
export SCALA_HOME=/Users/zhengjian/kafkaPack/scala
export GRADLE_HOME=/Users/zhengjian/kafkaPack/gradle/gradle-6.8
export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$GRADLE_HOME/bin:$PATH:.

CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:.

export CLASSPATH

export M="/Users/zhengjian/local/apache-maven-3.8.7"
export PATH="$M/bin:$PATH"

___MY_VMOPTIONS_SHELL_FILE="${HOME}/.jetbrains.vmoptions.sh"; if [ -f "${___MY_VMOPTIONS_SHELL_FILE}" ]; then . "${___MY_VMOPTIONS_SHELL_FILE}"; fi
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
~                                                                                                                                                            
".bash_profile" 13L, 573B
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

最后使用source .bash_profile刷新文件

使用scala -version和gradle -version看是否环境配置成功

Zookeeper

kafka 在 2.8.0 版本之前是依赖 Zookeeper 来存储元数据信息的,从 2.8.0 版本开始,kafka 不再强依赖 Zookeeper ,而是自己实现了 raft 协议来存储元数据。

当然我们看源码时也可以使用zookeepr (Zookeeper-3.6.3下载地址 (opens new window))

分区器

public interface Partitioner extends Configurable, Closeable {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  
  	…………
}

1
2
3
4
5
6
7

返回指定主题的分区下标

  • 默认实现 DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
  			//如果key存在,就对key进行hash然后对分区长度取模
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
1
2
3
4
5
6
7
8
9

Utils.murmur2(keyBytes) 对keybytes进行hash,toPositive将整数转化为非负整数

stickyPartitionCache粘性分区器

粘性分区器通过选择单个分区来发送所有非键记录,解决了将没有键的记录分散成较小批次的问题。 一旦该分区的批次被填满或以其他方式完成,粘性分区程序会随机选择并“粘”到一个新分区。 这样,在更长的时间内,记录大致均匀地分布在所有分区中,同时获得更大批量的额外好处。

image-20230416175721580

为什么有粘粘分区

首先,我们知道,Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 这个Batch可能包含多条消息,然后再将Batch打包发送。关于这一块可以看看我之前的文章 图解Kafka Producer 消息缓存模型 (opens new window)

消息缓存

这样做的好处就是能够提高吞吐量,减少发起请求的次数。

但是有一个问题就是, 因为消息的发送它必须要你的一个Batch满了或者linger.ms时间到了,才会发送(当当然具体的条件会更多)。如果生产的消息比较少的话,迟迟难以让Batch塞满,那么就意味着更高的延迟。

在之前的消息发送中,就将消息轮询到各个分区的, 本来消息就少,你还给所有分区遍历的分配,那么每个ProducerBatch都很难满足条件。

那么假如我先让一个ProducerBatch塞满了之后,再给其他的分区分配是不是可以降低这个延迟呢?

详细的可以看看下面这张图、

这张图的前提是:

Topic1 有3分区, 此时给Topic1 发9条无key的消息, 这9条消息加起来都不超过batch.size . 那么以前的分配方式和粘性分区的分配方式如下

在这里插入图片描述

可以看到,使用粘性分区之后,至少是先把一个Batch填满了发送然后再去填充另一个Batch。不至于向之前那样,虽然平均分配了,但是导致一个Batch都没有放满,不能立即发送。这不就增大了延迟了吗(只能通过linger.ms时间到了才发送)

划重点:

  1. 当一个Batch发送之后,需要选择一个新的粘性分区的时候 ①. 可用分区<1 ;那么选择分区的逻辑是在所有分区中随机选择。 ②. 可用分区=1; 那么直接选择这个分区。 ③. 可用分区>1 ; 那么在所有可用分区中随机选择。

  2. 当选择下一个粘性分区的时候,不是按照分区平均的原则来分配。而是随机原则(当然不能跟上一次的分区相同)

    例如刚刚发送到的Batch是 1号分区,等Batch满了,发送之后,新的消息可能会发到2或者3, 如果选择的是2,等2的Batch满了之后,下一次选择的Batch仍旧可能是1,而不是说为了平均,选择3分区。

roundrobin RoundRobinPartitioner 轮询分区器

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 如果消息中指定了分区,则使用它
  • 将消息平均的分配到每个分区中。
  • 与key无关

上面是具体代码。有个地方需要注意;

  1. 当可用分区是0的话,那么就是遍历的是所有分区中的。
  2. 当有可用分区的话,那么遍历的是所有可用分区的。

这是一个Java方法,其目的是为给定的主题(topic)生成下一个整数值。该方法使用了Java中并发安全的AtomicInteger类来保证线程安全。

具体来说,该方法会先检查 topicCounterMap 中是否已经存在给定主题对应的计数器(counter),如果不存在则新建一个计数器并将其设置为 0;如果已经存在则直接获取该计数器。然后,通过调用 counter.getAndIncrement() 方法,返回当前计数器的值并将其加一,实现递增计数的效果。

总的来说,这个方法的作用是为给定的主题产生唯一的、递增的整数值,以便进行进一步处理。

cluster.partitionsForTopic(topic) 与cluster.availablePartitionsForTopic(topic); 有什么区别

在 Apache Kafka 中,cluster.partitionsForTopic(topic) 和 cluster.availablePartitionsForTopic(topic) 都是用于获取指定 topic 的分区信息的方法,但它们的返回结果略有不同。

cluster.partitionsForTopic(topic) 方法会返回指定 topic 所有分区的信息,包括分区 ID、分区 leader、分配到该分区的副本列表等。如果该 topic 不存在,则会返回 null。

cluster.availablePartitionsForTopic(topic) 方法则只返回指定 topic 当前可用的分区信息。也就是说,如果某个分区当前不可用(如正在进行副本重平衡或者正在进行分区迁移),那么这个方法不会返回该分区的信息。如果该 topic 不存在,则会返回空数组。

因此,当我们需要获取指定 topic 的所有分区信息时,可以使用 cluster.partitionsForTopic(topic) 方法;而当我们只需要获取当前可用的分区信息时,可以选择使用 cluster.availablePartitionsForTopic(topic) 方法,从而避免处理那些暂时不可用的分区。

内存池:减少gc压力

int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
1

如果消息大于了batchSize,那么意味一条消息就会成为一个批次,一条一条进行发送,这样的话批次的概念就失效了,因为没有将多个消息缓存后一起发送,所以这个batchSize的大小需要根据实际的业务进行设置。

copyOnWriteMap

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.utils;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
 */
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {

    private volatile Map<K, V> map;

    public CopyOnWriteMap() {
        this.map = Collections.emptyMap();
    }

    public CopyOnWriteMap(Map<K, V> map) {
        this.map = Collections.unmodifiableMap(map);
    }

    @Override
    public boolean containsKey(Object k) {
        return map.containsKey(k);
    }

    @Override
    public boolean containsValue(Object v) {
        return map.containsValue(v);
    }

    @Override
    public Set<java.util.Map.Entry<K, V>> entrySet() {
        return map.entrySet();
    }

    @Override
    public V get(Object k) {
        return map.get(k);
    }

    @Override
    public boolean isEmpty() {
        return map.isEmpty();
    }

    @Override
    public Set<K> keySet() {
        return map.keySet();
    }

    @Override
    public int size() {
        return map.size();
    }

    @Override
    public Collection<V> values() {
        return map.values();
    }

    @Override
    public synchronized void clear() {
        this.map = Collections.emptyMap();
    }

    @Override
    public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.put(k, v);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

    @Override
    public synchronized void putAll(Map<? extends K, ? extends V> entries) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        copy.putAll(entries);
        this.map = Collections.unmodifiableMap(copy);
    }

    @Override
    public synchronized V remove(Object key) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.remove(key);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

    @Override
    public synchronized V putIfAbsent(K k, V v) {
        if (!containsKey(k))
            return put(k, v);
        else
            return get(k);
    }

    @Override
    public synchronized boolean remove(Object k, Object v) {
        if (containsKey(k) && get(k).equals(v)) {
            remove(k);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public synchronized boolean replace(K k, V original, V replacement) {
        if (containsKey(k) && get(k).equals(original)) {
            put(k, replacement);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public synchronized V replace(K k, V v) {
        if (containsKey(k)) {
            return put(k, v);
        } else {
            return null;
        }
    }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147

image-20230416195202595

重点看put、get 。读写分离copyOnWrite为什么线程安全

append方法的代码中进行分段加锁,提高性能

分段锁,free释放,多次尝试加入批次

sender线程设计

batch中满足什么条件时可以发送

image-20230416212342279

网路架构:

编辑 (opens new window)
上次更新: 2024/02/22, 14:03:19
kafka之缓冲区
kafka个人技术分享

← kafka之缓冲区 kafka个人技术分享→

最近更新
01
spark基础
02-22
02
mysql读写分离和分库分表
02-22
03
数据库迁移
02-22
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式