千家信息网

Java NIO就绪模式怎么实现

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,这篇文章主要介绍"Java NIO就绪模式怎么实现",在日常操作中,相信很多人在Java NIO就绪模式怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Java
千家信息网最后更新 2025年01月19日Java NIO就绪模式怎么实现

这篇文章主要介绍"Java NIO就绪模式怎么实现",在日常操作中,相信很多人在Java NIO就绪模式怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Java NIO就绪模式怎么实现"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。

Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。

Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。


图 1 类结构图

package cn.chenkangxian.nioconcurrent;  import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.LinkedList; import java.util.List;  /**  * @Project: testNio  *   * @Author: chenkangxian  *   * @Annotation: 使用线程池来处理大量channel并发  *   * @Date:2011-7-5  *   * @Copyright: 2011 chenkangxian, All rights reserved.  *   */ public class SelectSocketsThreadPool extends SelectSockets {      private static final int MAX_THREADS = 5;     private ThreadPool pool = new ThreadPool(MAX_THREADS);      /**      * 从socket中读数据      */     protected void readDataFromSocket(SelectionKey key) throws Exception {         WorkerThread worker = pool.getWorker();         if (worker == null) {             return;         worker.serviceChannel(key);     }     /**      *        * @Project: concurrentnio      *      * @Author: chenkangxian      *      * @Annotation:线程池      *      * @Date:2011-7-20      *      * @Copyright: 2011 chenkangxian, All rights reserved.      *      */     private class ThreadPool {         List idle = new LinkedList();         /**          * 线程池初始化          *           * @param poolSize 线程池大小          */         ThreadPool(int poolSize) {             for (int i = 0; i < poolSize; i++) {                 WorkerThread thread = new WorkerThread(this);                 thread.setName("Worker" + (i + 1));                 thread.start();                 idle.add(thread);             }         }         /**          * 获得工作线程          *           * Author: chenkangxian          *          * Last Modification Time: 2011-7-20          *          * @return          */         WorkerThread getWorker() {             WorkerThread worker = null;              synchronized (idle) {                 if (idle.size() > 0) {                     worker = (WorkerThread) idle.remove(0);                 }             }             return (worker);         }         /**          * 送回工作线程          *           * Author: chenkangxian          *          * Last Modification Time: 2011-7-20          *          * @param worker          */         void returnWorker(WorkerThread worker) {             synchronized (idle) {                 idle.add(worker);             }         }     }     private class WorkerThread extends Thread {         private ByteBuffer buffer = ByteBuffer.allocate(1024);         private ThreadPool pool;         private SelectionKey key;         WorkerThread(ThreadPool pool) {             this.pool = pool;         }         public synchronized void run() {             System.out.println(this.getName() + " is ready");             while (true) {                 try {                     this.wait();//等待被notify                 } catch (InterruptedException e) {                     e.printStackTrace();                     this.interrupt();                 }                 if (key == null) {//直到有key                     continue;                 }                 System.out.println(this.getName() + " has been awakened");                 try {                     drainChannel(key);                 } catch (Exception e) { System.out.println("Caught '" + e + "' closing channel");                     try { key.channel().close();                     } catch (IOException ex) {     ex.printStackTrace();                     }                     key.selector().wakeup();                 }                 key = null;                 this.pool.returnWorker(this);             }         }         synchronized void serviceChannel(SelectionKey key) {             this.key = key;             //消除读的关注             key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));             this.notify();         }         void drainChannel(SelectionKey key) throws Exception {             SocketChannel channel = (SocketChannel) key.channel();             int count;             buffer.clear();              while ((count = channel.read(buffer)) > 0) {                 buffer.flip();                 while (buffer.hasRemaining()) {                     channel.write(buffer);                 }                 buffer.clear();             }             if (count < 0) {                 channel.close();                 return;             }             //重新开始关注读事件             key.interestOps(key.interestOps() | SelectionKey.OP_READ);             key.selector().wakeup();         }     }     public static void main(String[] args) throws Exception {         new SelectSocketsThreadPool().go(args);     } }
package cn.chenkangxian.nioconcurrent; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /**  *   * @Project: concurrentnio  *  * @Author: chenkangxian  *  * @Annotation:   *  * @Date:2011-7-11  *  * @Copyright: 2011 chenkangxian, All rights reserved.  *  */ public class SelectSockets {     public static int PORT_NUMBER = 1234;     private ByteBuffer buffer = ByteBuffer.allocate(1024);     public static void main(String[] args) throws Exception {         new SelectSockets().go(args);     }     public void go(String[] args) throws Exception{         int port = PORT_NUMBER; //      if(args.length > 0){ //          port = Integer.parseInt(args[0]); //      } //      System.out.println("Listening on port " + port);         ServerSocketChannel serverChannel = ServerSocketChannel.open();         ServerSocket serverSocket = serverChannel.socket();                  Selector selector = Selector.open();         serverSocket.bind(new InetSocketAddress(port));         serverChannel.configureBlocking(false);         serverChannel.register(selector, SelectionKey.OP_ACCEPT);                  while(true){             int n = selector.select(); //没有轮询,单个selector             if(n == 0){                 continue;              }             Iterator it = selector.selectedKeys().iterator();                          while(it.hasNext()){                 SelectionKey key = (SelectionKey)it.next();                 if(key.isAcceptable()){                     ServerSocketChannel server =                (ServerSocketChannel)key.channel();                     SocketChannel channel = server.accept();        registerChannel(selector,channel,SelectionKey.OP_READ);                     sayHello(channel);                 }                 if(key.isReadable()){                     readDataFromSocket(key);                 }                 it.remove();             }         }     }     /**      * 在selector上注册channel,并设置interest      *       * Author: chenkangxian      *      * Last Modification Time: 2011-7-11      *      * @param selector 选择器      *       * @param channel 通道      *       * @param ops interest      *       * @throws Exception      */     protected void registerChannel(Selector selector,             SelectableChannel channel, int ops) throws Exception{         if(channel == null){             return ;          }         channel.configureBlocking(false);         channel.register(selector, ops);     }     /**      * 处理有可用数据的通道      *       * Author: chenkangxian      *      * Last Modification Time: 2011-7-11      *      * @param key 可用通道对应的key      *       * @throws Exception      */     protected void readDataFromSocket(SelectionKey key) throws Exception{         SocketChannel socketChannel = (SocketChannel)key.channel();         int count;         buffer.clear(); //Empty buffer         while((count = socketChannel.read(buffer)) > 0){             buffer.flip();              while(buffer.hasRemaining()){                 socketChannel.write(buffer);             }             buffer.clear();          }         if(count < 0){             socketChannel.close();         }     }     /**      * 打招呼      *       * Author: chenkangxian      *      * Last Modification Time: 2011-7-11      *      * @param channel 客户端channel      *       * @throws Exception      */     private void sayHello(SocketChannel channel) throws Exception{         buffer.clear();         buffer.put("Hello 哈罗! \r\n".getBytes());         buffer.flip();         channel.write(buffer);     } }

到此,关于"Java NIO就绪模式怎么实现"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0