千家信息网

怎么进行kafka的异步模式分析

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇文章为大家展示了怎么进行kafka的异步模式分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。啥是异步模式kafka的生产者可以选择使用异步方式发送数据,
千家信息网最后更新 2025年01月23日怎么进行kafka的异步模式分析

本篇文章为大家展示了怎么进行kafka的异步模式分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

啥是异步模式

kafka的生产者可以选择使用异步方式发送数据,所谓异步方式,就是我们调用 send() 方法,并指定一个回调函数, 服务器在返回响应时调用该函数。

kafka在客户端里暴露了两个send方法,我们可以自己选择同步或者异步模式。我们来看一个kafka的生产者发送示例,有个直观的感受。这个示例是一个同步的模式。

ProducerRecord record = new ProducerRecord<>("Kafka", "Kafka_Products", "测试");//Topic Key Value
try{
Future future = producer.send(record);
future.get();//获取执行结果
} catch(Exception e) {
e.printStackTrace();
}

我们从源码层面来继续看下。

首先kafka定义了一个接口,

然后KafkaProducer实现了这两个方法,我们看下异步方法的实现逻辑。

可以看到最终是调用doSend方法,调用的时候传入一个回调。这个回调就是监听方法的执行结果的。

异步模式也会阻塞的

很多人会认为,既然是异步模式,不管结果是成功还是失败,肯定方法调用会马上返回的。那我只能告诉你,不好意思,不一定是这样。我自己就曾经踩过这个坑。

我们当时有个业务流程需要在执行完成后发送kakfa消息给某个业务方,为了尽量减少影响我这个主流程的执行时间,采用了异步方式发送kafka消息。在使用中,因为配错了kafka的TOPIC信息,发现流程阻塞发送消息这里长达6秒(kafka默认的发送超时时间)。

究竟为啥异步方式还会阻塞呢?我们继续看源码。

不管是同步模式还是异步模式,最终都会调用到doSend方法,注意看上图中的waitOnMetadata方法,我上面说的阻塞的情况就是阻塞在这个方法里。那我们继续看这个方法。

通过代码中的注释我们大概能了解这个方法的功能,不过我这里还是要解释下。(防止有人看不懂英文,哈哈)

waitOnMetadata获取当前的集群元数据信息,如果缓存有,并且分区没有超过指定分区范围则缓存返回,否则触发更新,等待新的metadata。这个等待的操作在下面这行代码:

metadata.awaitUpdate(version, remainingWaitMs);

然后就继续跟喽,

这个方法很好理解,就是一直在等一个条件,这个条件达到了就返回,否则一直等待超时退出。而这个条件就是当前的版本号要大于上个版本号。

那么谁来更新版本号呢?就是我们前面提到的sender线程。当我们的topic配置错误的时候导致metadata一直无法更新,然后一直等到超时。

上述内容就是怎么进行kafka的异步模式分析,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0