千家信息网

How the Apache Pulsar Binary Protocol Is Implemented

Published: 2025-02-06 Author: 千家信息网 editor

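This article looks at how the Apache Pulsar binary protocol is implemented. Many people run into questions on this topic in day-to-day work, so the editor went through various sources and put together a simple walkthrough in the hope that it clears them up. Let's get started.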

Pulsar uses Protocol Buffers (protobuf) to define its binary protocol.

The main command types (as of version 2.7):

message BaseCommand {
    enum Type {
        CONNECT     = 2;
        CONNECTED   = 3;
        // Register a consumer
        SUBSCRIBE   = 4;
        // Register a producer
        PRODUCER    = 5;
        // Write a message to a topic
        SEND        = 6;
        // Response to a successful write
        SEND_RECEIPT= 7;
        // Response to a failed write
        SEND_ERROR  = 8;
        // Deliver a message to a consumer
        MESSAGE     = 9;
        // Acknowledge that a message has been consumed
        ACK         = 10;
        // Consumer requests messages
        FLOW        = 11;
        UNSUBSCRIBE = 12;
        // Generic success response
        SUCCESS     = 13;
        // Generic error response
        ERROR       = 14;
        CLOSE_PRODUCER = 15;
        CLOSE_CONSUMER = 16;
        // Response to PRODUCER
        PRODUCER_SUCCESS = 17;
        // Network-level keepalive
        PING = 18;
        PONG = 19;

        REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;
        PARTITIONED_METADATA           = 21;
        PARTITIONED_METADATA_RESPONSE  = 22;
        LOOKUP           = 23;
        LOOKUP_RESPONSE  = 24;
        CONSUMER_STATS        = 25;
        CONSUMER_STATS_RESPONSE    = 26;

        REACHED_END_OF_TOPIC = 27;
        SEEK = 28;
        GET_LAST_MESSAGE_ID = 29;
        GET_LAST_MESSAGE_ID_RESPONSE = 30;

        ACTIVE_CONSUMER_CHANGE = 31;
        GET_TOPICS_OF_NAMESPACE             = 32;
        GET_TOPICS_OF_NAMESPACE_RESPONSE     = 33;
        GET_SCHEMA = 34;
        GET_SCHEMA_RESPONSE = 35;
        AUTH_CHALLENGE = 36;
        AUTH_RESPONSE = 37;
        ACK_RESPONSE = 38;
        GET_OR_CREATE_SCHEMA = 39;
        GET_OR_CREATE_SCHEMA_RESPONSE = 40;

        // transaction related
        // The transaction types (50 - 61) are easy to follow and are omitted here
    }
    // .....
}

CommandConnect

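As soon as the channel between the client and the server is established, the client sends a CONNECT request.
The request carries authentication data and reports the protocol version.
Once the client version is known, the server knows which features the client supports and can apply compatibility handling.
This is the counterpart of ApiVersionsRequest in Kafka.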

// org.apache.pulsar.client.impl.ClientCnx
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> checkRequestTimeout(), operationTimeoutMs,
            operationTimeoutMs, TimeUnit.MILLISECONDS);

    if (proxyToTargetBrokerAddress == null) {
        if (log.isDebugEnabled()) {
            log.debug("{} Connected to broker", ctx.channel());
        }
    } else {
        log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
    }

    // Send CONNECT command
    ctx.writeAndFlush(newConnectCommand())
            .addListener(future -> {
                if (future.isSuccess()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Complete: {}", future.isSuccess());
                    }
                    state = State.SentConnectFrame;
                } else {
                    log.warn("Error during handshake", future.cause());
                    ctx.close();
                }
            });
}

CommandConnected

This is in effect the response to CommandConnect, just under a different name
(so the two are easy to mismatch).

// org.apache.pulsar.broker.service.ServerCnx
protected void handleConnect(CommandConnect connect) {
    checkArgument(state == State.Start);

    if (log.isDebugEnabled()) {
        log.debug("Received CONNECT from {}, auth enabled: {}:"
                + " has original principal = {}, original principal = {}",
            remoteAddress,
            service.isAuthenticationEnabled(),
            connect.hasOriginalPrincipal(),
            connect.getOriginalPrincipal());
    }

    String clientVersion = connect.getClientVersion();
    int clientProtocolVersion = connect.getProtocolVersion();
    features = new FeatureFlags();

    if (connect.hasFeatureFlags()) {
        features.copyFrom(connect.getFeatureFlags());
    }

    if (!service.isAuthenticationEnabled()) {
        completeConnect(clientProtocolVersion, clientVersion);
        return;
    }
    // ......
}
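The handshake above can be read as a small state machine on the server side of the connection. The following is a minimal sketch of that idea, not Pulsar's ServerCnx: the class, the method names and the string return values are hypothetical, and authentication is left out entirely.

```java
/** Illustrative only; the class and method names are hypothetical. */
final class HandshakeSketch {
    enum State { Start, Connected, Closed }

    private State state = State.Start;
    private int clientProtocolVersion = -1;

    /** Handles one incoming command type and returns the type of the reply. */
    String onCommand(String type, int protocolVersion) {
        if (state == State.Closed) {
            throw new IllegalStateException("connection is closed");
        }
        if (type.equals("CONNECT")) {
            if (state != State.Start) {
                // Same idea as checkArgument(state == State.Start) in handleConnect.
                throw new IllegalArgumentException("CONNECT must be the first command");
            }
            clientProtocolVersion = protocolVersion;
            state = State.Connected;
            return "CONNECTED";
        }
        if (state != State.Connected) {
            // In this sketch, any other command before the handshake closes the connection.
            state = State.Closed;
            return "ERROR";
        }
        return "OK";
    }

    /** Feature gating: usable only if the client reported a new enough protocol version. */
    boolean clientSupports(int minProtocolVersion) {
        return state == State.Connected && clientProtocolVersion >= minProtocolVersion;
    }
}
```

Remembering the protocol version reported in CONNECT is what allows the later compatibility checks; the version numbers passed to clientSupports are whatever the caller chooses.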

CommandSubscribe

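The consumer uses this RPC to register itself with the server.

The call site is as follows: the last line of the ConsumerImpl constructor asks for a connection between client and server. Once a Connection is obtained, the connectionOpened callback is invoked, and for a consumer that callback sends this request to register the consumer's information.

In relation to the CommandConnect request above, this request is sent after CommandConnect.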

// org.apache.pulsar.client.impl.ConsumerImpl
@Override
public void connectionOpened(final ClientCnx cnx) {
    // ... the parameter preparation above this point is omitted
    // Build the subscribe request
    ByteBuf request = Commands.newSubscribe(topic,
            subscription,
            consumerId,
            requestId,
            getSubType(),
            priorityLevel,
            consumerName,
            isDurable,
            startMessageIdData,
            metadata,
            readCompacted,
            conf.isReplicateSubscriptionState(),
            InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
            startMessageRollbackDuration,
            schemaInfo,
            createTopicIfDoesNotExist,
            conf.getKeySharedPolicy());
}

The proto definition (see the comments):

message CommandSubscribe {
    // The four subscription types
    enum SubType {
        Exclusive = 0;
        Shared    = 1;
        Failover  = 2;
        Key_Shared = 3;
    }

    // Topic name
    required string topic        = 1;
    // Subscription name
    required string subscription = 2;
    // Subscription type
    required SubType subType     = 3;
    // Identifies the consumer on this network connection
    required uint64 consumer_id  = 4;
    // Network-level request ID
    required uint64 request_id   = 5;
    // Consumer name
    optional string consumer_name = 6;
    // Consumer priority; higher-priority consumers tend to receive messages first
    optional int32 priority_level = 7;

    // Whether the subscription is durable
    // Signal whether the subscription should be backed by a
    // durable cursor or not
    optional bool durable = 8 [default = true];

    // If specified, the subscription will position the cursor
    // mark-delete position on the particular message id and
    // will send messages from that point
    optional MessageIdData start_message_id = 9;

    // Custom consumer tags (a map)
    /// Add optional metadata key=value to this consumer
    repeated KeyValue metadata = 10;

    optional bool read_compacted = 11;

    optional Schema schema = 12;

    // Initial position: latest or earliest
    enum InitialPosition {
        Latest   = 0;
        Earliest = 1;
    }
    // Signal whether the subscription will initialize on latest
    // or not -- earliest
    optional InitialPosition initialPosition = 13 [default = Latest];

    // Geo-replication related; ignored for now
    // Mark the subscription as "replicated". Pulsar will make sure
    // to periodically sync the state of replicated subscriptions
    // across different clusters (when using geo-replication).
    optional bool replicate_subscription_state = 14;

    // If true, the subscribe operation will cause a topic to be
    // created if it does not exist already (and if topic auto-creation
    // is allowed by broker).
    // If false, the subscribe operation will fail if the topic
    // does not exist.
    optional bool force_topic_creation = 15 [default = true];

    // Used when resetting the consumption position by time
    // If specified, the subscription will reset cursor's position back
    // to specified seconds and will send messages from that point
    optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

    // Used by Key_Shared mode; not covered here
    optional KeySharedMeta keySharedMeta = 17;
}

CommandProducer

This RPC is the producer-side counterpart: the producer uses it to register itself with the server. It is sent from the equivalent place, inside org.apache.pulsar.client.impl.ProducerImpl.connectionOpened.

/// Create a new Producer on a topic, assigning the given producer_id,
/// all messages sent with this producer_id will be persisted on the topic
message CommandProducer {
    // Topic name
    required string topic         = 1;
    required uint64 producer_id   = 2;
    // Network-level request ID
    required uint64 request_id    = 3;

    /// If a producer name is specified, the name will be used,
    /// otherwise the broker will generate a unique name
    optional string producer_name = 4;

    // Whether writes are encrypted
    optional bool encrypted       = 5 [default = false];

    // Metadata map
    /// Add optional metadata key=value to this producer
    repeated KeyValue metadata    = 6;

    optional Schema schema = 7;

    // Arguably this should be called producer_epoch
    // If producer reconnect to broker, the epoch of this producer will +1
    optional uint64 epoch = 8 [default = 0];

    // Indicate the name of the producer is generated or user provided
    // Use default true here is in order to be forward compatible with the client
    optional bool user_provided_producer_name = 9 [default = true];

    // The three producer access modes
    // Require that this producer will be the only producer allowed on the topic
    optional ProducerAccessMode producer_access_mode = 10 [default = Shared];

    // Topic epoch is used to fence off producers that reconnects after a new
    // exclusive producer has already taken over. This id is assigned by the
    // broker on the CommandProducerSuccess. The first time, the client will
    // leave it empty and then it will always carry the same epoch number on
    // the subsequent reconnections.
    optional uint64 topic_epoch = 11;
}

enum ProducerAccessMode {
    Shared           = 0; // By default multiple producers can publish on a topic
    Exclusive        = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.
    WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access
}

CommandProducerSuccess

This is the success response to a CommandProducer request.

/// Response from CommandProducer
message CommandProducerSuccess {
    // Network-level request ID
    required uint64 request_id    = 1;
    // Producer name
    required string producer_name = 2;

    // The last sequence id that was stored by this producer in the previous session
    // This will only be meaningful if deduplication has been enabled.
    optional int64  last_sequence_id = 3 [default = -1];
    optional bytes schema_version = 4;

    // The topic epoch assigned by the broker. This field will only be set if we
    // were requiring exclusive access when creating the producer.
    optional uint64 topic_epoch = 5;

    // Presumably related to ProducerAccessMode above; to be covered another time
    // If producer is not "ready", the client will avoid to timeout the request
    // for creating the producer. Instead it will wait indefinitely until it gets
    // a subsequent `CommandProducerSuccess` with `producer_ready==true`.
    optional bool producer_ready = 6 [default = true];
}

CommandSend

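This is the RPC the producer uses to send messages to the server.
You can trace the call from org.apache.pulsar.client.impl.ProducerImpl.sendAsync down to this point. After batching, encryption, chunking and similar processing, the message is serialized into this request.

The serialized format is shown below.
Here the BaseCommand is the one wrapping CommandSend.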

// org.apache.pulsar.common.protocol.Commands
private static ByteBufPair serializeCommandSendWithSize(BaseCommand cmd, ChecksumType checksumType,
        MessageMetadata msgMetadata, ByteBuf payload) {
    // / Wire format
    // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]

The protobuf definition below covers only the [CMD] part of the frame above.

message CommandSend {
    required uint64 producer_id = 1;
    required uint64 sequence_id = 2;
    optional int32 num_messages = 3 [default = 1];
    optional uint64 txnid_least_bits = 4 [default = 0];
    optional uint64 txnid_most_bits = 5 [default = 0];

    /// Add highest sequence id to support batch message with external sequence id
    optional uint64 highest_sequence_id = 6 [default = 0];
    optional bool is_chunk = 7 [default = false];
}
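To make the frame layout concrete, here is a minimal sketch that assembles such a frame with plain JDK classes (Java 9+ for CRC32C). It is not Pulsar's Commands class: the class name is hypothetical, and the field widths (4-byte big-endian sizes, a 2-byte magic number 0x0e01, a 4-byte CRC32C over everything after the checksum) are assumptions based on my reading of the binary protocol documentation. The command and metadata are taken as already-serialized bytes.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

/** Illustrative framing helper; not Pulsar's actual Commands class. */
final class SendFrameSketch {
    // Assumed value of the two-byte magic number that precedes the checksum.
    static final short MAGIC_CRC32C = 0x0e01;

    /** Builds [TOTAL_SIZE][CMD_SIZE][CMD][MAGIC_NUMBER][CHECKSUM][METADATA_SIZE][METADATA][PAYLOAD]. */
    static ByteBuffer encode(byte[] cmd, byte[] metadata, byte[] payload) {
        // The checksum covers everything after it: metadata size, metadata and payload.
        ByteBuffer checked = ByteBuffer.allocate(4 + metadata.length + payload.length);
        checked.putInt(metadata.length).put(metadata).put(payload).flip();
        CRC32C crc = new CRC32C();
        crc.update(checked.duplicate()); // duplicate keeps the position of 'checked' untouched

        // TOTAL_SIZE counts every byte that follows the TOTAL_SIZE field itself.
        int totalSize = 4 + cmd.length + 2 + 4 + checked.remaining();
        ByteBuffer frame = ByteBuffer.allocate(4 + totalSize);
        frame.putInt(totalSize).putInt(cmd.length).put(cmd);
        frame.putShort(MAGIC_CRC32C).putInt((int) crc.getValue());
        frame.put(checked).flip();
        return frame;
    }
}
```

Only the [CMD] bytes come from protobuf; the sizes, magic number and checksum are written by hand around them, which is why the protobuf definition alone does not describe the whole frame.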

CommandSendReceipt

This is the success response the server sends once the message has been persisted.

message CommandSendReceipt {
    required uint64 producer_id = 1;
    // Used to guarantee ordering
    required uint64 sequence_id = 2;
    optional MessageIdData message_id = 3;
    // Presumably used for deduplication
    optional uint64 highest_sequence_id = 4 [default = 0];
}

// The ID of the successfully written message; this structure is reused elsewhere
message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId  = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
    repeated int64 ack_set = 5;
    optional int32 batch_size = 6;
}
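One way to picture how sequence_id preserves ordering is a FIFO queue of pending sends on the producer, where each receipt must match the head of the queue. The sketch below illustrates that idea under stated assumptions; it is not ProducerImpl, all names are hypothetical, and the policy of ignoring stale receipts and rejecting receipts that skip ahead is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Illustrative only; tracks in-flight sends by sequence id. */
final class PendingSendsSketch {
    static final class Pending {
        final long sequenceId;
        final CompletableFuture<String> result = new CompletableFuture<>();
        Pending(long sequenceId) { this.sequenceId = sequenceId; }
    }

    private final ArrayDeque<Pending> pending = new ArrayDeque<>();
    private long nextSequenceId = 0;

    /** Registers a send; the future completes with "ledgerId:entryId" when the receipt arrives. */
    CompletableFuture<String> send() {
        Pending p = new Pending(nextSequenceId++);
        pending.add(p);
        return p.result;
    }

    /** Handles a SEND_RECEIPT carrying the sequence id and the assigned message id. */
    void onSendReceipt(long sequenceId, long ledgerId, long entryId) {
        Pending head = pending.peek();
        if (head == null || sequenceId < head.sequenceId) {
            return; // stale or duplicate receipt: nothing is waiting for it
        }
        if (sequenceId > head.sequenceId) {
            // A receipt that skips the head would break ordering.
            throw new IllegalStateException("receipt " + sequenceId + " arrived before " + head.sequenceId);
        }
        pending.poll();
        head.result.complete(ledgerId + ":" + entryId);
    }
}
```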

CommandSendError

This is the error response to CommandSend.

message CommandSendError {
    required uint64 producer_id = 1;
    required uint64 sequence_id = 2;
    required ServerError error  = 3;
    required string message     = 4;
}

CommandFlow

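This tells the server how many messages this consumer can currently accept.
The server records, for each consumer in a subscription, how many messages it can currently accept,
and uses that number when dispatching to decide whether a given consumer can take a message.

As far as I have traced, connectionOpened sends a CommandFlow after the subscription has been registered successfully, so that the server starts pushing messages.
Presumably the consumer also sends it whenever its receive queue has free capacity.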

message CommandFlow {
    required uint64 consumer_id       = 1;

    // Max number of messages to prefetch, in addition
    // of any number previously specified
    required uint32 messagePermits     = 2;
}
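The permit accounting described above can be sketched as a per-consumer counter that FLOW increases and each dispatched message decreases. This is a simplified illustration, not the broker's dispatcher: the class and method names are hypothetical, and picking the first consumer with permits left is an assumption made to keep the sketch short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only; tracks how many messages each consumer may still receive. */
final class FlowPermitsSketch {
    private final Map<Long, Integer> permits = new LinkedHashMap<>();

    /** Handles FLOW: permits are added on top of whatever was granted before. */
    void onFlow(long consumerId, int messagePermits) {
        permits.merge(consumerId, messagePermits, Integer::sum);
    }

    /** Picks a consumer for one message, or returns -1 if no consumer has permits left. */
    long dispatchOne() {
        for (Map.Entry<Long, Integer> e : permits.entrySet()) {
            if (e.getValue() > 0) {
                e.setValue(e.getValue() - 1); // one permit is spent per dispatched message
                return e.getKey();
            }
        }
        return -1;
    }
}
```

When dispatchOne returns -1 the messages stay on the broker until some consumer sends another FLOW, which is how the consumer controls the push rate.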

CommandMessage

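This is the command with which the server pushes messages to a consumer; the server sends it on its own initiative. (The logic lives in the dispatcher of the subscription on the server side.)

The call site is org.apache.pulsar.broker.service.Consumer#sendMessages.
One level further up, that method is always called from implementations of org.apache.pulsar.broker.service.Dispatcher.

As with the write path above, the Command here is really just an RPC header; the message payload is appended after it.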

//  Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// metadataAndPayload contains from magic-number to the payload included

message CommandMessage {
    required uint64 consumer_id       = 1;
    // The ID of the message
    required MessageIdData message_id = 2;
    // How many times this message has been redelivered
    optional uint32 redelivery_count  = 3 [default = 0];
    // Which entries of this message have already been acked
    repeated int64 ack_set = 4;
}
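On the receiving side the same frame has to be split back into its parts. The sketch below reads one frame with plain JDK classes and verifies the checksum. It is an illustration, not Pulsar's decoder: the names are hypothetical, the field widths and the magic number 0x0e01 are assumptions, and it assumes the magic number and checksum are always present.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

/** Illustrative decoder; the class and its names are hypothetical. */
final class MessageFrameSketch {
    // Assumed value of the two-byte magic number that announces a checksum.
    static final short MAGIC_CRC32C = 0x0e01;

    static final class Parts {
        final byte[] cmd;
        final byte[] metadata;
        final byte[] payload;
        Parts(byte[] cmd, byte[] metadata, byte[] payload) {
            this.cmd = cmd;
            this.metadata = metadata;
            this.payload = payload;
        }
    }

    /** Reads one frame starting at the buffer's current position. */
    static Parts decode(ByteBuffer in) {
        int totalSize = in.getInt();
        if (in.remaining() < totalSize) {
            throw new IllegalArgumentException("incomplete frame");
        }
        int end = in.position() + totalSize;

        byte[] cmd = new byte[in.getInt()];
        in.get(cmd);

        if (in.getShort() != MAGIC_CRC32C) {
            throw new IllegalArgumentException("unexpected magic number");
        }
        int expectedChecksum = in.getInt();

        // Checksum everything from METADATA_SIZE to the end of the frame.
        ByteBuffer checked = in.duplicate();
        checked.limit(end);
        CRC32C crc = new CRC32C();
        crc.update(checked);
        if ((int) crc.getValue() != expectedChecksum) {
            throw new IllegalArgumentException("checksum mismatch");
        }

        byte[] metadata = new byte[in.getInt()];
        in.get(metadata);
        byte[] payload = new byte[end - in.position()];
        in.get(payload);
        return new Parts(cmd, metadata, payload);
    }
}
```

The cmd bytes would then be parsed as a BaseCommand and the metadata bytes as a MessageMetadata; a checksum failure is the kind of condition the ValidationError enum of CommandAck can report back to the server.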

CommandAck

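This acknowledges messages that were consumed successfully. A single message can be acked on its own,
or acks can be cumulative (as in Kafka).
To reduce the RPC rate, the client batches acks.
On the server side, handling an ack generally updates the data held by the ManagedCursor and persists the result.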

message CommandAck {
    // Ack type: individual or cumulative
    enum AckType {
        Individual = 0;
        Cumulative = 1;
    }

    required uint64 consumer_id       = 1;
    required AckType ack_type         = 2;

    // The field is repeated, so acks can be batched
    // In case of individual acks, the client can pass a list of message ids
    repeated MessageIdData message_id = 3;

    // Acks can contain a flag to indicate the consumer
    // received an invalid message that got discarded
    // before being passed on to the application.
    enum ValidationError {
        UncompressedSizeCorruption = 0;
        DecompressionError = 1;
        ChecksumMismatch = 2;
        BatchDeSerializeError = 3;
        DecryptionError = 4;
    }

    // A message may also be acked in some error cases; this records why
    optional ValidationError validation_error = 4;
    repeated KeyLongValue properties = 5;

    optional uint64 txnid_least_bits = 6 [default = 0];
    optional uint64 txnid_most_bits = 7 [default = 0];
    // Network-level request ID
    optional uint64 request_id = 8;
}
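The client-side ack batching mentioned above can be sketched as a small buffer that is flushed into at most two CommandAck messages: one cumulative ack carrying the highest position, and one individual ack carrying the remaining message ids. This is an illustration of the idea only; the class, the string representation of the commands and the rule for dropping individual acks already covered by the cumulative one are assumptions, not the behaviour of Pulsar's client.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; groups acks so that one flush sends few CommandAck messages. */
final class AckGroupingSketch {
    static final class MsgId implements Comparable<MsgId> {
        final long ledgerId;
        final long entryId;
        MsgId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
        @Override public int compareTo(MsgId o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
        @Override public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    private final List<MsgId> individual = new ArrayList<>();
    private MsgId cumulative; // highest cumulative position requested since the last flush

    void ackIndividual(MsgId id) {
        individual.add(id);
    }

    void ackCumulative(MsgId id) {
        if (cumulative == null || id.compareTo(cumulative) > 0) {
            cumulative = id;
        }
    }

    /** Returns a textual description of the CommandAck messages one flush would send. */
    List<String> flush() {
        List<String> commands = new ArrayList<>();
        if (cumulative != null) {
            final MsgId upTo = cumulative;
            commands.add("Cumulative [" + upTo + "]");
            // Individual acks at or below the cumulative position are redundant.
            individual.removeIf(id -> id.compareTo(upTo) <= 0);
        }
        if (!individual.isEmpty()) {
            commands.add("Individual " + individual);
        }
        individual.clear();
        cumulative = null;
        return commands;
    }
}
```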

CommandRedeliverUnacknowledgedMessages

With this RPC the consumer tells the server which messages need to be redelivered.

message CommandRedeliverUnacknowledgedMessages {
    required uint64 consumer_id = 1;
    repeated MessageIdData message_ids = 2;
}

CommandSuccess & CommandError

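These are shared responses: any request that has no specific fields to return can use them, which covers almost every request.
Unlike Kafka, where each request and response carries an ApiKey, requests and responses here do not correspond strictly one-to-one.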

message CommandSuccess {
    required uint64 request_id = 1;
    optional Schema schema = 2;
}

message CommandError {
    required uint64 request_id  = 1;
    required ServerError error = 2;
    required string message    = 3;
}
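Because the generic responses carry only a request_id, the client has to remember which request each id belongs to. A minimal sketch of that correlation, assuming a map of pending futures keyed by request id (the class and method names are hypothetical, not ClientCnx's API):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only; matches generic responses to requests by request id. */
final class RequestCorrelationSketch {
    private final AtomicLong requestIds = new AtomicLong();
    private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    long newRequestId() {
        return requestIds.getAndIncrement();
    }

    /** Remembers a request that is about to be written to the connection. */
    CompletableFuture<Void> track(long requestId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Handles a SUCCESS response. */
    void onSuccess(long requestId) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future != null) {
            future.complete(null);
        }
    }

    /** Handles an ERROR response. */
    void onError(long requestId, String error, String message) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future != null) {
            future.completeExceptionally(new RuntimeException(error + ": " + message));
        }
    }
}
```

The response type says nothing about which kind of request it answers; the request_id lookup is what restores that link.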

CommandPing & CommandPong

Both messages are empty. They implement an application-level keepalive on top of the TCP connection.
See org.apache.pulsar.common.protocol.PulsarHandler#handleKeepAliveTimeout.

// Commands to probe the state of connection.
// When either client or broker doesn't receive commands for certain
// amount of time, they will send a Ping probe.
message CommandPing {
}
message CommandPong {
}
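The keepalive logic can be sketched as a periodic check: if the previous probe is still unanswered, close the connection, otherwise send a new PING. This is a simplified illustration and not the code of PulsarHandler#handleKeepAliveTimeout; the class, the method names and the string return values are hypothetical.

```java
/** Illustrative only; application-level keepalive driven by a periodic timer. */
final class KeepAliveSketch {
    private boolean waitingForPong = false;
    private boolean open = true;

    /** Called once per keepalive interval; returns the action taken. */
    String onKeepAliveTimeout() {
        if (!open) {
            return "NONE";
        }
        if (waitingForPong) {
            // The previous probe was never answered: treat the peer as dead.
            open = false;
            return "CLOSE";
        }
        waitingForPong = true;
        return "PING";
    }

    /** Called when a PONG arrives from the peer. */
    void onPong() {
        waitingForPong = false;
    }

    /** A PING from the peer is answered immediately. */
    String onPing() {
        return "PONG";
    }
}
```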

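That concludes this look at how the Apache Pulsar binary protocol is implemented. Hopefully it clears up the questions raised at the start; the best way to consolidate it is to read the code alongside and try it out yourself.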
