千家信息网

kafka 生产发送消息失败无响应或者Error while fetching metadata with correlation id该怎么办

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,kafka 生产发送消息失败无响应或者Error while fetching metadata with correlation id该怎么办,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面
千家信息网最后更新 2025年02月01日kafka 生产发送消息失败无响应或者Error while fetching metadata with correlation id该怎么办

kafka 生产发送消息失败无响应或者Error while fetching metadata with correlation id该怎么办,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

今天在使用代码编写kafka 生产者发送消息的时候,因为我的手误出现的搞笑的事情。

同样的代码和kafka 在不久前执行过,是没有问题的。 代码如下

package streaming.utilsimport java.utilimport java.util.{Date, Properties, UUID}import com.alibaba.fastjson.JSONObjectimport org.apache.commons.lang3.time.FastDateFormatimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import scala.util.Random/**  * Author: Michael PK   QQ: 1990218038  *  * Kafka数据生产者  */object ProducerApp {  def main(args: Array[String]): Unit = {    val props = new Properties    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")    props.put("bootstrap.servers", ParamsConf.brokers)    props.put("request.required.acks","1")    val topic = ParamsConf.topic    val producer = new KafkaProducer[String,String](props)    val random = new Random()    val dateFormat = FastDateFormat.getInstance("yyyyMMddHHmmss")    for(i <- 1 to 100){      val time = dateFormat.format(new Date())+""      val userid = random.nextInt(1000)+""      val courseid = random.nextInt(500)+""      val fee = random.nextInt(400)+""      val result = Array("0","1") // 0未成功支付,1成功支付      val flag = result(random.nextInt(2))      var orderid = UUID.randomUUID().toString      val map = new util.HashMap[String, Object]()      map.put("time", time)      map.put("userid",userid)      map.put("courseid",courseid)      map.put("fee", fee)      map.put("flag", flag)      map.put("orderid",orderid)      val json = new JSONObject(map)      producer.send(new ProducerRecord[String,String](topic(0),json.toJSONString))    }    println("PK Kafka生产者生产数据完毕...")  }}

代码很简单。只是用来模拟生产数据而已。

一直以来的使用的 都是 2.0 版本的 kafka client

            org.apache.kafka            kafka-clients            2.0.0        

但是今天执行的上面的代码的时候。 就 不能发生消息了,也没有错误的提示。 程序也没有关闭。

通过 debug 发现卡在 doSend 代码里面了

 private Future doSend(ProducerRecord record, Callback callback) {        TopicPartition tp = null;        try {            throwIfProducerClosed();            // first make sure the metadata for the topic is available            ClusterAndWaitTime clusterAndWaitTime;            try {                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);            } catch (KafkaException e) {                if (metadata.isClosed())                    throw new KafkaException("Producer closed while send in progress", e);                throw e;            }

虽然它抛出了异常,但是 不能进入

if (metadata.isClosed()) 逻辑里面

外层并没有捕获它的异常。通过debug 这个 异常 e 是 Failed to update metadata after 60000 ms.

考虑到它的版本也服务器版本不一样 就试着 减低版本看看

             org.apache.kafka            kafka-clients            1.1.0        

执行的时候不停的提示:因为进入了死循环中

lientId=producer-1] Error while fetching metadata with correlation id

并且测试过了 : telnet 192.168.0.205 9092 是没有问题的

在 服务器本地上面使用 命令生产消费消息是可以的。

这样就奇怪了。 于是我换成了 另一个 主题进行测试发现是没有问题的。。。

同时注意到了原来是 我写的 主题名称 后面带有空格

低级错误啊!但是后面的空格真的是空格?我自己测试手敲空格,经过测试没有问题的。也就是如果是 主题后面有空格是可以的。

那就是主题名称 后面带上了什么不可见内容。 我想起来了,我是通过复制这个主题名称 的,估计复制多了什么其他内容。

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

0