千家信息网

基于akka怎样实现RPC

发表于:2025-02-13 作者:千家信息网编辑
千家信息网最后更新 2025年02月13日,这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。目前的工作在基于akka实现数据服务总线,Akka 2.3
千家信息网最后更新 2025年02月13日基于akka怎样实现RPC

这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

目前的工作在基于akka实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用),这篇文章将会介绍一种实现方式。akka rpc java目录[-]akka-rpc(基于akka的rpc的实现)RPC实现原理Server端核心代码Client端核心代码 Demoakka-rpc(基于akka的rpc的实现)代码:http://git.oschina.net/for-1988/Simples目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。RPC远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。实现原理整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Class clz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。Server端核心代码public class RpcServer extends UntypedActor {         private Map proxyBeans;    public RpcServer(Map, Object> beans) {        proxyBeans = new HashMap();        for (Iterator> iterator = beans.keySet().iterator(); iterator                .hasNext();) {            Class inface = iterator.next();            proxyBeans.put(inface.getName(), beans.get(inface));        }    }    @Override    public void onReceive(Object message) throws Exception {        if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口实现类的实例            CallBean event = (CallBean) message;            ReturnBean bean = new ReturnBean(                    proxyBeans.get(event.getBeanName()), getSelf());            getSender().tell(bean, getSelf());        } else if (message instanceof RpcEvent.CallMethod) {            CallMethod event = (CallMethod) message;            Object bean = proxyBeans.get(event.getBeanName());            Object[] params = event.getParams();            List> paraTypes = new ArrayList>();            Class[] paramerTypes = new Class[] {};            if (params != null) {                for (Object param : params) {                    paraTypes.add(param.getClass());                }            }            Method method = bean.getClass().getMethod(event.getMethodName(),                    paraTypes.toArray(paramerTypes));            Object o = method.invoke(bean, params);            getSender().tell(o, getSelf());        }    }}启动Serverpublic static void main(String[] args) {        final Config config = ConfigFactory                .parseString("akka.remote.netty.tcp.port=" + 2551)                .withFallback(                        ConfigFactory                                .parseString("akka.cluster.roles = [RpcServer]"))                .withFallback(ConfigFactory.load());        ActorSystem system = ActorSystem.create("EsbSystem", config);                // Server 加入发布的服务        Map, Object> beans = new HashMap, Object>();        beans.put(ExampleInterface.class, new ExampleInterfaceImpl());        system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");    }Client端核心代码 RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。public class RpcClient extends Thread {    private ActorSystem system;    private ActorRef rpc;    private ActorRef clientServer;    private static RpcClient instance = null;    public RpcClient() {        this.start();        final Config config = ConfigFactory                .parseString("akka.remote.netty.tcp.port=" + 2552)                .withFallback(                        ConfigFactory                                .parseString("akka.cluster.roles = [RpcClient]"))                .withFallback(ConfigFactory.load());        system = ActorSystem.create("EsbSystem", config);        int totalInstances = 100;        Iterable routeesPaths = Arrays.asList("/user/rpcServer");        boolean allowLocalRoutees = false;        ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(                new AdaptiveLoadBalancingGroup(                        HeapMetricsSelector.getInstance(),                        Collections. emptyList()),                new ClusterRouterGroupSettings(totalInstances, routeesPaths,                        allowLocalRoutees, "RpcServer"));        rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");        clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),                "client");        Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回调事件,恢复当前线程的中断            @Override            public void run() {                synchronized (instance) {                    System.out.println("notify");                    instance.notify();                }            }        });    }    public static RpcClient getInstance() {        if (instance == null) {            instance = new RpcClient();            synchronized (instance) {                try {   //中断当前线程,等待加入集群成功后,恢复                    System.out.println("wait");                    instance.wait();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }        return instance;    }    public  T getBean(Class clz) {        Future future = Patterns.ask(clientServer,                new RpcEvent.CallBean(clz.getName(), clientServer),                new Timeout(Duration.create(5, TimeUnit.SECONDS)));        try {            Object o = Await.result(future,                    Duration.create(5, TimeUnit.SECONDS));            if (o != null) {                ReturnBean returnBean = (ReturnBean) o;                return (T) new RpcBeanProxy().proxy(returnBean.getObj(),                        clientServer, clz);            }        } catch (Exception e) {            e.printStackTrace();        }        return null;    }}RpcClientServerpublic class RpcClientServer extends UntypedActor {    private ActorRef rpc;    public RpcClientServer(ActorRef rpc) {        this.rpc = rpc;    }    @Override    public void onReceive(Object message) throws Exception {        if (message instanceof RpcEvent.CallBean) {  //向Server发送CallBean请求            CallBean event = (CallBean) message;            Future future = Patterns.ask(rpc, event, new Timeout(                    Duration.create(5, TimeUnit.SECONDS)));            Object o = Await.result(future,                    Duration.create(5, TimeUnit.SECONDS));            getSender().tell(o, getSelf());        } else if (message instanceof RpcEvent.CallMethod) {  //向Server发送方法调用请求            Future future = Patterns.ask(rpc, message, new Timeout(                    Duration.create(5, TimeUnit.SECONDS)));            Object o = Await.result(future,                    Duration.create(5, TimeUnit.SECONDS));            getSender().tell(o, getSelf());        }    }}RpcBeanProxy,客户端的动态代理类public class RpcBeanProxy implements InvocationHandler {    private ActorRef rpcClientServer;    private Class clz;    public Object proxy(Object target, ActorRef rpcClientServer, Class clz) {        this.rpcClientServer = rpcClientServer;        this.clz = clz;        return Proxy.newProxyInstance(target.getClass().getClassLoader(),                target.getClass().getInterfaces(), this);    }    @Override    public Object invoke(Object proxy, Method method, Object[] args)            throws Throwable {        Object result = null;        RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(                method.getName(), args, clz.getName());        Future future = Patterns.ask(rpcClientServer, callMethod,                new Timeout(Duration.create(5, TimeUnit.SECONDS)));        Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));        result = o;        return result;    }}DemoInterface,Client和Server都需要这个类,必须实现序列化public interface ExampleInterface extends Serializable{    public String sayHello(String name);}实现类,只需要Server端存在这个类。public class ExampleInterfaceImpl implements ExampleInterface {    @Override    public String sayHello(String name) {        System.out.println("Be Called !");        return "Hello " + name;    }}Client调用public static void main(String[] args) {        RpcClient client = RpcClient.getInstance();        long start = System.currentTimeMillis();                ExampleInterface example = client.getBean(ExampleInterface.class);        System.out.println(example.sayHello("rpc"));                long time = System.currentTimeMillis() - start;        System.out.println("time :" + time);    } 这里第一次调用耗时比较长需要46毫秒,akka会对消息进行优化,调用多次以后时间为 1~2毫秒。

上述就是小编为大家分享的基于akka怎样实现RPC了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

0