千家信息网

JAVA NIO ServerSocketChannel(线程池版)

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,服务器端:import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import
千家信息网最后更新 2024年12月12日JAVA NIO ServerSocketChannel(线程池版)

服务器端:

import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.LinkedList;import java.util.List;public class ServerSocketThreadPool{        private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors();        private ThreadPool pool = new ThreadPool(MAX_THREAD);        private static int PORT_NUMBER = 1234;        public static void main(String[] args) throws Exception {                new ServerSocketThreadPool().go();        }        public void go() throws Exception {                int port = PORT_NUMBER;                System.out.println("Listenning on port:" + port);                // 创建通道 ServerSocketChannel                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();                // 绑定监听端口                serverSocketChannel.socket().bind(new InetSocketAddress(port));                // 设置为非阻塞方式                serverSocketChannel.configureBlocking(false);                // 创建选择器                Selector selector = Selector.open();                // 通道注册到选择器                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);                                while (true) {                        // 一直阻塞,直到有数据请求                        int n = selector.select();                        if (n == 0) {                                continue;                        }                        Iterator it = selector.selectedKeys().iterator();                        while (it.hasNext()) {                                SelectionKey key = it.next();                                if (key.isAcceptable()) {                                        ServerSocketChannel server = (ServerSocketChannel) key.channel();                                        SocketChannel socket = server.accept();                                        registerChannel(selector,socket, SelectionKey.OP_READ);                                        sayHello(socket);                                }                                if (key.isReadable()) {                                        readDataFromSocket(key);                                }                                it.remove();                        }                }        }                public void registerChannel(Selector selector,SelectableChannel channel,int ops)throws Exception{                if(channel==null){                        return;                }                channel.configureBlocking(false);                channel.register(selector, ops);                        }                public void sayHello(SocketChannel socket) throws Exception{                ByteBuffer buffer=ByteBuffer.allocate(1024);                buffer.clear();                buffer.put("hello client".getBytes());                buffer.flip();                        socket.write(buffer);        }        public void readDataFromSocket(SelectionKey key) throws Exception {                WorkThread thread=pool.getWork();                if(thread==null){                        return;                }                thread.serviceChannel(key);        }        private class ThreadPool {                List idle=new LinkedList();                                public ThreadPool(int poolSize) {                        for(int i=0;i0){                                        thread=(WorkThread) idle.remove(0);                                                                        }                        }                        return thread;                }                public void returnWorker(WorkThread workThread) {                        synchronized (idle) {                                idle.add(workThread);                        }                }        }        private class WorkThread extends Thread {                private ByteBuffer buffer = ByteBuffer.allocate(1024);                private ThreadPool pool;                private SelectionKey key;                public WorkThread(ThreadPool pool) {                        this.pool = pool;                }                public synchronized void run() {                        System.out.println(this.getName() + " is ready");                        while (true) {                                try {                                        this.wait();                                } catch (InterruptedException e) {                                        e.printStackTrace();                                        this.interrupt();                                }                                if (key == null) {                                        continue;                                }                                System.out.println(this.getName() + " has been awaken");                                try{                                        drainChannel(key);                                }catch(Exception e){                                        System.out.println("caught '"+e+"' closing channel");                                        try{                                                key.channel().close();                                        }catch(IOException ioe){                                                ioe.printStackTrace();                                        }                                        key.selector().wakeup();                                }                                key=null;                                this.pool.returnWorker(this);                        }                }                synchronized void serviceChannel(SelectionKey key){                        this.key=key;                        key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));                        this.notify();                }                                void drainChannel(SelectionKey key)throws Exception{                        SocketChannel channel=(SocketChannel) key.channel();                        buffer.clear();                        int count;                        while((count=channel.read(buffer))>0){                                buffer.flip();                                /*while(buffer.hasRemaining()){                                        channel.write(buffer);                                }*/                                byte[] bytes;                                bytes=new byte[count];                                buffer.get(bytes);                                System.out.println(new String(bytes));                                buffer.clear();                        }                        if(count<0){                                channel.close();                                return;                        }                        key.interestOps(key.interestOps()|SelectionKey.OP_READ);                        key.selector().wakeup();                                        }        }}

客户端:

import java.io.ByteArrayOutputStream;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;public class ClientTest {                private static SocketChannel socketChannel;        private static Selector selector;        public ClientTest()throws Exception {                socketChannel=SocketChannel.open();                socketChannel.connect(new InetSocketAddress("127.0.0.1", 1234));                socketChannel.configureBlocking(false);                selector=Selector.open();                socketChannel.register(selector, SelectionKey.OP_READ);        }        public static void main(String[] args) throws Exception{                ClientTest test=new ClientTest();                                ByteBuffer buffer=ByteBuffer.allocate(1024);                buffer.put("hello server".getBytes());                buffer.flip();                while(buffer.hasRemaining()){                        test.socketChannel.write(buffer);                }                                buffer.clear();                socketChannel.socket().shutdownOutput();                                String response=receiveData(test.socketChannel);                System.out.println(response);        }                public static String receiveData(SocketChannel socketChannel2) throws IOException {                ByteArrayOutputStream baos = new ByteArrayOutputStream();                String response = "";                try {                        ByteBuffer buffer = ByteBuffer.allocate(1024);                        byte[] bytes;                        int count = 0;                        while ((count = socketChannel2.read(buffer)) >= 0) {                                buffer.flip();                                bytes = new byte[count];                                buffer.get(bytes);                                baos.write(bytes);                                buffer.clear();                        }                        bytes = baos.toByteArray();                        response = new String(bytes);                } finally {                        try {                                baos.close();                        } catch (Exception ex) {                        }                }                return response;        }                        }

更详细关于ServerSocketChannel的线程池的理解,请参考《JAVA NIO》这本书的第四章选择器。

0