千家信息网

Hadoop 学习之RPC的使用与实现原理

发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,hadoop作为分布式的系统, 集群机器之间的通信是最基本,最常见的需求。这种需求本质上是IPC, 即进程间通信。 按照传统的UINX编程模型,进程间通信无非是如下的几种方式:管道, FIFO, 消息
千家信息网最后更新 2024年11月26日Hadoop 学习之RPC的使用与实现原理

hadoop作为分布式的系统, 集群机器之间的通信是最基本,最常见的需求。
这种需求本质上是IPC, 即进程间通信。 按照传统的UINX编程模型,进程间通信无非是如下的几种方式:
管道, FIFO, 消息队列, 信号量, 共享存储, 套接字。只有套接字是可以跨机器的网络通信, 能满足hadoop的需求。

通常情况下, 网络通信的程序使用显式网络编程(即直接使用java.net包)。比如Web浏览器, Web服务器等。
但也有另一部分程序使用隐式网络编程, 比如利用hadoop RPC这种封装了底层通信细节的工具包。

这样做使得底层的网络通信对于程序员透明。一则减轻了程序员的负担, 二则抽象了功能模块, 使得模块之间职责更清晰, 便于维护。

首先展示一个hadoop RPC功能demo, 了解hadoop RPC的用法。

step 1: 在pom.xml文件中添加依赖项

     org.apache.hadoop     hadoop-common     2.6.0 

step 2: 在src/main/resources中添加log4j.properties

log4j.rootLogger=DEBUG,consolelog4j.additivity.org.apache=true# (console)log4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.Threshold=INFOlog4j.appender.console.ImmediateFlush=truelog4j.appender.console.Target=System.errlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

step 3: 定义RPC协议

package hadooprpc.demo;import java.io.IOException;interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {   // 版本号,默认情况下,不同版本号的 RPC Client 和 Server 之间不能相互通信   public static final long versionID = 1L;   String echo(String value) throws IOException;   int add(int v1, int v2) throws IOException;}

step 4: 实现RPC协议

package hadooprpc.demo;import java.io.IOException;import org.apache.hadoop.ipc.ProtocolSignature;public  class ClientProtocolImpl implements ClientProtocol {    // 重载的方法,用于获取自定义的协议版本号,   public long getProtocolVersion(String protocol, long clientVersion) {      return ClientProtocol.versionID;   }   // 重载的方法,用于获取协议签名   public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,   int hashcode) {      return new ProtocolSignature(ClientProtocol.versionID, null);   }   public String echo(String value) throws IOException {      return value;   }   public int add(int v1, int v2) throws IOException {      return v1 + v2;   }}

step 5; 构造并启动 RPC Server

package hadooprpc.demo;import java.io.IOException;import org.apache.hadoop.HadoopIllegalArgumentException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.RPC;import org.apache.hadoop.ipc.Server;public class RPCServer {   public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {      Configuration conf = new Configuration();      Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class)            .setInstance(new ClientProtocolImpl()).setBindAddress("localhost").setPort(8097)            .setNumHandlers(5).build();            server.start();   }}

step 6: 构造 RPC Client 并发送 RPC 请求

package hadooprpc.demo;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.RPC;public class RPCClient {   public static void main(String[] args) throws IOException {      ClientProtocol client = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID,                                                           new InetSocketAddress(8097), new Configuration());      int result = client.add(5, 6);      System.out.println(result);      String echoResult = client.echo("result");      System.out.println(echoResult);      RPC.stopProxy(client); -- 关闭连接   }}

step 7: 启动RPC Server, 然后执行 RPC Client.

通过上面的例子可以发现, 通过这种编程方式,不用考虑网络层的细节,只需要编写接口和接口实现即可。
问渠那得清如许? 通过hadoop RPC的源码, 或许可以管中窥豹。

hadoop RPC的实现原理很简单。
Client:
1. 通过动态代理,获取到调用接口的方法,参数类型。
2. 将调用信息编码,发送到服务器
3. 获取服务器的返回值, 并解码。
4. 返回调用方法的返回值。
Server:
1. 启动服务器, 并监听客户端。
2. 获取客户端发送过来的调用方法, 参数。
3. 执行实现类中相关的方法。
4. 将返回值发送到客户端。

