千家信息网

Netty使用EmbeddedChannel对ChannelHandler进行单元测试

发表于:2024-10-03 作者:千家信息网编辑
千家信息网最后更新 2024年10月03日,一种特殊的Channel实现----EmbeddedChannel,它是Netty专门为改进针对ChannelHandler的单元测试而提供的。名称职责writeInbound将入站消息写到Embed
千家信息网最后更新 2024年10月03日netty使用EmbeddedChannel对channel

EmbeddedChannel是一种特殊的Channel实现,它是Netty专门为改进针对ChannelHandler的单元测试而提供的。

| 名称 | 职责 |
| --- | --- |
| writeInbound | 将入站消息写到EmbeddedChannel中。如果之后可以通过readInbound方法从EmbeddedChannel中读取到数据,则返回true |
| readInbound | 从EmbeddedChannel中读取入站消息。返回的任何内容都已经过整个ChannelPipeline的处理。如果没有可供读取的数据,则返回null |
| writeOutbound | 将出站消息写到EmbeddedChannel中。如果之后可以通过readOutbound方法从EmbeddedChannel中读取到数据,则返回true |
| readOutbound | 从EmbeddedChannel中读取出站消息。返回的任何内容都已经过整个ChannelPipeline的处理。如果没有可供读取的数据,则返回null |
| finish | 将EmbeddedChannel标记为完成。如果还有可读取的入站或出站数据,则返回true。这个方法还会调用EmbeddedChannel上的close方法 |

测试入站消息

/**
 * Inbound decoder that cuts the incoming byte stream into frames of a fixed size.
 *
 * <p>Nothing is emitted while fewer than {@code frameLength} bytes are readable; once enough
 * bytes have accumulated, every complete frame is handed on as its own {@link ByteBuf}.
 */
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    /** Size in bytes of every emitted frame; validated to be positive in the constructor. */
    private final int frameLength;

    /**
     * Creates a decoder producing frames of the given size.
     *
     * @param frameLength number of bytes per frame
     * @throws IllegalArgumentException if {@code frameLength} is zero or negative
     */
    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    /**
     * Extracts every complete frame currently readable in {@code in} and appends it to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound bytes to read from
     * @param out receives one {@link ByteBuf} per complete frame
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // A trailing partial frame stays in `in` until a later call supplies the missing bytes.
        while (in.readableBytes() >= frameLength) {
            ByteBuf frame = in.readBytes(frameLength);
            out.add(frame);
        }
    }
}
/**
 * Exercises {@link FixedLengthFrameDecoder} through an {@link EmbeddedChannel}, feeding nine
 * sequential bytes and expecting three frames of three bytes each.
 */
public class FixedLengthFrameDecoderTest {

    /** All nine bytes arrive in a single inbound write. */
    @Test
    public void testFramesDecoded() {
        ByteBuf source = newSequentialBuffer();
        ByteBuf inbound = source.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));

        // The duplicate shares the source's reference count, so retain before handing it over.
        Assert.assertTrue(channel.writeInbound(inbound.retain()));
        Assert.assertTrue(channel.finish());

        assertThreeFramesThenNothing(channel, source);
        source.release();
    }

    /** The bytes arrive as a 2-byte write (no frame yet) followed by a 7-byte write. */
    @Test
    public void testFramesDecoded2() {
        ByteBuf source = newSequentialBuffer();
        ByteBuf inbound = source.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));

        // Two bytes are not enough for a frame, so nothing becomes readable.
        Assert.assertFalse(channel.writeInbound(inbound.readBytes(2)));
        Assert.assertTrue(channel.writeInbound(inbound.readBytes(7)));
        Assert.assertTrue(channel.finish());

        assertThreeFramesThenNothing(channel, source);
        source.release();
    }

    /** Builds a buffer holding the byte values 0 through 8. */
    private static ByteBuf newSequentialBuffer() {
        ByteBuf buffer = Unpooled.buffer();
        int value = 0;
        while (value < 9) {
            buffer.writeByte(value);
            value++;
        }
        return buffer;
    }

    /**
     * Reads three frames from the channel, comparing each with the next three bytes of
     * {@code expectedSource}, then checks that no further inbound message is available.
     */
    private static void assertThreeFramesThenNothing(EmbeddedChannel channel, ByteBuf expectedSource) {
        for (int frameIndex = 0; frameIndex < 3; frameIndex++) {
            ByteBuf frame = channel.readInbound();
            Assert.assertEquals(expectedSource.readSlice(3), frame);
            frame.release();
        }
        Assert.assertNull(channel.readInbound());
    }
}

测试出站消息

/**
 * Outbound encoder that reads 4-byte integers from a {@link ByteBuf} and emits the absolute
 * value of each one as a separate message.
 *
 * <p>The type argument on {@code MessageToMessageEncoder} is required: without it the
 * {@code encode} method below would not override the superclass method.
 */
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {

    /**
     * Converts every complete integer readable in {@code in} to its absolute value.
     *
     * @param channelHandlerContext the handler context (unused here)
     * @param in                    the outbound buffer holding the integers
     * @param out                   receives one boxed {@code Integer} per value read
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out)
            throws Exception {
        // 4 == size of one int; a trailing fragment shorter than that is left unread.
        while (in.readableBytes() >= 4) {
            // Note: Math.abs(Integer.MIN_VALUE) is still negative, as per java.lang.Math.
            int value = Math.abs(in.readInt());
            out.add(value);
        }
    }
}
/**
 * Verifies that {@link AbsIntegerEncoder} turns the integers 0, -1, ..., -9 written outbound
 * into the messages 0, 1, ..., 9.
 */
public class AbsIntegerEncoderTest {

    /** Writes ten non-positive integers outbound and expects their absolute values in order. */
    @Test
    public void testEncoded() {
        ByteBuf negatives = Unpooled.buffer();
        int written = 0;
        while (written < 10) {
            negatives.writeInt(written * -1);
            written++;
        }

        EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
        boolean producedOutput = channel.writeOutbound(negatives);
        Assert.assertTrue(producedOutput);
        boolean hasPending = channel.finish();
        Assert.assertTrue(hasPending);

        // Each outbound message should be the absolute value of the corresponding input.
        for (int expected = 0; expected < 10; expected++) {
            Object encoded = channel.readOutbound();
            Assert.assertEquals(Integer.valueOf(expected), encoded);
        }

        // Nothing may remain after the ten values.
        Assert.assertNull(channel.readOutbound());
    }
}

测试异常处理

/**
 * Inbound decoder that forwards whatever bytes are currently readable as one frame, rejecting
 * input that exceeds a configured maximum size.
 */
public class FrameChunkDecoder extends ByteToMessageDecoder {

    /** Largest number of readable bytes accepted as a single frame. */
    private final int maxFrameSize;

    /**
     * Creates a decoder with the given frame size limit.
     *
     * @param maxFrameSize maximum number of bytes allowed in one frame
     */
    public FrameChunkDecoder(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    /**
     * Emits all readable bytes of {@code in} as one frame, or discards them and fails when there
     * are more than {@code maxFrameSize}.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound bytes to read from
     * @param out receives the frame as a {@link ByteBuf}
     * @throws TooLongFrameException if more than {@code maxFrameSize} bytes are readable
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        if (readableBytes > maxFrameSize) {
            // Drop the oversized content (ByteBuf.clear() resets both indices) before failing,
            // so the rejected bytes are not decoded again on the next read.
            in.clear();
            throw new TooLongFrameException(
                    "frame of " + readableBytes + " bytes exceeds maxFrameSize " + maxFrameSize);
        }
        ByteBuf frame = in.readBytes(readableBytes);
        out.add(frame);
    }
}
/**
 * Verifies that {@link FrameChunkDecoder} passes frames within the size limit, raises
 * {@link TooLongFrameException} for an oversized write, and keeps working afterwards.
 */
public class FrameChunkDecoderTest {

    /** Writes chunks of 2, 4 and 3 bytes against a limit of 3; only the 4-byte chunk must fail. */
    @Test
    public void testFramesDecoded() {
        ByteBuf source = Unpooled.buffer();
        int value = 0;
        while (value < 9) {
            source.writeByte(value);
            value++;
        }
        ByteBuf inbound = source.duplicate();

        EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));

        // First chunk (2 bytes) is within the limit.
        Assert.assertTrue(channel.writeInbound(inbound.readBytes(2)));

        // Second chunk (4 bytes) exceeds the limit; the test requires the write to throw.
        try {
            channel.writeInbound(inbound.readBytes(4));
            Assert.fail();
        } catch (TooLongFrameException expected) {
            // Intentionally empty: this exception is the outcome under test.
        }

        // Third chunk (3 bytes) must be accepted again after the failure.
        Assert.assertTrue(channel.writeInbound(inbound.readBytes(3)));
        Assert.assertTrue(channel.finish());

        ByteBuf frame = channel.readInbound();
        Assert.assertEquals(source.readSlice(2), frame);
        frame.release();

        // Skip the four rejected bytes in the reference buffer before comparing the last frame.
        frame = channel.readInbound();
        ByteBuf expectedTail = source.skipBytes(4).readSlice(3);
        Assert.assertEquals(expectedTail, frame);
        frame.release();

        source.release();
    }
}
0