在使用 NIO 之前,我们先来看一下传统 BIO 阻塞 IO 的实现。
Java BIO 工作机制
Java BIO
就是传统的 Java I/O
编程,其相关的类和接口在 java.io
。
BIO(BlockingI/O)
:同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。Java BIO 工作流程:
- 服务器端启动一个
ServerSocket
。
- 客户端启动
Socket
对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。
- 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝。
- 如果有响应,客户端线程会等待请求结束后,再继续执行。
Java BIO 应用实例
- 使用
BIO
模型编写一个服务器端,监听 6666
端口,当有客户端连接时,就启动一个线程与之通讯。
- 要求使用线程池机制改善,可以连接多个客户端。
- 服务器端可以接收客户端发送的数据(
telnet
方式即可)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class BIOServer { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); ServerSocket serverSocket = new ServerSocket(6666); System.out.println("服务器启动了"); while (true) { Socket socket = serverSocket.accept(); System.out.println("连接到一个客户端"); executorService.execute(new Runnable() { @Override public void run() { try { byte[] bytes = new byte[1024]; InputStream inputStream = socket.getInputStream(); while (true) { int read = inputStream.read(bytes); if (read == -1) break; System.out.println(new String(bytes, 0, read)); } } catch (IOException e) { e.printStackTrace(); } finally { System.out.println("关闭客户端连接"); try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }); } } }
|
为什么使用 NIO
传统 Socket 的 accept()
方法阻塞(等待客户端连接),输入流的 read()
方法阻塞(等待 OS 将数据从内核拷贝到用户空间)。也就是说 BIO 会让主线程进入阻塞状态,性能不高。就算使用多线程来解决,但在高并发的情况下,会创建很多线程,线程会占用内存,线程之间的切换也会浪费资源开销。
而 NIO 只有在连接/通道真正有读写事件发生时(事件驱动),才会进行读写,就大大地减少了系统的开销。不必为每一个连接都创建一个线程,也不必去维护多个线程。避免了多个线程之间的上下文切换,导致资源的浪费。
NIO 和 BIO 的比较
BIO
以流的方式处理数据,而 NIO
以块的方式处理数据,块 I/O
的效率比流 I/O
高很多。
BIO
是阻塞的,NIO
则是非阻塞的。
BIO
基于字节流和字符流进行操作,而 NIO
基于 Channel
(通道)和 Buffer
(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector
(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
Buffer
和 Channel
之间的数据流向是双向的。
NIO 实现群聊系统
服务端实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| public class NIOServer { private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667;
public NIOServer() throws IOException{ this.selector = Selector.open(); this.listenChannel = getServerSocketChannel(selector); }
private ServerSocketChannel getServerSocketChannel(Selector selector) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); return serverSocketChannel; }
public void listen() { System.out.println("服务器开始监听......"); try { while (true) { if (selector.select() == 0) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { SocketChannel channel = listenChannel.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); System.out.println("客户端连接: " + channel.getRemoteAddress()); } else if (key.isReadable()) { SocketChannel channel = null; try { channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = channel.read(byteBuffer); if (read > 0) { String msg = new String(byteBuffer.array()); System.out.println(msg); notifyAllClient(msg, channel); } } catch (IOException e) { System.out.println("客户端 " + channel.getRemoteAddress() + ": 离线了..."); key.cancel(); channel.close(); } } } } } catch (Exception e) { e.printStackTrace(); } finally { try { selector.close(); listenChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
private void notifyAllClient(String msg, SocketChannel self) throws IOException { for (SelectionKey selectionKey : selector.keys()) { Channel channel = selectionKey.channel(); if (channel instanceof SocketChannel && channel != self) { SocketChannel socketChannel = (SocketChannel) channel; ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(byteBuffer); } } }
public static void main(String[] args) throws IOException { new NIOServer().listen(); } }
|
客户端线程类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| public class ClientThread extends Thread{
private Selector selector; private SocketChannel socketChannel; private static final String HOST = "127.0.0.1"; private static final int PORT = 6667; private String userName;
public ClientThread(String userName) throws Exception{ this.selector = Selector.open(); this.socketChannel = getSocketChannel(); this.userName = userName; }
public SocketChannel getSocketChannel() throws Exception{ SocketChannel socketChannel = SocketChannel.open(); socketChannel.socket().connect(new InetSocketAddress(HOST, PORT)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); return socketChannel; }
@Override public void run() { try { while (true) { readInfo(); } } catch (Exception e) { e.printStackTrace(); } finally { try { selector.close(); socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
public void readInfo() { try { int read = selector.select(); if (read > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); channel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); } } } } catch (IOException e) { e.printStackTrace(); } }
public void sendInfo(String msg) { msg = userName + ": " + msg; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); try { socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } }
public void close() { try { selector.close(); socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
|
客户端实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class NIOClient {
public static void main(String[] args) throws Exception{ String userName = "小明"; ClientThread client = new ClientThread(userName); client.start(); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); String readLine; while ((readLine = reader.readLine()) != null) { if (readLine.equals("bye")) { client.close(); System.exit(0); } client.sendInfo(readLine); } } }
|