千家信息网

Storm中Thrift如何使用

发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,这期内容当中小编将会给大家带来有关Storm中Thrift如何使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1 IDL首先是storm.thrift, 作为ID
千家信息网最后更新 2025年01月25日Storm中Thrift如何使用

这期内容当中小编将会给大家带来有关Storm中Thrift如何使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

1 IDL

首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service
然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码

比如对于nimbus service
在IDL的定义为,

service Nimbus {

void submitTopology(1: string name, 2: string uploadedJarLocation, 3: stringjsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2:InvalidTopologyException ite);

void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3:string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyExceptionite);

void killTopology(1: string name) throws (1: NotAliveException e);

void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);

void activate(1: string name) throws (1: NotAliveException e);

void deactivate(1: string name) throws (1: NotAliveException e);

void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyExceptionite);

// need to add functions for asking aboutstatus of storms, what nodes they're running on, looking at task logs

string beginFileUpload();

void uploadChunk(1: string location, 2: binary chunk);

void finishFileUpload(1: string location);

string beginFileDownload(1: string file);

//can stop downloading chunks when receive0-length byte array back

binary downloadChunk(1: string id);

// returns json

string getNimbusConf();

// stats functions

ClusterSummary getClusterInfo();

TopologyInfo getTopologyInfo(1: string id) throws (1:NotAliveException e);

//returns json

string getTopologyConf(1: string id) throws (1:NotAliveException e);

StormTopologygetTopology(1: string id) throws (1: NotAliveException e);

StormTopology getUserTopology(1: string id) throws (1:NotAliveException e);

}

而对应在Nimbus.java的Java代码如下,

public class Nimbus {

public interface Iface {

public void submitTopology(String name, StringuploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException,org.apache.thrift7.TException;

public void submitTopologyWithOpts(String name, StringuploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptionsoptions) throws AlreadyAliveException,InvalidTopologyException, org.apache.thrift7.TException;

public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;

public void killTopologyWithOpts(String name,KillOptions options) throws NotAliveException,org.apache.thrift7.TException;

public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;

public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;

public void rebalance(String name, RebalanceOptionsoptions) throws NotAliveException, InvalidTopologyException,org.apache.thrift7.TException;

public String beginFileUpload() throwsorg.apache.thrift7.TException;

public void uploadChunk(String location, ByteBufferchunk) throws org.apache.thrift7.TException;

public void finishFileUpload(String location) throws org.apache.thrift7.TException;

public String beginFileDownload(String file) throws org.apache.thrift7.TException;

public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;

public String getNimbusConf() throwsorg.apache.thrift7.TException;

public ClusterSummary getClusterInfo() throwsorg.apache.thrift7.TException;

public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;

public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;

public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

}

2 Client

1. 首先Get Client,

NimbusClient client =NimbusClient.getConfiguredClient(conf);

看看backtype.storm.utils下面的client.getConfiguredClient的逻辑,
只是从配置中取出nimbus的host:port, 并new NimbusClient

public static NimbusClient getConfiguredClient(Map conf) {

try {

String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);

int nimbusPort =Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));

return new NimbusClient(conf, nimbusHost, nimbusPort);

} catch (TTransportException ex) {

throw new RuntimeException(ex);

}

}

NimbusClient 继承自ThriftClient, public class NimbusClient extends ThriftClient
ThriftClient又做了什么? 关键是怎么进行数据序列化和怎么将数据传输到remote
这里看出Thrift对Transport和Protocol的封装
对于Transport, 其实就是对Socket的封装, 使用TSocket(host, port)
然后对于protocol, 默认使用TBinaryProtocol, 如果你不指定的话

public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {

try {

//locate loginconfiguration

Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

//construct atransport plugin

ITransportPlugin transportPlugin= AuthUtils.GetTransportPlugin(storm_conf, login_conf);

//create a socketwith server

if(host==null) {

throw new IllegalArgumentException("host is not set");

}

if(port<=0) {

throw new IllegalArgumentException("invalid port: "+port);

}

TSocket socket = new TSocket(host, port);

if(timeout!=null) {

socket.setTimeout(timeout);

}

final TTransport underlyingTransport = socket;

//establishclient-server transport via plugin

_transport = transportPlugin.connect(underlyingTransport, host);

} catch (IOException ex) {

throw new RuntimeException(ex);

}

_protocol = null;

if (_transport != null)

_protocol = new TBinaryProtocol(_transport);

}

