kafka源码
代码质量好
版本0.10.1版本。
gradle
根据代码,分析代码的架构技巧,编程技巧
先从producer开始
从producer开始debug进行流程。源码中examples/src/java中的代码demo可以直接运行。
生产环境一般用异步发送
环境搭建https://xie.infoq.cn/article/ece8077adf7f6e8aaca047da9 (opens new window)
基础:jdk 1.8、gradle 6.8、scala 2.13.1(kafka的broker是用scala编写的)
scala 2.13.1 (opens new window)
下载好后,将上面的环境变量进行相应的配置
tar –xvf file.tar //解压 tar包 tar -xzvf file.tar.gz //解压tar.gz tar -xjvf file.tar.bz2 //解压 tar.bz2 tar –xZvf file.tar.Z //解压tar.Z unrar e file.rar //解压rar unzip file.zip //解压zip
环境配置
mac环境怎么配置环境变量
首先陪你mac环境变量的文件是当前用户的根目录中的.bash_profile文件中
我们可以使用vim去编辑这个环境变量的文件
我们需要在其中配置哪些环境变量呢?
JAVA_HOME、SCALA_HOME、GRADLE_HOME
参考下面的配置
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_351.jdk/Contents/Home
export SCALA_HOME=/Users/zhengjian/kafkaPack/scala
export GRADLE_HOME=/Users/zhengjian/kafkaPack/gradle/gradle-6.8
export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$GRADLE_HOME/bin:$PATH:.
CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:.
export CLASSPATH
export M="/Users/zhengjian/local/apache-maven-3.8.7"
export PATH="$M/bin:$PATH"
___MY_VMOPTIONS_SHELL_FILE="${HOME}/.jetbrains.vmoptions.sh"; if [ -f "${___MY_VMOPTIONS_SHELL_FILE}" ]; then . "${___MY_VMOPTIONS_SHELL_FILE}"; fi
~
~
~
~
~
~
~
~
~
~
~
".bash_profile" 13L, 573B
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
最后使用source .bash_profile刷新文件
使用scala -version和gradle -version看是否环境配置成功
Zookeeper
kafka 在 2.8.0 版本之前是依赖 Zookeeper 来存储元数据信息的,从 2.8.0 版本开始,kafka 不再强依赖 Zookeeper ,而是自己实现了 raft 协议来存储元数据。
当然我们看源码时也可以使用zookeepr (Zookeeper-3.6.3下载地址 (opens new window))
分区器
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
…………
}
2
3
4
5
6
7
返回指定主题的分区下标
- 默认实现 DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
//如果key存在,就对key进行hash然后对分区长度取模
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
2
3
4
5
6
7
8
9
Utils.murmur2(keyBytes) 对keybytes进行hash,toPositive将整数转化为非负整数
stickyPartitionCache粘性分区器
粘性分区器通过选择单个分区来发送所有非键记录,解决了将没有键的记录分散成较小批次的问题。 一旦该分区的批次被填满或以其他方式完成,粘性分区程序会随机选择并“粘”到一个新分区。 这样,在更长的时间内,记录大致均匀地分布在所有分区中,同时获得更大批量的额外好处。
为什么有粘粘分区
首先,我们知道,Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 这个Batch可能包含多条消息,然后再将Batch打包发送。关于这一块可以看看我之前的文章 图解Kafka Producer 消息缓存模型 (opens new window)
这样做的好处就是能够提高吞吐量,减少发起请求的次数。
但是有一个问题就是, 因为消息的发送它必须要你的一个Batch满了或者linger.ms
时间到了,才会发送(当当然具体的条件会更多)。如果生产的消息比较少的话,迟迟难以让Batch塞满,那么就意味着更高的延迟。
在之前的消息发送中,就将消息轮询到各个分区的, 本来消息就少,你还给所有分区遍历的分配,那么每个ProducerBatch都很难满足条件。
那么假如我先让一个ProducerBatch塞满了之后,再给其他的分区分配是不是可以降低这个延迟呢?
详细的可以看看下面这张图、
这张图的前提是:
Topic1 有3分区, 此时给Topic1 发9条无key的消息, 这9条消息加起来都不超过batch.size
. 那么以前的分配方式和粘性分区的分配方式如下
可以看到,使用粘性分区之后,至少是先把一个Batch填满了发送然后再去填充另一个Batch。不至于向之前那样,虽然平均分配了,但是导致一个Batch都没有放满,不能立即发送。这不就增大了延迟了吗(只能通过linger.ms
时间到了才发送)
划重点:
当一个Batch发送之后,需要选择一个新的粘性分区的时候 ①. 可用分区<1 ;那么选择分区的逻辑是在所有分区中随机选择。 ②. 可用分区=1; 那么直接选择这个分区。 ③. 可用分区>1 ; 那么在所有可用分区中随机选择。
当选择下一个粘性分区的时候,不是按照分区平均的原则来分配。而是随机原则(当然不能跟上一次的分区相同)
例如刚刚发送到的Batch是 1号分区,等Batch满了,发送之后,新的消息可能会发到2或者3, 如果选择的是2,等2的Batch满了之后,下一次选择的Batch仍旧可能是1,而不是说为了平均,选择3分区。
roundrobin RoundRobinPartitioner 轮询分区器
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
- 如果消息中指定了分区,则使用它
- 将消息平均的分配到每个分区中。
- 与key无关
上面是具体代码。有个地方需要注意;
- 当可用分区是0的话,那么就是遍历的是所有分区中的。
- 当有可用分区的话,那么遍历的是所有可用分区的。
这是一个Java方法,其目的是为给定的主题(topic)生成下一个整数值。该方法使用了Java中并发安全的AtomicInteger类来保证线程安全。
具体来说,该方法会先检查 topicCounterMap 中是否已经存在给定主题对应的计数器(counter),如果不存在则新建一个计数器并将其设置为 0;如果已经存在则直接获取该计数器。然后,通过调用 counter.getAndIncrement() 方法,返回当前计数器的值并将其加一,实现递增计数的效果。
总的来说,这个方法的作用是为给定的主题产生唯一的、递增的整数值,以便进行进一步处理。
cluster.partitionsForTopic(topic) 与cluster.availablePartitionsForTopic(topic); 有什么区别
在 Apache Kafka 中,cluster.partitionsForTopic(topic)
和 cluster.availablePartitionsForTopic(topic)
都是用于获取指定 topic 的分区信息的方法,但它们的返回结果略有不同。
cluster.partitionsForTopic(topic)
方法会返回指定 topic 所有分区的信息,包括分区 ID、分区 leader、分配到该分区的副本列表等。如果该 topic 不存在,则会返回 null。
cluster.availablePartitionsForTopic(topic)
方法则只返回指定 topic 当前可用的分区信息。也就是说,如果某个分区当前不可用(如正在进行副本重平衡或者正在进行分区迁移),那么这个方法不会返回该分区的信息。如果该 topic 不存在,则会返回空数组。
因此,当我们需要获取指定 topic 的所有分区信息时,可以使用 cluster.partitionsForTopic(topic)
方法;而当我们只需要获取当前可用的分区信息时,可以选择使用 cluster.availablePartitionsForTopic(topic)
方法,从而避免处理那些暂时不可用的分区。
内存池:减少gc压力
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
如果消息大于了batchSize,那么意味一条消息就会成为一个批次,一条一条进行发送,这样的话批次的概念就失效了,因为没有将多个消息缓存后一起发送,所以这个batchSize的大小需要根据实际的业务进行设置。
copyOnWriteMap
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
* A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
*/
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
private volatile Map<K, V> map;
public CopyOnWriteMap() {
this.map = Collections.emptyMap();
}
public CopyOnWriteMap(Map<K, V> map) {
this.map = Collections.unmodifiableMap(map);
}
@Override
public boolean containsKey(Object k) {
return map.containsKey(k);
}
@Override
public boolean containsValue(Object v) {
return map.containsValue(v);
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return map.entrySet();
}
@Override
public V get(Object k) {
return map.get(k);
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public Set<K> keySet() {
return map.keySet();
}
@Override
public int size() {
return map.size();
}
@Override
public Collection<V> values() {
return map.values();
}
@Override
public synchronized void clear() {
this.map = Collections.emptyMap();
}
@Override
public synchronized V put(K k, V v) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.put(k, v);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
@Override
public synchronized void putAll(Map<? extends K, ? extends V> entries) {
Map<K, V> copy = new HashMap<K, V>(this.map);
copy.putAll(entries);
this.map = Collections.unmodifiableMap(copy);
}
@Override
public synchronized V remove(Object key) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.remove(key);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
@Override
public synchronized V putIfAbsent(K k, V v) {
if (!containsKey(k))
return put(k, v);
else
return get(k);
}
@Override
public synchronized boolean remove(Object k, Object v) {
if (containsKey(k) && get(k).equals(v)) {
remove(k);
return true;
} else {
return false;
}
}
@Override
public synchronized boolean replace(K k, V original, V replacement) {
if (containsKey(k) && get(k).equals(original)) {
put(k, replacement);
return true;
} else {
return false;
}
}
@Override
public synchronized V replace(K k, V v) {
if (containsKey(k)) {
return put(k, v);
} else {
return null;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
重点看put、get 。读写分离copyOnWrite为什么线程安全
append方法的代码中进行分段加锁,提高性能
分段锁,free释放,多次尝试加入批次
sender线程设计
batch中满足什么条件时可以发送
网路架构: