kafka实现异步发送-凯发k8官方网
kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 kafka 传输海量消息,在这过程中遇到了一个 kafka producer 异步发送消息会被阻塞的问题,导致生产端发送耗时很大。
是的,你没听错,kafka producer 异步发送消息也会发生阻塞现象,那究竟是怎么回事呢?
在新版的 kafka producer 中,设计了一个消息缓冲池,客户端发送的消息都会被存储到缓冲池中,同时 producer 启动后还会开启一个 sender 线程,不断地从缓冲池获取消息并将其发送到 broker,如下图所示:
这么看来,kafka 的所有发送,都可以看作是异步发送了,因此在新版的 kafka producer 中废弃掉异步发送的方法了,仅保留了一个 send 方法,同时返回一个 futrue 对象,需要同步等待发送结果,就使用 futrue#get 方法阻塞获取发送结果。而我在项目中直接调用 send 方法,为何还会发送阻塞呢?
我们在构建 kafka producer 时,会有一个自定义缓冲池大小的参数 buffer.memory,默认大小为 32m,因此缓冲池的大小是有限制的,我们不妨想一下,缓冲池内存资源耗尽了会怎么样?
kafka 源码的注释是非常详细的,recordaccumulator 类是 kafka producer 缓冲池的核心类,而 recordaccumulator 类就有那么一段注释:
the accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
大概的意思是:
当缓冲池的内存块用完后,消息追加调用将会被阻塞,直到有空闲的内存块。
由于性能监控项目每分钟需要发送几百万条消息,只要 kafka 集群负载很高或者网络稍有波动,sender 线程从缓冲池捞取消息的速度赶不上客户端发送的速度,就会造成客户端发送被阻塞。
我写个例子让大家直观感受一下被阻塞的现象:
public static void main(string[] args){
properties properties = new properties();
properties.put(producerconfig.acks_config, "0");
properties.put(producerconfig.key_serializer_class_config, "org.apache.kafka.common.serialization.stringserializer");
properties.put(producerconfig.value_serializer_class_config, "org.apache.kafka.common.serialization.bytearrayserializer");
properties.put(producerconfig.bootstrap_servers_config, "localhost:9092,localhost:9093,localhost:9094");
properties.put(producerconfig.linger_ms_config, 1000);
properties.put(producerconfig.batch_size_config, 1024 * 1024);
properties.put(producerconfig.max_request_size_config, 5242880);
properties.put(producerconfig.compression_type_config, "lz4");
kafkaproducer producer = new kafkaproducer<>(properties);
string str = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz0123456789";
list byteslist = new arraylist<>();
random random = new random();
for (int j = 0; j
int i1 = random.nextint(10);
if (i1 == 0) {
i1 = 1;
}
byte[] bytes = new byte[1024 * i1];
for (int i = 0; i
bytes[i] = (byte) str.charat(random.nextint(62));
}
byteslist.add(bytes);
}
while (true) {
long start = system.currenttimemillis();
producer.send(new producerrecord<>("test_topic", byteslist.get(random.nextint(1023))));
long end = system.currenttimemillis() - start;
if (end > 100) {
system.out.println("发送耗时:" end);
}
// thread.sleep(10);
}
}
以上例子构建了一个 kafka producer 对象,同时使用死循环不断地发送消息,这时如果把 thread.sleep(10);注释掉,则会出现发送耗时很长的现象:
使用 jprofiler 可以查看到分配内存的地方出现了阻塞:
跟踪到源码:
发现在 org.apache.kafka.clients.producer.internals.bufferpool#allocate 方法中,如果判断缓冲池没有空闲的内存了,则会阻塞内存分配,直到有空闲内存为止。
如果不注释 thread.sleep(10);这段代码则不会发生阻塞现象,打断点到阻塞的地方,也不会被 debug 到,从现象能够得知,thread.sleep(10);使得发送消息的频率变低了,此时 sender 线程发送的速度超过了客户端的发送速度,缓冲池一直处于未满状态,因此不会产生阻塞现象。
除了以上缓冲池内存满了会发生阻塞之外,kafka produer 其它情况都不会发生阻塞了吗?非也,其实还有一个地方,也会发生阻塞!
kafka producer 通常在第一次发送消息之前,需要获取该主题的元数据 metadata,metadata 内容包括了主题相关分区 leader 所在节点信息、副本所在节点信息、isr 列表等,kafka producer 获取 metadata 后,便会根据 metadata 内容将消息发送到指定的分区 leader 上,整个获取流程大致如下:
如上图所示,kafka producer 在发送消息之前,会检查主题的 metadata 是否需要更新,如果需要更新,则会唤醒 sender 线程并发送 metatadata 更新请求,此时 kafka producer 主线程则会阻塞等待 metadata 的更新。
如果 metadata 一直无法更新,则会导致客户端一直阻塞在那里。
总结
以上是凯发k8官方网为你收集整理的kafka实现异步发送_kafka producer 异步发送消息居然也会阻塞?的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇:
- 下一篇: