NIO

Summary::同步阻塞模型,一个连接一个线程

NIO是Java 1.4引入的新IO模型,也称为同步非阻塞IO,它提供了一种基于事件驱动的方式来处理I/O操作。

相比于传统的BIO模型,NIO采用了Channel、Buffer和Selector等组件,线程可以对某个IO事件进行监听,并继续执行其他任务,不需要阻塞等待。当IO事件就绪时,线程会得到通知,然后可以进行相应的操作,实现了非阻塞式的高伸缩性网络通信。在NIO模型中,数据总是从Channel读入Buffer,或者从Buffer写入Channel,这种模式提高了IO效率,并且可以充分利用系统资源。
Pasted image 20240729102258
NIO主要由三部分组成:选择器(Selector)、缓冲区(Buffer)和通道(Channel)。Channel是一个可以进行数据读写的对象,所有的数据都通过Buffer来处理,这种方式避免了直接将字节写入通道中,而是将数据写入包含一个或者多个字节的缓冲区。在多线程模式下,一个线程可以处理多个请求,这是通过将客户端的连接请求注册到多路复用器上,然后由多路复用器轮询到连接有I/O请求时进行处理。

适用场景

NIO适用于连接数目多且连接比较短(轻操作)的架构,例如聊天服务器、弹幕系统、服务器间通讯等。它通过引入非阻塞通道的概念,提高了系统的伸缩性和并发性能。同时,NIO的使用也简化了程序编写,提高了开发效率。

特点

事件驱动模型、单线程处理多任务、非阻塞I/O,I/O读写不再阻塞,而是返回0、基于block的传输比基于流的传输更高效、更高级的IO函数zero-copy、IO多路复用大大提高了Java网络应用的可伸缩性和实用性。基于Reactor线程模型

优缺点

优点:

NIO适合一些复杂的、高频的、长连接的通信场景,例如聊天室、网络游戏等。

操作流程

  1. 打开通道并设置为非阻塞模式。
  2. 将通道注册到选择器上,并指定感兴趣的事件类型(如连接打开、可读等)。
  3. 线程通过调用选择器的select()方法等待事件发生。
  4. 当有一个或多个事件发生时,线程可以从选择器中获取已经准备好的通道,并进行相应的IO操作。
  5. IO操作完成后,关闭通道和选择器。

代码样例

server端

package Java.IO.NIODemo;  
import java.io.*;  
import java.net.*;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  
import java.util.Set;  
public class NIOServer {  
    public static void main(String[] args) throws IOException {  
        // Selector是抽象类,根据具体的平台选择实现类,windows下使用WindowsSelectorImpl实现类,linux下使用EPollSelectorImpl实现类  
        Selector selector = Selector.open();  
        // 创建一个ServerSocketChannel并绑定到指定的端口  
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  
        serverSocketChannel.bind(new InetSocketAddress(9999));  
        // 设置为非阻塞模式  
        serverSocketChannel.configureBlocking(false);  
        // 将ServerSocketChannel注册到Selector上,并监听OP_ACCEPT事件,ServerSocketChannel只需要关注ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
        System.out.println("服务器已启动,等待客户端连接...");  
        while (true) {  
            // 在没有事件发生时,线程阻塞;反之,则线程恢复运行  
            selector.select();  
            // 处理事件,SelectionKey 内部包含了所有发生的事件  
            Set<SelectionKey> selectedKeys = selector.selectedKeys();  
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();  
            while (keyIterator.hasNext()) {  
                SelectionKey key = keyIterator.next();  
                // 根据事件类型分别处理  
                if (key.isAcceptable()) {  
                    // 处理连接请求事件  
                    SocketChannel client = serverSocketChannel.accept();  
                    client.configureBlocking(false);  
                    client.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE|SelectionKey.OP_CONNECT);
                    // 必须手动移除SelectionKey,否则会导致已被处理过的事件再次被处理,引发错误  
                    keyIterator.remove();  
                } else if (key.isReadable()) {  
                    SocketChannel client = (SocketChannel) key.channel();  
                    client.getRemoteAddress();  
                    //分配缓存区容量  
                    ByteBuffer buffer = ByteBuffer.allocate(1024);  
                    try {  
                        // 在客户端主动断开连接的时候, read()方法会返回-1,需要关闭客户端连接  
                        int read = client.read(buffer);  
                        if (read == -1) {  
                            // 取消此事件,并关闭连接  
                            key.cancel();  
                            client.close();  
                        } else {  
                            buffer.flip();  
                            buffer.clear();  
                        }  
                        String output = new String(buffer.array()).trim();  
                        Socket socket = client.socket();  
                        InetAddress inetAddress = socket.getInetAddress();  
                        int port = socket.getPort();  
                        String clientInfo = inetAddress+":"+port;  
                        String message = String.format("来自客户端 %s , 消息:%s", clientInfo , output);  
                        System.out.println(message);  
                        System.out.print("回复消息: ");  
                        NIOUtil.writeMessage(selector, client, buffer);  
                        // 必须手动移除SelectionKey,否则会导致已被处理过的事件再次被处理,引发错误  
                        keyIterator.remove();  
                    } catch (IOException e) {  
                        e.printStackTrace();  
                        // 取消此事件,并关闭连接  
                        key.cancel();  
                        client.close();  
                        // 必须手动移除SelectionKey,否则会导致已被处理过的事件再次被处理,引发错误  
                        keyIterator.remove();  
                    }  
                }  
            }  
        }  
    }  
}

client

package Java.IO.NIODemo;  
import java.io.*;  
import java.net.InetSocketAddress;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  
import java.util.Scanner;  
import java.util.Set;  
public class NIOClient {  
    public static void main(String[] args) throws IOException {  
        Selector selector = Selector.open();  
        SocketChannel socketChannel = SocketChannel.open();  
        socketChannel.configureBlocking(false);  
        socketChannel.connect(new InetSocketAddress("localhost", 9999));  
        socketChannel.register(selector, SelectionKey.OP_CONNECT);  
        while (true) {  
            selector.select();  
            Set<SelectionKey> selectedKeys = selector.selectedKeys();  
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();  
            while (keyIterator.hasNext()) {  
                SelectionKey key = keyIterator.next();  
                if (key.isConnectable()) {  
                    SocketChannel client = (SocketChannel) key.channel();  
                    if (client.isConnectionPending()) {  
                        client.finishConnect();  
                    }  
                    System.out.print("Enter message to server: ");  
                    Scanner scanner = new Scanner(System.in);  
                    String message = scanner.nextLine();  
                    ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());  
                    client.write(buffer);  
                    client.register(selector, SelectionKey.OP_READ);  
                } else if (key.isReadable()) {  
                    SocketChannel client = (SocketChannel) key.channel();  
                    ByteBuffer buffer = ByteBuffer.allocate(1024);  
                    client.read(buffer);  
                    String output = new String(buffer.array()).trim();  
                    System.out.println("来自客户端的消息: " + output);  
                    System.out.print("输入消息: ");  
                    // 和服务端代码一样  
                    NIOUtil.writeMessage(selector, client, buffer);  
                }  
                keyIterator.remove();  
            }  
        }  
    }  
}

Util

package Java.IO.NIODemo;  
import java.io.IOException;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.SocketChannel;  
import java.util.Scanner;  
public class NIOUtil {  
    static void writeMessage(Selector selector, SocketChannel client, ByteBuffer buffer) throws IOException {  
        Scanner scanner = new Scanner(System.in);  
        String message = scanner.nextLine();  
        buffer.clear();  
        buffer.put(message.getBytes());  
        //从写模式切换到读模式  
        buffer.flip();  
        while (buffer.hasRemaining()) {  
            client.write(buffer);  
        }  
        //  重新监听OP_ACCEPT事件  
        client.register(selector, SelectionKey.OP_READ);  
    }  
}

参考文章

从理论到实践:深度解读BIO、NIO、AIO的优缺点及使用场景-腾讯云开发者社区-腾讯云
【Netty】「NIO」(三)剖析 Selector-腾讯云开发者社区-腾讯云