千家信息网

Kafka生产者与可靠性保证ACK的方法有哪些

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,本篇内容介绍了"Kafka生产者与可靠性保证ACK的方法有哪些"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2024年11月20日Kafka生产者与可靠性保证ACK的方法有哪些

本篇内容介绍了"Kafka生产者与可靠性保证ACK的方法有哪些"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

生产者消息发送流程

消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。

在Kafka(2.6.0版本)源码中,可以看到。

源码地址: kafka\clients\src\main\java\org.apache.kafka.clients.producer.KafkaProducer.java测试入口:KafkaProducerTest.testInvalidGenerationIdAndMemberIdCombinedInSendOffsets()

在创建KafkaProducer时,在430创建了一个Sender对象,并且启动了一个IO线程。

this.errors = this.metrics.sensor("errors");this.sender = newSender(logContext, kafkaClient, this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();
interceptor

interceptor的作用是实现消息的定制化,类似:spring Interceptor 、MyBatis的插件、Quartz的监听器。

@Overridepublic Future send(ProducerRecord record, Callback callback) {     // intercept the record, which can be potentially modified; this method does not throw exceptions     ProducerRecord interceptedRecord = this.interceptors.onSend(record);    return doSend(interceptedRecord, callback);}

可通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口开发自定义器。

简单自定义例子:

public class CustomInterceptor implements ProducerInterceptor {    // 发送消息时触发    @Override    public ProducerRecord onSend(ProducerRecord record) {        System.out.println("发送消息时触发");        return record;    }    // 收到服务端的ACK时触发    @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        System.out.println("消息被服务端接收");    }    @Override    public void close() {        System.out.println("生产者关闭");    }    // 用键值对配置时触发    @Override    public void configure(Map configs) {        System.out.println("configure...");    }}// 生产者中添加List interceptors = new ArrayList<>();interceptors.add("com.freecloud.plug.kafka.interceptor.CustomInterceptor");props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
序列化
byte[] serializedKey;try {    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +            " specified in key.serializer", cce);}byte[] serializedValue;try {    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +            " specified in value.serializer", cce);}

在kafka针对不同的数据类型做了相应的序列化工具。如需自定义实现org.apache.kafka.common.serialization.Serializer接口。

路由器(分区器)
int partition = partition(record, serializedKey, serializedValue, cluster);
消息累加器
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

// RecordAccumulator本质是一个ConcurrentMap:

private final ConcurrentMap> batches;

一个partition一个Batch。batch满了之后,会唤醒Sender线程发送消息。

if (result.batchIsFull || result.newBatchCreated) {    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);    this.sender.wakeup();}

数据可靠性保证ACK

生产者发送一条消息到服务器如何确保服务器收到消息?如果在发送过程中网络出了问题,或者kafka服务器接收的时候出了问题,这个消息发送失败了,生产者是不知道的。

所以kafka服务端需要使用一种响应客户端的方式,只有在服务端确认以后,生产者才发一下条消息,否则重新发送数据。

那什么时候才算接收成功?因为消息存储在不同的broker里,所以是在写入到磁盘之后响应生产者。

服务端响应策略

在分布式场景中,只有一个broker写入成功还是不够的,如果有多个副本,follower也要写入成功才行。

服务端发送ACK给生产者一般有以下几种策略。

  1. 只要leader成功接收就可以,会产生副本与leader不一致情况,如果leader出问题可能会出现数据丢失风险。客户端等待时间最短。

  2. 需要半数以上的follower节点完成同步,这种方式客户端等待的时间比上边稍长一点,但可以确保大部分场景不出问题。

  3. 需要所有follwer全部完成同步,客户端等待时间最长,但如果节点挂掉的影响相对来说最小,因为所有节点的数据都是完整的。

kafka的ACK应答机制就使用了以上三种方式。可以通过配置acks参数进行配置。

ISR (in-sync replica set)

上边第三种方式如果保证所有follower同步数据成功?

假设leader接收到数据,所有follower都开始同步数据,但是有一个follower出了问题,没办法从leader同步数据,按这个规则,leader就要一直等待,无法返回ack,成了害群之马。

所以我们该如果解决这个问题呢?接下来我们把规则修改一下,不是所有follower都有权利让leader等待,而是只有那些正常工作的follower同步数据的时候才会等待。

把那些正常和leader保持同步的副本维护起来,放到一个动态set里,这个就叫做in-sync replica set (ISR)。只要ISR里面的follower同步完数据之后,就可以给客户端发送ACK。

对于经常出问题的follower可以设定replica.lag.time.max.ms=30(默认30秒),如果超过配置时间才会从isr中剔除。

参数说明
acks = 0Producer不等待broker的ack,brokder一接收到还没写入磁盘就返回,当brokder故障时有可能丢失数据;
acks = 1Producer等待brokder的ack,partition的leader成功落盘后返回ack,如果在follower同步成功前leader故障,将会丢失数据;
acks = -1producer等待brokder的ack,partition的leader和follower全部成功落盘后才返回ack;

以上三种机制性能依次递减(producer吞吐量降低),数据健壮性则依次递增。实际开发中可根据不同场景选择不同的策略。

"Kafka生产者与可靠性保证ACK的方法有哪些"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

数据 消息 生产 生产者 同步 服务 成功 问题 线程 客户 保证 不同 客户端 方式 时间 配置 可靠性 副本 只有 场景 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 站群服务器如何做seo 保定行为管理服务器价格 数据库授予用户建表权限 银行网络安全项目 不可重复读违反了数据库事务 java高级数据库安装 高等院校基本状态数据库系统 国研网统计数据库免费吗 服务器上的数据库未打开 青浦区特定软件开发服务产品介绍 软件开发工程师被淘汰 市人大调研非机动车和网络安全 苹果网络安全性在哪儿设置 亲子网络安全绘画 河北太阳线软件开发价格 数据库大题是电脑阅卷吗 张家界软件开发外包公司 广东省第三届网络安全技能大赛 金融国外数据库 软件开发包含哪些经营范围 网络技术平台开发合同版本 bufferpool数据库 微信小程序云数据库优势 网络安全图片2021 在互联网中部署多台缓存服务器 江西品牌软件开发电话多少 《网络技术应用》目录 亿保网络技术有限公司招聘 关系数据库中的关系是指什么集合 东环路租房软件开发
0