千家信息网

Flink的rpc组件有哪些

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇内容介绍了"Flink的rpc组件有哪些"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Flink
千家信息网最后更新 2025年02月05日Flink的rpc组件有哪些

本篇内容介绍了"Flink的rpc组件有哪些"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Flink采用akka来实现rpc服务。其中有这几个重要组件:RpcServer、RpcService、AkkaRpcActor、RpcEndpoint。

这几个组件作用如下:

(1)RpcEndpoint

提供具体rpc服务。主要实现有 ResourceManagerTaskExecutor,

①YarnResourceManager为AM容器中启动的服务,持有ResourceManager和NodeManager的客户端

②TaskExecutor为NM容器中启动taskmanager的类

(2)AkkaRpcService

提供rpc的服务类。该类内部持有ActorSystem实例和Supervisor实例。Supervisor中含有SupervisorActor实例,SupervisorActor用于创建其他Actor,可以理解为根Actor。RpcEndpoint在构造时,通过AkkaRpcService的startServer()方法,获取RpcServer实例。

      public  RpcServer startServer(C rpcEndpoint) {                checkNotNull(rpcEndpoint, "rpc endpoint");                final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);                final ActorRef actorRef = actorRegistration.getActorRef();                final CompletableFuture actorTerminationFuture = actorRegistration.getTerminationFuture();                LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());                final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);                final String hostname;                Option host = actorRef.path().address().host();                if (host.isEmpty()) {                        hostname = "localhost";                } else {                        hostname = host.get();                }                Set> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));                implementedRpcGateways.add(RpcServer.class);                implementedRpcGateways.add(AkkaBasedEndpoint.class);                final InvocationHandler akkaInvocationHandler;                if (rpcEndpoint instanceof FencedRpcEndpoint) {                        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler                        akkaInvocationHandler = new FencedAkkaInvocationHandler<>(                                akkaAddress,                                hostname,                                actorRef,                                configuration.getTimeout(),                                configuration.getMaximumFramesize(),                                actorTerminationFuture,                                ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken,                                captureAskCallstacks);                        implementedRpcGateways.add(FencedMainThreadExecutable.class);                } else {                        akkaInvocationHandler = new AkkaInvocationHandler(                                akkaAddress,                                hostname,                                actorRef,                                configuration.getTimeout(),                                configuration.getMaximumFramesize(),                                actorTerminationFuture,                                captureAskCallstacks);                }                // Rather than using the System ClassLoader directly, we derive the ClassLoader                // from this class . That works better in cases where Flink runs embedded and all Flink                // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader                ClassLoader classLoader = getClass().getClassLoader();                @SuppressWarnings("unchecked")                RpcServer server = (RpcServer) Proxy.newProxyInstance(                        classLoader,                        implementedRpcGateways.toArray(new Class[implementedRpcGateways.size()]),                        akkaInvocationHandler);                return server;        }

先创建RpcEndpoint对应的ActorRef,然后创建RpcServer的代理类AkkaInvocationHandler或FencedAkkaInvocationHandler,并将ActorRef实例赋给其成员属性 rpcEndpoint:ActorRef。这里的ActorRef即为AkkaRpcActor或FencedAkkaRpcActor实例

(3)RpcServer

用来启动rpc服务,通常不直接调用,而是调用其动态代理类AkkaInvocationHandler或FencedAkkaInvocationHandler的start()方法

(4)AkkaInvocationHandler或FencedAkkaInvocationHandler

RpcServer的动态代理类。start()方法用来启动服务:

      public void start() {                rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());        }

这里向rpcEndpoint,即AkkaRpcActor或FencedAkkaRpcActor实例发送一条ControlMessages.START消息

(5)AkkaRpcActor

响应rpc消息的actor。其createReceive():

     public Receive createReceive() {                return ReceiveBuilder.create()                        .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)                        .match(ControlMessages.class, this::handleControlMessage)                        .matchAny(this::handleMessage)                        .build();        }

当消息为ControlMessages.START,调用StoppedState 的start()方法

          public State start(AkkaRpcActor akkaRpcActor) {                        akkaRpcActor.mainThreadValidator.enterMainThread();                        try {                                akkaRpcActor.rpcEndpoint.internalCallOnStart();                        } catch (Throwable throwable) {                                akkaRpcActor.stop(                                        RpcEndpointTerminationResult.failure(                                                new AkkaRpcException(                                                        String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),                                                        throwable)));                        } finally {                                akkaRpcActor.mainThreadValidator.exitMainThread();                        }                        return StartedState.STARTED;                }

在start()方法中调用具体提供服务的RpcEndpoint实现类internalCallOnStart()方法来启动服务。internalCallOnStart()方法中会调用onStart()方法。

"Flink的rpc组件有哪些"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0