理解原理后, 自己动手实现了一个非常粗糙, 只具备演示功能的RPC框架。

package srpc;import java.io.*;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.ServerSocket;import java.net.Socket;import java.net.SocketException;import java.text.MessageFormat;import java.util.HashMap;import java.util.Map;/** * Created by shgy on 17-5-7. * 创建一个简单的PRC框架, 参数和返回值只支持基础类型 */// 接口interface MyProtocol{    String echo(String msg);    int add(int a,int b);}// 实现class MyProtocolImp implements MyProtocol{    @Override    public String echo(String msg) {        return msg;    }    @Override    public int add(int a,int b){        return a+b;    }}public class ImitateRPC {    private static final Map> PRIMITIVE_NAMES = new HashMap>();    static {        PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);        PRIMITIVE_NAMES.put("byte", Byte.TYPE);        PRIMITIVE_NAMES.put("char", Character.TYPE);        PRIMITIVE_NAMES.put("short", Short.TYPE);        PRIMITIVE_NAMES.put("int", Integer.TYPE);        PRIMITIVE_NAMES.put("long", Long.TYPE);        PRIMITIVE_NAMES.put("float", Float.TYPE);        PRIMITIVE_NAMES.put("double", Double.TYPE);        PRIMITIVE_NAMES.put("void", Void.TYPE);    }    public static class Server{        private Class protocolClass;        private Object protocolImpl;        private ServerSocket server;        public  Server(Class protocolClass, Object protocolImpl) throws IOException {            if(protocolImpl == null){                throw new IllegalArgumentException("protocolImpl is not set");            }            if (protocolClass == null) {                throw new IllegalArgumentException("protocolClass is not set");            } else {                if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {                    throw new IOException("protocolClass "+ protocolClass +                            " is not implemented by protocolImpl which is of class " +                            protocolImpl.getClass());                }            }            this.protocolClass = protocolClass;            this.protocolImpl = protocolImpl;        }        public void start(){            System.out.println("start server");            try{                this.server = new ServerSocket(8189);                listen();            }catch(Exception e){                e.printStackTrace();            }        }        public void close(){            System.out.println("close server");            try {                this.server.close();            } catch (IOException e) {}        }        private void listen(){            new Thread(){                @Override                public void run(){                    while (!server.isClosed()){                        Socket incoming = null;                        try {                            incoming = server.accept();                            DataInputStream inStream = new DataInputStream(incoming.getInputStream());                            // 从客户端读取出 调用方法的信息                            int dataLen = inStream.readInt();                            byte[] data = new byte[dataLen];                            inStream.read(data,0, dataLen);                            DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(data));                            String methodName = contentStream.readUTF();                            int paramCount = contentStream.readInt();                            Class[] paramTypes = new Class[paramCount];                            Object[] args = new Object[paramCount];                            for(int i=0;i declaredClass = PRIMITIVE_NAMES.get(className);                                if(declaredClass == null){                                    declaredClass = String.class;                                }                                paramTypes[i] = declaredClass;                                if(declaredClass == String.class){                                    args[i] = contentStream.readUTF();                                }else if (declaredClass.isPrimitive()) {            // primitive types                                    if (declaredClass == Boolean.TYPE) {             // boolean                                        args[i] = Boolean.valueOf(contentStream.readBoolean());                                    } else if (declaredClass == Character.TYPE) {    // char                                        args[i] = Character.valueOf(contentStream.readChar());                                    } else if (declaredClass == Byte.TYPE) {         // byte                                        args[i] = Byte.valueOf(contentStream.readByte());                                    } else if (declaredClass == Short.TYPE) {        // short                                        args[i] = Short.valueOf(contentStream.readShort());                                    } else if (declaredClass == Integer.TYPE) {      // int                                        args[i] = Integer.valueOf(contentStream.readInt());                                    } else if (declaredClass == Long.TYPE) {         // long                                        args[i] = Long.valueOf(contentStream.readLong());                                    } else if (declaredClass == Float.TYPE) {        // float                                        args[i] = Float.valueOf(contentStream.readFloat());                                    } else if (declaredClass == Double.TYPE) {       // double                                        args[i] = Double.valueOf(contentStream.readDouble());                                    } else if (declaredClass == Void.TYPE) {         // void                                        args[i] = null;                                    } else {                                        throw new IllegalArgumentException("Not a primitive: "+declaredClass);                                    }                                }                            }                            Method method = protocolClass.getMethod(methodName, paramTypes);                            Object obj = method.invoke(protocolImpl, args);                            Class retType = method.getReturnType();                            ByteArrayOutputStream buffer = new ByteArrayOutputStream();                            DataOutput out = new DataOutputStream(buffer);                            out.writeUTF(retType.getCanonicalName());                            if(retType == String.class){                                out.writeUTF((String)obj);                            }else if (retType.isPrimitive()) {            // primitive types                                if (retType == Boolean.TYPE) {             // boolean                                    out.writeBoolean((Boolean)obj);                                } else if (retType == Character.TYPE) {    // char                                    out.writeChar((Character) obj);                                } else if (retType == Byte.TYPE) {         // byte                                    out.writeBoolean((Boolean)obj);                                } else if (retType == Short.TYPE) {        // short                                    out.writeShort((Short) obj);                                } else if (retType == Integer.TYPE) {      // int                                    out.writeInt((Integer) obj);                                } else if (retType == Long.TYPE) {         // long                                    out.writeLong((Long) obj);                                } else if (retType == Float.TYPE) {        // float                                    out.writeFloat((Float) obj);                                } else if (retType == Double.TYPE) {       // double                                    out.writeDouble((Double) obj);                                } else if (retType == Void.TYPE) {         // void                                } else {                                    throw new IllegalArgumentException("Not a primitive: "+retType);                                }                            }                            byte[] array = buffer.toByteArray();                            //将返回结果写回到客户端                            DataOutputStream outStream = new DataOutputStream(incoming.getOutputStream());                            outStream.writeInt(array.length);                            outStream.write(array);                            outStream.flush();                        }catch (SocketException e){                        }catch (Exception e){                            e.printStackTrace();                        }finally {                            try {                                if(incoming!=null){ incoming.close();  }                            }catch (Exception e){}                        }                    }                }            }.start();        }    }    public static  T getProxy(Class protocol){        T proxy = (T)  Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol},                new InvocationHandler() {            @Override            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                Class[] paramTypes = method.getParameterTypes();                ByteArrayOutputStream buffer = new ByteArrayOutputStream();                DataOutput out = new DataOutputStream(buffer);                out.writeUTF(method.getName());  //  写入方法名                out.writeInt(paramTypes.length); // 写入参数个数                // 写入参数                for (int i = 0; i < paramTypes.length; i++) {                    Class declaredClass = paramTypes[i];                    Object instance = args[i];                    String paramTypeName = declaredClass.getCanonicalName();                    out.writeUTF(paramTypeName);                    if (declaredClass == String.class) {                        out.writeUTF((String)instance);                    }else if (declaredClass.isPrimitive()) {     // primitive type                        if (declaredClass == Boolean.TYPE) {        // boolean                            out.writeBoolean(((Boolean) instance).booleanValue());                        } else if (declaredClass == Character.TYPE) { // char                            out.writeChar(((Character) instance).charValue());                        } else if (declaredClass == Byte.TYPE) {    // byte                            out.writeByte(((Byte) instance).byteValue());                        } else if (declaredClass == Short.TYPE) {   // short                            out.writeShort(((Short) instance).shortValue());                        } else if (declaredClass == Integer.TYPE) { // int                            out.writeInt(((Integer) instance).intValue());                        } else if (declaredClass == Long.TYPE) {    // long                            out.writeLong(((Long) instance).longValue());                        } else if (declaredClass == Float.TYPE) {   // float                            out.writeFloat(((Float) instance).floatValue());                        } else if (declaredClass == Double.TYPE) {  // double                            out.writeDouble(((Double) instance).doubleValue());                        } else if (declaredClass == Void.TYPE) {    // void                        } else {                            throw new IllegalArgumentException("Not a primitive: " + declaredClass);                        }                    }else{                        throw new IOException("Can't write: "+instance+" as "+declaredClass);                    }                }                // 发送到服务器端                byte[] array = buffer.toByteArray();                Socket client = new Socket("127.0.0.1", 8189);                try{                   DataOutputStream outStream = new DataOutputStream(client.getOutputStream());                    outStream.writeInt(array.length);                    outStream.write(array);                    outStream.flush();                    // 读取服务器的应答                    DataInputStream inStream = new DataInputStream(client.getInputStream());                    int bufSize = inStream.readInt();                    byte[] retArray = new byte[bufSize];                    inStream.read(retArray,0, bufSize);                    DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(retArray));                    String retType = contentStream.readUTF();                    Class retTypeClass = PRIMITIVE_NAMES.get(retType);                    if(retTypeClass == null){                        retTypeClass = String.class;                    }                    if(retTypeClass == String.class){                        return contentStream.readUTF();                    }else if (retTypeClass.isPrimitive()) {            // primitive types                        if (retTypeClass == Boolean.TYPE) {             // boolean                            return Boolean.valueOf(contentStream.readBoolean());                        } else if (retTypeClass == Character.TYPE) {    // char                            return Character.valueOf(contentStream.readChar());                        } else if (retTypeClass == Byte.TYPE) {         // byte                            return Byte.valueOf(contentStream.readByte());                        } else if (retTypeClass == Short.TYPE) {        // short                            return Short.valueOf(contentStream.readShort());                        } else if (retTypeClass == Integer.TYPE) {      // int                            return Integer.valueOf(contentStream.readInt());                        } else if (retTypeClass == Long.TYPE) {         // long                            return Long.valueOf(contentStream.readLong());                        } else if (retTypeClass == Float.TYPE) {        // float                            return Float.valueOf(contentStream.readFloat());                        } else if (retTypeClass == Double.TYPE) {       // double                            return Double.valueOf(contentStream.readDouble());                        } else if (retTypeClass == Void.TYPE) {         // void                            return null;                        } else {                            throw new IllegalArgumentException("Not a primitive: "+retTypeClass);                        }                    }                }finally {                    try {                        client.close();                    }catch (Exception e){}                }                return null;            }        });        return proxy;    }    public static void main(String[] args) throws IOException {        // start server        Server server = new Server(MyProtocol.class, new MyProtocolImp());        server.start();        // call rpc        MyProtocol client = ImitateRPC.getProxy(MyProtocol.class);        System.out.println(client.echo("hello world"));        System.out.println(client.add(1, 4));        server.close();    }}

