千家信息网

Kafka消费与心跳机制如何理解

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,Kafka消费与心跳机制如何理解,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。导读kafka是一个分布式,分区的,多副本的,多订阅者的消
千家信息网最后更新 2025年02月05日Kafka消费与心跳机制如何理解

Kafka消费与心跳机制如何理解,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

导读kafka是一个分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等。kafka是一个分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等。今天小编来领大家一起来学习一下Kafka消费与心跳机制。

1、Kafka消费

首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:

public class JConsumerSubscribe extends Thread {      public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }    /** 初始化Kafka集群信息. */    private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址          props.put("group.id", "ke");// 指定消费者组          props.put("enable.auto.commit", "true");// 开启自动提交          props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔          // 反序列化消息主键        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");          // 反序列化消费记录        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");          return props;      }    /** 实现一个单线程消费者. */    @Override    public void run() {        // 创建一个消费者实例对象        KafkaConsumer consumer = new KafkaConsumer<>(configure());        // 订阅消费主题集合        consumer.subscribe(Arrays.asList("test_kafka_topic"));          // 实时消费标识        boolean flag = true;          while (flag) {              // 获取主题消息数据            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));              for (ConsumerRecord record : records)                  // 循环打印消息记录                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());          }        // 出现异常关闭消费者对象        consumer.close();      }}

上述代码我们就可以非常便捷地拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下:

org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords poll(final long timeoutMs, final boolean includeMetadataInTimeout) {          acquireAndEnsureOpen();        try {              if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");              if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {                  throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");              }            // poll for new data until the timeout expires              long elapsedTime = 0L;              do {                  client.maybeTriggerWakeup();                final long metadataEnd;                if (includeMetadataInTimeout) {                      final long metadataStart = time.milliseconds();                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {                          return ConsumerRecords.empty();                      }                    metadataEnd = time.milliseconds();                    elapsedTime += metadataEnd - metadataStart;                } else {                      while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {                          log.warn("Still waiting for metadata");                      }                    metadataEnd = time.milliseconds();                }                final Map>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));                if (!records.isEmpty()) {                      // before returning the fetched records, we can send off the next round of fetches                      // and avoid block waiting for their responses to enable pipelining while the user                      // is handling the fetched records.                      //                      // NOTE: since the consumed position has already been updated, we must not allow                      // wakeups or any other errors to be triggered prior to returning the fetched records.                      if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {                          client.pollNoWakeup();                    }                    return this.interceptors.onConsume(new ConsumerRecords<>(records));                  }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd - metadataEnd;            } while (elapsedTime < timeoutMs);              return ConsumerRecords.empty();          } finally {              release();        }    }

上述代码中有个方法pollForFetches,它的实现逻辑如下:

private Map>> pollForFetches(final long timeoutMs) {          final long startMs = time.milliseconds();          long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);          // if data is available already, return it immediately          final Map>> records = fetcher.fetchedRecords();          if (!records.isEmpty()) {              return records;          }          // send any new fetches (won't resend pending fetches)          fetcher.sendFetches();          // We do not want to be stuck blocking in poll if we are missing some positions          // since the offset lookup may be backing off after a failure          // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call          // updateAssignmentMetadataIfNeeded before this method.          if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {              pollTimeout = retryBackoffMs;          }          client.poll(pollTimeout, startMs, () -> {              // since a fetch might be completed by the background thread, we need this poll condition              // to ensure that we do not block unnecessarily in poll()              return !fetcher.hasCompletedFetches();          });          // after the long poll, we should check whether the group needs to rebalance          // prior to returning data so that the group can stabilize faster          if (coordinator.rejoinNeededOrPending()) {              return Collections.emptyMap();          }          return fetcher.fetchedRecords();      }

上述代码中加粗的位置,我们可以看出每次消费者客户端拉取数据时,通过poll方法,先调用fetcher中的fetchedRecords函数,如果获取不到数据,就会发起一个新的sendFetches请求。而在消费数据的时候,每个批次从Kafka Broker Server中拉取数据是有最大数据量限制,默认是500条,由属性(max.poll.records)控制,可以在客户端中设置该属性值来调整我们消费时每次拉取数据的量。

