千家信息网

Netty、MINA、Twisted中如何定制自己的协议

发表于:2024-10-19 作者:千家信息网编辑
千家信息网最后更新 2024年10月19日,这篇文章给大家分享的是有关Netty、MINA、Twisted中如何定制自己的协议的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。之前介绍了一些消息分割的方案,以及 MINA
千家信息网最后更新 2024年10月19日Netty、MINA、Twisted中如何定制自己的协议

这篇文章给大家分享的是有关Netty、MINA、Twisted中如何定制自己的协议的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

之前介绍了一些消息分割的方案,以及 MINA、Netty、Twisted 针对这些方案提供的相关API。例如MINA的TextLineCodecFactory、PrefixedStringCodecFactory,Netty的LineBasedFrameDecoder、LengthFieldBasedFrameDecoder,Twisted的LineOnlyReceiver、Int32StringReceiver。

除了这些方案,还有很多其他方案,当然也可以自己定义。在这里,我们定制一个自己的方案,并分别使用MINA、Netty、Twisted实现对这种消息的解析和组装,也就是编码和解码。

之前介绍了一种用固定字节数的Header来指定Body字节数的消息分割方案,其中Header部分是常规的大字节序(Big-Endian)的4字节整数。本文中对这个方案稍作修改,将固定字节数的Header改为小字节序(Little-Endian)的4字节整数。

常规的大字节序表示一个数的话,用高字节位的存放数字的低位,比较符合人的习惯。而小字节序和大字节序正好相反,用高字节位存放数字的高位。

Python中struct模块支持大小字节序的pack和unpack,在Java中可以用下面的两个方法实现小字节序字节数组转int和int转小字节序字节数组,下面的Java程序中将会用到这两个方法:

public class LittleEndian {

/**
* 将int转成4字节的小字节序字节数组
*/
public static byte[] toLittleEndian(int i) {
byte[] bytes = new byte[4];
bytes[0] = (byte) i;
bytes[1] = (byte) (i >>> 8);
bytes[2] = (byte) (i >>> 16);
bytes[3] = (byte) (i >>> 24);
return bytes;
}

/**
* 将小字节序的4字节的字节数组转成int
*/
public static int getLittleEndianInt(byte[] bytes) {
int b0 = bytes[0] & 0xFF;
int b1 = bytes[1] & 0xFF;
int b2 = bytes[2] & 0xFF;
int b3 = bytes[3] & 0xFF;
return b0 + (b1 << 8) + (b2 << 16) + (b3 << 24);
}
}

无论是MINA、Netty还是Twisted,消息的编码、解码、切合的代码,都是应该和业务逻辑代码分开,这样有利于代码的开发、重用和维护。在MINA和Netty中类似,编码、解码需要继承实现相应的Encoder、Decoder,而在Twisted中则是继承Protocol实现编码解码。虽然实现方式不同,但是它们的功能都是一样的:

  • 对消息根据一定规则进行切合,例如固定长度消息、按行、按分隔符、固定长度Header指定Body长度等;

  • 将切合后的消息由字节码转成自己想要的类型,如MINA中将IoBuffer转成字符串,这样messageReceived接收到的message参数就是String类型;

  • write的时候可以传入自定义类型的参数,由编码器完成编码。

下面分别用MINA、Netty、Twisted实现4字节的小字节序int来指定body长度的消息的编码和解码。

MINA

在MINA中对接收到的消息进行切合和解码,一般会定义一个解码器类,继承自抽象类CumulativeProtocolDecoder,实现doDecode方法:

public class MyMinaDecoder extends CumulativeProtocolDecoder {

@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {

// 如果没有接收完Header部分(4字节),直接返回false
if(in.remaining() < 4) {
return false;
} else {

// 标记开始位置,如果一条消息没传输完成则返回到这个位置
in.mark();

byte[] bytes = new byte[4];
in.get(bytes); // 读取4字节的Header

int bodyLength = LittleEndian.getLittleEndianInt(bytes); // 按小字节序转int

// 如果body没有接收完整,直接返回false
if(in.remaining() < bodyLength) {
in.reset(); // IoBuffer position回到原来标记的地方
return false;
} else {
byte[] bodyBytes = new byte[bodyLength];
in.get(bodyBytes);
String body = new String(bodyBytes, "UTF-8");
out.write(body); // 解析出一条消息
return true;
}
}
}
}

另外,session.write的时候要对数据编码,需要定义一个编码器,继承自抽象类ProtocolEncoderAdapter,实现encode方法:

public class MyMinaEncoder extends ProtocolEncoderAdapter {

@Override
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {

String msg = (String) message;
byte[] bytes = msg.getBytes("UTF-8");
int length = bytes.length;
byte[] header = LittleEndian.toLittleEndian(length); // 按小字节序转成字节数组

IoBuffer buffer = IoBuffer.allocate(length + 4);
buffer.put(header); // header
buffer.put(bytes); // body
buffer.flip();
out.write(buffer);
}
}

在服务器启动的时候加入相应的编码器和解码器:

public class TcpServer {

public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();

// 指定编码解码器
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new MyMinaEncoder(), new MyMinaDecoder()));

acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}
}

下面是业务逻辑的代码:

public class TcpServerHandle extends IoHandlerAdapter {

@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
cause.printStackTrace();
}

// 接收到新的数据
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {

// MyMinaDecoder将接收到的数据由IoBuffer转为String
String msg = (String) message;
System.out.println("messageReceived:" + msg);

// MyMinaEncoder将write的字符串添加了一个小字节序Header并转为字节码
session.write("收到");
}
}

Netty

Netty中解码器和MINA类似,解码器继承抽象类ByteToMessageDecoder,实现decode方法:

public class MyNettyDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {

// 如果没有接收完Header部分(4字节),直接退出该方法
if(in.readableBytes() >= 4) {

// 标记开始位置,如果一条消息没传输完成则返回到这个位置
in.markReaderIndex();

byte[] bytes = new byte[4];
in.readBytes(bytes); // 读取4字节的Header

int bodyLength = LittleEndian.getLittleEndianInt(bytes); // header按小字节序转int

// 如果body没有接收完整
if(in.readableBytes() < bodyLength) {
in.resetReaderIndex(); // ByteBuf回到标记位置
} else {
byte[] bodyBytes = new byte[bodyLength];
in.readBytes(bodyBytes);
String body = new String(bodyBytes, "UTF-8");
out.add(body); // 解析出一条消息
}
}
}
}

下面是编码器,继承自抽象类MessageToByteEncoder,实现encode方法:

public class MyNettyEncoder extends MessageToByteEncoder {

@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {

byte[] bytes = msg.getBytes("UTF-8");
int length = bytes.length;
byte[] header = LittleEndian.toLittleEndian(length); // int按小字节序转字节数组
out.writeBytes(header); // write header
out.writeBytes(bytes); // write body
}
}

加上相应的编码器和解码器:

public class TcpServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// 加上自己的Encoder和Decoder
pipeline.addLast(new MyNettyDecoder());
pipeline.addLast(new MyNettyEncoder());

pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

业务逻辑处理类:

