TopicLookup请求处理方法是什么
本篇内容主要讲解"TopicLookup请求处理方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"TopicLookup请求处理方法是什么"吧!
简单逻辑说明
通过topic名字确定namespace
查找这个namespace的bundle分配信息
根据bundle分配信息来确认这个topic属于哪个bundle
根据bundle信息来确认哪个broker负责这个bundle,返回broker的地址。
CommandLookup
主要用来查找Topic在被哪个broker负责。
一般客户端可以通过http协议或者二进制协议来查询。
message CommandLookupTopic { // topic 名字 required string topic = 1; // 网络层请求id required uint64 request_id = 2; optional bool authoritative = 3 [default = false]; // TODO - Remove original_principal, original_auth_data, original_auth_method // Original principal that was verified by // a Pulsar proxy. optional string original_principal = 4; // Original auth role and auth Method that was passed // to the proxy. optional string original_auth_data = 5; optional string original_auth_method = 6; // 从哪个指定的连接点进行连接 optional string advertised_listener_name = 7;}
这里直接看服务端的代码ServerCnx
protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); final boolean authoritative = lookup.isAuthoritative(); final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName() : null; // 校验topic名字 TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup); if (topicName == null) { return; } // 这里的Semaphore 是服务端Lookup请求的限流器 final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { .... isTopicOperationAllowed(topicName, TopicOperation.LOOKUP) .thenApply(isAuthorized -> { // 通过鉴权 if (isAuthorized) { lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), requestId, advertisedListenerName) .handle((lookupResponse, ex) -> { if (ex == null) { ctx.writeAndFlush(lookupResponse); } else { .... } lookupSemaphore.release(); return null; }); } else { .... }).exceptionally(ex -> { .... }); } else { // 如果有异常是发送的`CommandLookupTopicResponse` // 这里已经是新的定义二进制消息的方式了 // / Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD]ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId)); } }
TopicLookupBase.lookupTopicAsync
org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync
这个是一个静态方法
主要
validation 校验集群,topic名字等(这里面有跨集群检查的逻辑,先略过)
lookup逻辑
这里校验的逻辑先略过了,实际核心的逻辑在下面这2行上。
LookupOptions options = LookupOptions.builder() .authoritative(authoritative) .advertisedListenerName(advertisedListenerName) .loadTopicsInBundle(true) // 这里这个条件是true .build(); pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
这里面的主要逻辑在NamespaceService
里面,PulsarService
可以认为是一个全局对象,pulsar需要的任何核心逻辑对象
(比如说NamspaceService
,BrokerService
,ConfigurationCacheService
等)你都可以从这个对象里面拿到。
NamespaceService.getBrokerServiceUrlAsync
这里面的主要逻辑是
根据传递过来的topic名字定位namespace
之后确认这个topic属于哪个NamespaceBundle。
之后根据这个NamespaceBundle 来找到这个bundle 的owner broker的地址。
public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) { ....CompletableFuture > future = getBundleAsync(topic) .thenCompose(bundle -> findBrokerServiceUrl(bundle, options)); ....}public CompletableFuture getBundleAsync(TopicName topic) { return bundleFactory.getBundlesAsync(topic.getNamespaceObject()) .thenApply(bundles -> bundles.findBundle(topic));}
这里面的bundleFactory实际上是一个异步加载的cache。
我们看一下定义
// org.apache.pulsar.common.naming.NamespaceBundleFactoryprivate final AsyncLoadingCachebundlesCache;// 构造函数里面public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { // .....this.bundlesCache = Caffeine.newBuilder() .recordStats() // 记录metric .buildAsync(// 加载cache 的逻辑(NamespaceName namespace, Executor executor) -> { String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString()); .... CompletableFuture future = new CompletableFuture<>(); // Read the static bundle data from the policies pulsar .getLocalZkCacheService() // 获取LocalZooKeeperCacheService .policiesCache() .getWithStatAsync(path) .thenAccept(result -> { // 这里实际是去找有没有单独为这个namespace配置bundle数量 BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null); // 通过namespace拿到namespaceBundle NamespaceBundles namespaceBundles = getBundles( namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1)); .... future.complete(namespaceBundles); }).exceptionally(ex -> { future.completeExceptionally(ex); return null; }); return future; }); // .....}
这里简单说一下NamespaceBundles 这个类,这个类会保存这个Namespace的所有NamespaceBundle,提供一个聚合的视图。
这个类表示一个hash环,这个环按照配置的分片个数,会被分成几个片段,
每个broker会按照一定算法来确定这个环上的哪一部分属于他自己。
topic也会按照一定的算法分配到这个hash环上。
这样broker就能确定自己负责哪些topic。
就可以返回lookup请求了,这个流程也会触发topic的加载流程。
NamespaceBundles.findBundle
这个函数就是确定这个topic属于哪个NamespaceBundle
// 映射topic到hash环上的一段, 这一段就被NamespaceBundle 标识public NamespaceBundle findBundle(TopicName topicName) { checkArgument(this.nsname.equals(topicName.getNamespaceObject())); long hashCode = factory.getLongHashCode(topicName.toString()); NamespaceBundle bundle = getBundle(hashCode); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); } return bundle; }
到这一步我们就能确定这个namespace的信息了,namespce被分为多少个bundle。
而且可以确定这个topic属于哪个namespacebundle。
下一步是根据namespaceBundle查找负责的broker。
NamespaceService.findBrokerServiceUrl
到这里是根据namespacebundle 确定broker
// 这个记录的是一个broker的元数据信息public class NamespaceEphemeralData { private String nativeUrl; private String nativeUrlTls; private String httpUrl; private String httpUrlTls; private boolean disabled; private MapadvertisedListeners;}private CompletableFuture > findBrokerServiceUrl( NamespaceBundle bundle, LookupOptions options) { ConcurrentOpenHashMap >> targetMap; return targetMap.computeIfAbsent(bundle, (k) -> { CompletableFuture > future = new CompletableFuture<>(); // First check if we or someone else already owns the bundle ownershipCache.getOwnerAsync(bundle) .thenAccept(nsData -> { // nsData : Optional if (!nsData.isPresent()) { // 如果没找到这个信息 if (options.isReadOnly()) { // Do not attempt to acquire ownership future.complete(Optional.empty()); } else { // 目前还没有人负责这个bundle 尝试查找这个bundle的owner pulsar.getExecutor().execute(() -> { searchForCandidateBroker(bundle, future, options); }); } } else if (nsData.get().isDisabled()) { // namespce 正在unload future.completeExceptionally( new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle))); } else { // 到这里是找到了的逻辑,直接拼接正常的response就行了 ... // find the target future.complete(Optional.of(new LookupResult(nsData.get()))); } }).exceptionally(exception -> { ... }); // 这里实际上是使用这个targetMap来做一个锁的结构避免多次加载。 // https://github.com/apache/pulsar/pull/1527 future.whenComplete((r, t) -> pulsar.getExecutor().execute( () -> targetMap.remove(bundle) )); return future; }); }
这样如果cache中存在这个topic的owner信息,就可以直接返回。
到此,相信大家对"TopicLookup请求处理方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!