基于akka怎样实现RPC
发表于:2025-02-13 作者:千家信息网编辑
千家信息网最后更新 2025年02月13日,这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。目前的工作在基于akka实现数据服务总线,Akka 2.3
千家信息网最后更新 2025年02月13日基于akka怎样实现RPC
这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
目前的工作在基于akka实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用),这篇文章将会介绍一种实现方式。akka rpc java目录[-]akka-rpc(基于akka的rpc的实现)RPC实现原理Server端核心代码Client端核心代码 Demoakka-rpc(基于akka的rpc的实现)代码:http://git.oschina.net/for-1988/Simples目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。RPC远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。实现原理整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Classclz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。Server端核心代码public class RpcServer extends UntypedActor { private Map proxyBeans; public RpcServer(Map , Object> beans) { proxyBeans = new HashMap (); for (Iterator > iterator = beans.keySet().iterator(); iterator .hasNext();) { Class> inface = iterator.next(); proxyBeans.put(inface.getName(), beans.get(inface)); } } @Override public void onReceive(Object message) throws Exception { if (message instanceof RpcEvent.CallBean) { //返回Server端的接口实现类的实例 CallBean event = (CallBean) message; ReturnBean bean = new ReturnBean( proxyBeans.get(event.getBeanName()), getSelf()); getSender().tell(bean, getSelf()); } else if (message instanceof RpcEvent.CallMethod) { CallMethod event = (CallMethod) message; Object bean = proxyBeans.get(event.getBeanName()); Object[] params = event.getParams(); List > paraTypes = new ArrayList >(); Class>[] paramerTypes = new Class>[] {}; if (params != null) { for (Object param : params) { paraTypes.add(param.getClass()); } } Method method = bean.getClass().getMethod(event.getMethodName(), paraTypes.toArray(paramerTypes)); Object o = method.invoke(bean, params); getSender().tell(o, getSelf()); } }}启动Serverpublic static void main(String[] args) { final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2551) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcServer]")) .withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("EsbSystem", config); // Server 加入发布的服务 Map , Object> beans = new HashMap , Object>(); beans.put(ExampleInterface.class, new ExampleInterfaceImpl()); system.actorOf(Props.create(RpcServer.class, beans), "rpcServer"); }Client端核心代码 RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。public class RpcClient extends Thread { private ActorSystem system; private ActorRef rpc; private ActorRef clientServer; private static RpcClient instance = null; public RpcClient() { this.start(); final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2552) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcClient]")) .withFallback(ConfigFactory.load()); system = ActorSystem.create("EsbSystem", config); int totalInstances = 100; Iterable routeesPaths = Arrays.asList("/user/rpcServer"); boolean allowLocalRoutees = false; ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup( new AdaptiveLoadBalancingGroup( HeapMetricsSelector.getInstance(), Collections. emptyList()), new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, "RpcServer")); rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall"); clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc), "client"); Cluster.get(system).registerOnMemberUp(new Runnable() { //加入集群成功后的回调事件,恢复当前线程的中断 @Override public void run() { synchronized (instance) { System.out.println("notify"); instance.notify(); } } }); } public static RpcClient getInstance() { if (instance == null) { instance = new RpcClient(); synchronized (instance) { try { //中断当前线程,等待加入集群成功后,恢复 System.out.println("wait"); instance.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return instance; } public T getBean(Class clz) { Future
上述就是小编为大家分享的基于akka怎样实现RPC了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
集群
方法
对象
过程
代码
功能
接口
核心
服务
成功
就是
时候
端的
计算机
通信
事件
内容
分布式
动态
原理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
企业的网络安全保护需要缴费吗
数据库编程定义输入参数有误
for循环做了查询数据库的操作
高学和软件开发
糖果传奇游戏软件开发商
纬创服务器出货量
某公司网络安全方案
网络安全 企业网
创胜网络软件开发有限公司
是网络安全生态的主体
晋江市新塘亿鑫网络技术服务部
计算机网络技术适合女子学吗
软件开发合同增值税率
sql查询个数据库
绝地逃生 服务器
莫北网络技术
网络安全csf核心功能
网络安全法的主要特征有哪些
开mc的服务器内存要放哪
软件开发合同用什么章
数据库查询用list
江苏常见软件开发代理价钱
数据库信息管理论文
本地服务器映射端口安全吗
数据库有几个表就有几个实体图吗
计算机网络技术就业职位
无锡品牌软件开发信息推荐
jquery如何对接数据库
数据库索引是在哪创建的
企业网站能用自己服务器吗