千家信息网

TopicLookup请求处理方法是什么

发表于:2024-11-30 作者:千家信息网编辑
千家信息网最后更新 2024年11月30日,本篇内容主要讲解"TopicLookup请求处理方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"TopicLookup请求处理方法是什么"吧!简单
千家信息网最后更新 2024年11月30日TopicLookup请求处理方法是什么

本篇内容主要讲解"TopicLookup请求处理方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"TopicLookup请求处理方法是什么"吧!

简单逻辑说明

  1. 通过topic名字确定namespace

  2. 查找这个namespace的bundle分配信息

  3. 根据bundle分配信息来确认这个topic属于哪个bundle

  4. 根据bundle信息来确认哪个broker负责这个bundle,返回broker的地址。

CommandLookup 主要用来查找Topic在被哪个broker负责。
一般客户端可以通过http协议或者二进制协议来查询。

message CommandLookupTopic {    // topic 名字    required string topic            = 1;    // 网络层请求id    required uint64 request_id       = 2;    optional bool authoritative      = 3 [default = false];    // TODO - Remove original_principal, original_auth_data, original_auth_method    // Original principal that was verified by    // a Pulsar proxy.    optional string original_principal = 4;    // Original auth role and auth Method that was passed    // to the proxy.    optional string original_auth_data = 5;    optional string original_auth_method = 6;       // 从哪个指定的连接点进行连接    optional string advertised_listener_name = 7;}

这里直接看服务端的代码ServerCnx

protected void handleLookup(CommandLookupTopic lookup) {        final long requestId = lookup.getRequestId();        final boolean authoritative = lookup.isAuthoritative();        final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName()                : null;       // 校验topic名字        TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);        if (topicName == null) {            return;        }       // 这里的Semaphore 是服务端Lookup请求的限流器        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();        if (lookupSemaphore.tryAcquire()) {            ....            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP)            .thenApply(isAuthorized -> {                // 通过鉴权                if (isAuthorized) {                    lookupTopicAsync(getBrokerService().pulsar(),                            topicName,                            authoritative,                            getPrincipal(),                            getAuthenticationData(),                            requestId,                            advertisedListenerName)                            .handle((lookupResponse, ex) -> {                                if (ex == null) {                                    ctx.writeAndFlush(lookupResponse);                                } else {                                    ....                                }                                lookupSemaphore.release();                                return null;                            });                } else {                    ....            }).exceptionally(ex -> {                ....            });        } else {            // 如果有异常是发送的`CommandLookupTopicResponse`            // 这里已经是新的定义二进制消息的方式了            // / Wire format            // [TOTAL_SIZE] [CMD_SIZE][CMD]ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,                    "Failed due to too many pending lookup requests", requestId));        }    }

TopicLookupBase.lookupTopicAsync

org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync
这个是一个静态方法
主要

  1. validation 校验集群,topic名字等(这里面有跨集群检查的逻辑,先略过)

  2. lookup逻辑

这里校验的逻辑先略过了,实际核心的逻辑在下面这2行上。

LookupOptions options = LookupOptions.builder()                        .authoritative(authoritative)                        .advertisedListenerName(advertisedListenerName)                        .loadTopicsInBundle(true)    // 这里这个条件是true                        .build();                pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)

这里面的主要逻辑在NamespaceService里面,PulsarService 可以认为是一个全局对象,pulsar需要的任何核心逻辑对象
(比如说NamspaceService,BrokerService,ConfigurationCacheService等)你都可以从这个对象里面拿到。

NamespaceService.getBrokerServiceUrlAsync

这里面的主要逻辑是
根据传递过来的topic名字定位namespace
之后确认这个topic属于哪个NamespaceBundle。
之后根据这个NamespaceBundle 来找到这个bundle 的owner broker的地址。

public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {      ....CompletableFuture> future = getBundleAsync(topic)                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));      ....}public CompletableFuture getBundleAsync(TopicName topic) {        return bundleFactory.getBundlesAsync(topic.getNamespaceObject())                .thenApply(bundles -> bundles.findBundle(topic));}

这里面的bundleFactory实际上是一个异步加载的cache。

我们看一下定义