2. 调用任意RPC
那么就看看submitTopologyWithOpts

client.getClient().submitTopologyWithOpts(name,submittedJar, serConf, topology, opts);

可以看出上面的Nimbus的interface里面有这个方法的定义, 而且Thrift不仅仅自动产生java interface, 而且还提供整个RPC client端的实现

public void submitTopologyWithOpts(String name, StringuploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptionsoptions) throws AlreadyAliveException,InvalidTopologyException, org.apache.thrift7.TException

{

send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf,topology, options);

recv_submitTopologyWithOpts();

}

分两步,
首先send_submitTopologyWithOpts, 调用sendBase
接着, recv_submitTopologyWithOpts, 调用receiveBase

protected void sendBase(String methodName, TBase args) throws TException {

oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL,++seqid_));

args.write(oprot_);

oprot_.writeMessageEnd();

oprot_.getTransport().flush();

}

protected void receiveBase(TBase result, String methodName)throws TException {

TMessage msg = iprot_.readMessageBegin();

if (msg.type == TMessageType.EXCEPTION) {

TApplicationException x = TApplicationException.read(iprot_);

iprot_.readMessageEnd();

throw x;

}

if (msg.seqid != seqid_) {

throw newTApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName +" failed: out ofsequence response");

}

result.read(iprot_);

iprot_.readMessageEnd();

}

可以看出Thrift对protocol的封装, 不需要自己处理序列化, 调用protocol的接口搞定

3 Server

Thrift强大的地方是, 实现了整个协议栈而不光只是IDL的转化, 对于server也给出多种实现
下面看看在nimbus server端, 是用clojure来写的
可见其中使用Thrift封装的NonblockingServerSocket, THsHaServer,TBinaryProtocol, Proccessor, 非常简单
其中processor会使用service-handle来处理recv到的数据, 所以作为使用者只需要在service-handle中实现Nimbus$Iface, 其他和server相关的, Thrift都已经帮你封装好了, 这里使用的IDL也在backtype.storm.generated, 因为clojure基于JVM所以IDL只需要转化成Java即可.

(defn launch-server! [conf nimbus]

(validate-distributed-mode! conf)

(let[service-handler (service-handler conf nimbus)

options (-> (TNonblockingServerSocket. (int (confNIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

(.workerThreads 64)

(.protocolFactory (TBinaryProtocol$Factory.))

(.processor(Nimbus$Processor. service-handler))

)

(.addShutdownHook(Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stopserver))))

(log-message "StartingNimbus server...")

(.serve server)))

上述就是小编为大家分享的Storm中Thrift如何使用了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

封装 数据 代码 内容 就是 序列 面的 分析 处理 强大 不仅仅 专业 中小 使用者 关键 内容丰富 只是 地方 多种 接口 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 李沧区游戏软件开发解决方案 直接访问数据库的连接端口 数据库有哪几类数据模型 高德地图开启服务器提醒会怎么样 网络安全技术及应用第四版实践题 我的世界怎么当服务器 农村网络安全教育论文 win git服务器 东北地区网络安全法答题活动 数据库搜索引擎相关知识 百家号互联网和科技 收费站网络安全设备有哪些 行政编码数据库 青岛服务器代理 服务器断开101 购买服务器搭建免流 色弱可以报网络技术吗 中国移动宽带游戏服务器连接不上 VB软件开发单机版考试系统 美国办公室网络安全系统 网络安全法是第一部全面规范 网络安全小常识文字版 江苏个人软件开发价钱 网络安全事件的防治措施 色弱可以报网络技术吗 怎么查询石器时代服务器数据库 网络安全威胁情报分类 软件开发明年计划 哪个数据库提供了美国四大 vs自带数据库的使用
0