提示:这里需要注意的是,max.poll.records返回的是一个poll请求的数据总和,与多少个分区无关。因此,每次消费从所有分区中拉取Topic的数据的总条数不会超过max.poll.records所设置的值。

而在Fetcher的类中,在sendFetches方法中有限制拉取数据容量的限制,由属性(max.partition.fetch.bytes),默认1MB。可能会有这样一个场景,当满足max.partition.fetch.bytes限制条件,如果需要Fetch出10000条记录,每次默认500条,那么我们需要执行20次才能将这一次通过网络发起的请求全部Fetch完毕。

这里,可能有同学有疑问,我们不能将默认的max.poll.records属性值调到10000吗?可以调,但是还有个属性需要一起配合才可以,这个就是每次poll的超时时间(Duration.ofMillis(100)),这里需要根据你的实际每条数据的容量大小来确定设置超时时间,如果你将最大值调到10000,当你每条记录的容量很大时,超时时间还是100ms,那么可能拉取的数据少于10000条。

而这里,还有另外一个需要注意的事情,就是会话超时的问题。session.timeout.ms默认是10s,group.min.session.timeout.ms默认是6s,group.max.session.timeout.ms默认是30min。当你在处理消费的业务逻辑的时候,如果在10s内没有处理完,那么消费者客户端就会与Kafka Broker Server断开,消费掉的数据,产生的offset就没法提交给Kafka,因为Kafka Broker Server此时认为该消费者程序已经断开,而即使你设置了自动提交属性,或者设置auto.offset.reset属性,你消费的时候还是会出现重复消费的情况,这就是因为session.timeout.ms超时的原因导致的。

2、心跳机制

上面在末尾的时候,说到会话超时的情况导致消息重复消费,为什么会有超时?有同学会有这样的疑问,我的消费者线程明明是启动的,也没有退出,为啥消费不到Kafka的消息呢?消费者组也查不到我的ConsumerGroupID呢?这就有可能是超时导致的,而Kafka是通过心跳机制来控制超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程来定时发送心跳和检测消费者的状态。每个消费者都有个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,声明的Schema如下所示:

private final int sessionTimeoutMs;      private final int heartbeatIntervalMs;      private final int maxPollIntervalMs;      private final long retryBackoffMs;      private volatile long lastHeartbeatSend;       private long lastHeartbeatReceive;      private long lastSessionReset;      private long lastPoll;      private boolean heartbeatFailed;

心跳线程中的run方法实现代码如下:

public void run() {              try {                  log.debug("Heartbeat thread started");                  while (true) {                      synchronized (AbstractCoordinator.this) {                          if (closed)                              return;                          if (!enabled) {                              AbstractCoordinator.this.wait();                              continue;                          }                        if (state != MemberState.STABLE) {                              // the group is not stable (perhaps because we left the group or because the coordinator                              // kicked us out), so disable heartbeats and wait for the main thread to rejoin.                              disable();                              continue;                          }                          client.pollNoWakeup();                          long now = time.milliseconds();                          if (coordinatorUnknown()) {                              if (findCoordinatorFuture != null || lookupCoordinator().failed())                                  // the immediate future check ensures that we backoff properly in the case that no                                  // brokers are available to connect to.                                  AbstractCoordinator.this.wait(retryBackoffMs);                          } else if (heartbeat.sessionTimeoutExpired(now)) {                              // the session timeout has expired without seeing a successful heartbeat, so we should                              // probably make sure the coordinator is still healthy.                              markCoordinatorUnknown();                          } else if (heartbeat.pollTimeoutExpired(now)) {                              // the poll timeout has expired, which means that the foreground thread has stalled                              // in between calls to poll(), so we explicitly leave the group.                              maybeLeaveGroup();                          } else if (!heartbeat.shouldHeartbeat(now)) {                              // poll again after waiting for the retry backoff in case the heartbeat failed or the                              // coordinator disconnected                              AbstractCoordinator.this.wait(retryBackoffMs);                          } else {                              heartbeat.sentHeartbeat(now);                              sendHeartbeatRequest().addListener(new RequestFutureListener() {                                  @Override                                  public void onSuccess(Void value) {                                      synchronized (AbstractCoordinator.this) {                                          heartbeat.receiveHeartbeat(time.milliseconds());                                      }                                  }                                  @Override                                  public void onFailure(RuntimeException e) {                                      synchronized (AbstractCoordinator.this) {                                          if (e instanceof RebalanceInProgressException) {                                              // it is valid to continue heartbeating while the group is rebalancing. This                                              // ensures that the coordinator keeps the member in the group for as long                                              // as the duration of the rebalance timeout. If we stop sending heartbeats,                                              // however, then the session timeout may expire before we can rejoin.                                              heartbeat.receiveHeartbeat(time.milliseconds());                                          } else {                                              heartbeat.failHeartbeat();                                              // wake up the thread if it's sleeping to reschedule the heartbeat                                              AbstractCoordinator.this.notify();                                          }                                      }                                  }                              });                          }                      }                  }              } catch (AuthenticationException e) {                  log.error("An authentication error occurred in the heartbeat thread", e);                  this.failed.set(e);              } catch (GroupAuthorizationException e) {                  log.error("A group authorization error occurred in the heartbeat thread", e);                  this.failed.set(e);              } catch (InterruptedException | InterruptException e) {                  Thread.interrupted();                  log.error("Unexpected interrupt received in heartbeat thread", e);                  this.failed.set(new RuntimeException(e));              } catch (Throwable e) {                  log.error("Heartbeat thread failed due to unexpected error", e);                  if (e instanceof RuntimeException)                      this.failed.set((RuntimeException) e);                  else                      this.failed.set(new RuntimeException(e));              } finally {                  log.debug("Heartbeat thread has closed");              }          }

在心跳线程中这里面包含两个最重要的超时函数,它们是sessionTimeoutExpired和pollTimeoutExpired。

public boolean sessionTimeoutExpired(long now) {          return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;  }public boolean pollTimeoutExpired(long now) {          return now - lastPoll > maxPollIntervalMs;  }

2.1、sessionTimeoutExpired

如果是sessionTimeout超时,则会被标记为当前协调器处理断开,此时,会将消费者移除,重新分配分区和消费者的对应关系。在Kafka Broker Server中,Consumer Group定义了5种(如果算上Unknown,应该是6种状态)状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:


2.2、pollTimeoutExpired

如果触发了poll超时,此时消费者客户端会退出ConsumerGroup,当再次poll的时候,会重新加入到ConsumerGroup,触发RebalanceGroup。而KafkaConsumer Client是不会帮我们重复poll的,需要我们自己在实现的消费逻辑中不停地调用poll方法。

3.分区与消费线程

关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用KafkaConsumer Client实例,这样使用确实没有什么问题。但是,如果我们有富裕的CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对KafkaConsumer Client实例进行改造,实现消费策略预计算,利用额外的CPU开启更多的线程,来实现消费任务分片。

看完上述内容,你们掌握Kafka消费与心跳机制如何理解的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

消费 数据 消费者 线程 方法 属性 消息 机制 代码 实例 日志 时候 客户 客户端 订阅 分布式 时间 系统 问题 限制 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 三级数据库上机 uipath循环插入数据库 海南邵动网络技术有限公司游戏 数据库怎样判断数据是否准确 浙江适合玩lol的服务器云空间 税务局对数据库删除 数据库 客户端 mac ping服务器一般多少速度 网络安全的全球问题类型 中小家庭教育与网络安全读后感 宁波高科技博物馆软件开发 手机突然提醒网络安全隐患 单位网络安全事件应急演练方案 数据库查看指定对象数据 软件开发部门划分 英雄联盟韩服服务器 软件开发团队如何合作协议 wamp怎么连接数据库 从数据库获取二进制 大学生网络安全管理制度 华硕固件有打印服务器吗 软件开发培养方式 关系数据库中基于数学上 合肥软件开发收入 切实维护学校网络安全 信息网络安全的第三个时钟 网络监控服务器安全 软件开发团队如何合作协议 谷歌服务器怎么玩国际服云顶之弈 安徽智能软件开发培训
0