// org.apache.pulsar.common.naming.NamespaceBundleFactoryprivate final AsyncLoadingCache bundlesCache;// 构造函数里面public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {    // .....this.bundlesCache = Caffeine.newBuilder()                .recordStats()   // 记录metric                .buildAsync(// 加载cache 的逻辑(NamespaceName namespace, Executor executor) -> {            String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());                         ....            CompletableFuture future = new CompletableFuture<>();            // Read the static bundle data from the policies            pulsar                  .getLocalZkCacheService()   // 获取LocalZooKeeperCacheService                  .policiesCache()                    .getWithStatAsync(path)                  .thenAccept(result -> {                // 这里实际是去找有没有单独为这个namespace配置bundle数量                BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null);                 // 通过namespace拿到namespaceBundle                NamespaceBundles namespaceBundles = getBundles(                    namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1));                ....                future.complete(namespaceBundles);            }).exceptionally(ex -> {                future.completeExceptionally(ex);                return null;            });            return future;        });      // .....}

这里简单说一下NamespaceBundles 这个类,这个类会保存这个Namespace的所有NamespaceBundle,提供一个聚合的视图。

这个类表示一个hash环,这个环按照配置的分片个数,会被分成几个片段,
每个broker会按照一定算法来确定这个环上的哪一部分属于他自己。
topic也会按照一定的算法分配到这个hash环上。
这样broker就能确定自己负责哪些topic。
就可以返回lookup请求了,这个流程也会触发topic的加载流程。

NamespaceBundles.findBundle

这个函数就是确定这个topic属于哪个NamespaceBundle

// 映射topic到hash环上的一段, 这一段就被NamespaceBundle 标识public NamespaceBundle findBundle(TopicName topicName) {        checkArgument(this.nsname.equals(topicName.getNamespaceObject()));        long hashCode = factory.getLongHashCode(topicName.toString());        NamespaceBundle bundle = getBundle(hashCode);        if (topicName.getDomain().equals(TopicDomain.non_persistent)) {            bundle.setHasNonPersistentTopic(true);        }        return bundle;    }

到这一步我们就能确定这个namespace的信息了,namespce被分为多少个bundle。
而且可以确定这个topic属于哪个namespacebundle。
下一步是根据namespaceBundle查找负责的broker。

NamespaceService.findBrokerServiceUrl

到这里是根据namespacebundle 确定broker

// 这个记录的是一个broker的元数据信息public class NamespaceEphemeralData {    private String nativeUrl;    private String nativeUrlTls;    private String httpUrl;    private String httpUrlTls;    private boolean disabled;    private Map advertisedListeners;}private CompletableFuture> findBrokerServiceUrl(            NamespaceBundle bundle, LookupOptions options) {               ConcurrentOpenHashMap>> targetMap;                return targetMap.computeIfAbsent(bundle, (k) -> {            CompletableFuture> future = new CompletableFuture<>();                        // First check if we or someone else already owns the bundle            ownershipCache.getOwnerAsync(bundle)                                     .thenAccept(nsData -> {               // nsData : Optional                if (!nsData.isPresent()) {                    // 如果没找到这个信息                    if (options.isReadOnly()) {                        // Do not attempt to acquire ownership                        future.complete(Optional.empty());                    } else {                        // 目前还没有人负责这个bundle 尝试查找这个bundle的owner                        pulsar.getExecutor().execute(() -> {                            searchForCandidateBroker(bundle, future, options);                        });                    }                } else if (nsData.get().isDisabled()) {                    // namespce 正在unload                    future.completeExceptionally(                            new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));                } else {                    // 到这里是找到了的逻辑,直接拼接正常的response就行了                    ...                     // find the target                    future.complete(Optional.of(new LookupResult(nsData.get())));                }            }).exceptionally(exception -> {                ...             });            // 这里实际上是使用这个targetMap来做一个锁的结构避免多次加载。              //  https://github.com/apache/pulsar/pull/1527            future.whenComplete((r, t) -> pulsar.getExecutor().execute(                () -> targetMap.remove(bundle)            ));            return future;        });    }

这样如果cache中存在这个topic的owner信息,就可以直接返回。

到此,相信大家对"TopicLookup请求处理方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0