上面的代码只有300多行, 而且在编码/解码部分有重叠的代码, 可以优化缩减, 功能也不完善。 但是仿照Hadoop RPC, 演示了RPC的原理。 对照着Hadoop RPC的源码, 自己编码实现, 对于源码中各个类的功能, 会有更清晰的理解。

方法 通信 服务器 网络 服务 功能 参数 客户 客户端 接口 程序 编程 原理 之间 源码 版本 编码 网络通信 需求 代码 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 华为的网络安全 方舟一进服务器就没声音了 简单恢复模式下数据库备份 方舟手游能开几个服务器 山东专业软件开发收费报价表 中国科技及互联网企业 雄安新区服务器 淘宝订单同步到本地数据库 张家港技术软件开发报价方案 如何加强信息共享网络安全防护 自己的服务器怎么搭建二维码 宜宾学院数据库试题 软件开发技术规范化 静安区个人软件开发管理制度 滁州初趣网络技术有限公司 视图的画法数据库 山谷网络安全有限公司 藏宝阁交易后原服务器 虹口区通用软件开发服务设计标准 自考计算机网络技术免考 泰安微信小程序软件开发哪家好 ssr服务器配置 服务器专利大全 怎样在手机上玩拔刀剑服务器 鸡西精益管理软件开发 烟台有软件开发公司吗 汽车ecu软件开发配置表 普陀区定制网络技术哪家好 dnf服务器连接失败 数据库中sp 是什么意思
0