public class TcpServerHandler extends ChannelInboundHandlerAdapter {

// 接收到新的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {

// MyNettyDecoder将接收到的数据由ByteBuf转为String
String message = (String) msg;
System.out.println("channelRead:" + message);

// MyNettyEncoder将write的字符串添加了一个小字节序Header并转为字节码
ctx.writeAndFlush("收到");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

Twisted

Twisted的实现方式和MINA、Netty不太一样,其实现方式相对来说更加原始,但是越原始也越接近底层原理。

首先要定义一个MyProtocol类继承自Protocol,用于充当类似于MINA、Netty的编码、解码器。处理业务逻辑的类TcpServerHandle继承MyProtocol,重写或调用MyProtocol提供的一些方法。

# -*- coding:utf-8 -*-

from struct import pack, unpack
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Protocol
from twisted.internet import reactor

# 编码、解码器
class MyProtocol(Protocol):

# 用于暂时存放接收到的数据
_buffer = b""

def dataReceived(self, data):
# 上次未处理的数据加上本次接收到的数据
self._buffer = self._buffer + data
# 一直循环直到新的消息没有接收完整
while True:
# 如果header接收完整
if len(self._buffer) >= 4:
# 按小字节序转int
length, = unpack(" # 如果body接收完整
if len(self._buffer) >= 4 + length:
# body部分
packet = self._buffer[4:4 + length]
# 新的一条消息接收并解码完成,调用stringReceived
self.stringReceived(packet)
# 去掉_buffer中已经处理的消息部分
self._buffer = self._buffer[4 + length:]
else:
break;
else:
break;

def stringReceived(self, data):
raise NotImplementedError

def sendString(self, string):
self.transport.write(pack("
# 逻辑代码
class TcpServerHandle(MyProtocol):

# 实现MyProtocol提供的stringReceived而不是dataReceived,不然无法解码
def stringReceived(self, data):

# data为MyProtocol解码后的数据
print 'stringReceived:' + data

# 调用sendString而不是self.transport.write,不然不能进行编码
self.sendString("收到")

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

下面是Java编写的一个客户端测试程序:

public class TcpClient {

public static void main(String[] args) throws IOException {

Socket socket = null;
OutputStream out = null;
InputStream in = null;

try {

socket = new Socket("localhost", 8080);
out = socket.getOutputStream();
in = socket.getInputStream();

// 请求服务器
String data = "我是客户端";
byte[] outputBytes = data.getBytes("UTF-8");
out.write(LittleEndian.toLittleEndian(outputBytes.length)); // write header
out.write(outputBytes); // write body
out.flush();

// 获取响应
byte[] inputBytes = new byte[1024];
int length = in.read(inputBytes);
if(length >= 4) {
int bodyLength = LittleEndian.getLittleEndianInt(inputBytes);
if(length >= 4 + bodyLength) {
byte[] bodyBytes = Arrays.copyOfRange(inputBytes, 4, 4 + bodyLength);
System.out.println("Header:" + bodyLength);
System.out.println("Body:" + new String(bodyBytes, "UTf-8"));
}
}

} finally {
// 关闭连接
in.close();
out.close();
socket.close();
}
}
}

用客户端分别测试上面三个TCP服务器:

MINA服务器输出结果:

messageReceived:我是客户端

Netty服务器输出结果:

channelRead:我是客户端

Twisted服务器输出结果:

stringReceived:我是客户端

客户端测试三个服务器的输出结果都是:

Header:6

Body:收到

由于一个汉字一般占3个字节,所以两个汉字对应的Header为6。

感谢各位的阅读!关于"Netty、MINA、Twisted中如何定制自己的协议"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

字节 消息 编码 数据 方法 解码器 客户 客户端 方案 服务器 服务 数组 代码 位置 编码器 逻辑 部分 UTF-8 业务 标记 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 北京数融互联网科技有限公司 格博数据库的建立 警云数据管理终端是服务器吗 动态汇总多表数据库 sql数据库怎么做减法 我的世界1.7.5服务器 我的世界单机版怎么加入服务器 金融科技公司是互联网金融 物联网的app开发软件开发 2020年网络安全小知识来啦 软件开发职称职业资格填什么 临沂微信公众号软件开发 jsp输出数据库时间格式 dayz服务器安装的数据损坏 数据库执行结果在哪找 饥荒联机版服务器怎么调活动 倒排索引为什么比数据库索引快 巴彦软件开发有限公司 获取数据库的链接地址 存储管理服务器系统 用数据库创建表是如何授权 c dll 数据库 长沙赛联网络技术 金银加工网络技术培训 公安网络安全技术服务中心 服务好又优惠的即时通讯软件开发 南京 行政处罚标准数据库 食堂配送软件开发哪家专业 怎么看数据库实例是什么 金保网络安全检查简报
0