在使用 NIO 之前,我们先来看一下传统 BIO 阻塞 IO 的实现。

Java BIO 工作机制

Java BIO 就是传统的 Java I/O 编程,其相关的类和接口在 java.io

BIO(BlockingI/O):同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。Java BIO 工作流程:

  1. 服务器端启动一个 ServerSocket
  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。
  3. 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝。
  4. 如果有响应,客户端线程会等待请求结束后,再继续执行。

Java BIO 应用实例

  1. 使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯。
  2. 要求使用线程池机制改善,可以连接多个客户端。
  3. 服务器端可以接收客户端发送的数据(telnet 方式即可)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class BIOServer {
public static void main(String[] args) throws Exception{
//如果有客户端连接,就创建一个线程与之通信
ExecutorService executorService = Executors.newCachedThreadPool();
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true) {
//监听,等待客户端连接
Socket socket = serverSocket.accept(); //阻塞
System.out.println("连接到一个客户端");
executorService.execute(new Runnable() {
@Override
public void run() {
try {
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int read = inputStream.read(bytes); //阻塞
if (read == -1) break;
System.out.println(new String(bytes, 0, read));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭客户端连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
}
}

为什么使用 NIO

传统 Socket 的 accept() 方法阻塞(等待客户端连接),输入流的 read() 方法阻塞(等待 OS 将数据从内核拷贝到用户空间)。也就是说 BIO 会让主线程进入阻塞状态,性能不高。就算使用多线程来解决,但在高并发的情况下,会创建很多线程,线程会占用内存,线程之间的切换也会浪费资源开销。

而 NIO 只有在连接/通道真正有读写事件发生时(事件驱动),才会进行读写,就大大地减少了系统的开销。不必为每一个连接都创建一个线程,也不必去维护多个线程。避免了多个线程之间的上下文切换,导致资源的浪费。

NIO 和 BIO 的比较

  1. BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多。
  2. BIO 是阻塞的,NIO 则是非阻塞的。
  3. BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
  4. BufferChannel 之间的数据流向是双向的。

NIO 实现群聊系统

服务端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class NIOServer {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;

public NIOServer() throws IOException{
// 初始化Selector选择器
this.selector = Selector.open();
// 初始化Channel通道
this.listenChannel = getServerSocketChannel(selector);
}

/**
* 初始化 ServerSocketChannel
*/
private ServerSocketChannel getServerSocketChannel(Selector selector) throws IOException {
// 开辟一个Channel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 通道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 通道绑定端口,开始监听
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
// 为了将Channel跟Selector绑定在一起,我们需要将Channel注册到Selector上,调用Channel的register()方法
// 通道中数据的事件类型为OP_ACCEPT(通道与选择器之间的桥梁SelectionKey)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
return serverSocketChannel;
}

/**
* 监听
*/
public void listen() {
System.out.println("服务器开始监听......");
try {
while (true) {
if (selector.select() == 0) {
continue;
}
//若select()返回大于0,则获取发生事件的SelectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
//手动从集合中移除当前SelectionKey,防止重复操作
it.remove();
if (key.isAcceptable()) { //判断是否为连接事件(表示有客户端连接)
//accept()为阻塞方法,但此时已经有事件发生,所以实际并不会阻塞
SocketChannel channel = listenChannel.accept();
channel.configureBlocking(false);
//将socketChannel注册到selector,关注事件为OP_READ
channel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接: " + channel.getRemoteAddress());
} else if (key.isReadable()) {
//通过SelectionKey反向获取到关联的Channel与Buffer
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = channel.read(byteBuffer);
if (read > 0) {
String msg = new String(byteBuffer.array());
System.out.println(msg);
notifyAllClient(msg, channel); //转发消息
}
} catch (IOException e) {
System.out.println("客户端 " + channel.getRemoteAddress() + ": 离线了...");
key.cancel(); //取消注册
channel.close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//调用close()方法将会关闭Selector,同时也会将关联的SelectionKey失效,但不会关闭Channel
selector.close();
listenChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 转发消息到其他客户端
*/
private void notifyAllClient(String msg, SocketChannel self) throws IOException {
for (SelectionKey selectionKey : selector.keys()) {
Channel channel = selectionKey.channel();
if (channel instanceof SocketChannel && channel != self) {
SocketChannel socketChannel = (SocketChannel) channel;
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(byteBuffer);
}
}
}

public static void main(String[] args) throws IOException {
new NIOServer().listen();
}
}

客户端线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class ClientThread extends Thread{

private Selector selector;
private SocketChannel socketChannel;
private static final String HOST = "127.0.0.1";
private static final int PORT = 6667;
private String userName;

public ClientThread(String userName) throws Exception{
this.selector = Selector.open();
this.socketChannel = getSocketChannel();
this.userName = userName;
}

public SocketChannel getSocketChannel() throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress(HOST, PORT)); //连接到远程地址
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
return socketChannel;
}

@Override
public void run() {
try {
while (true) {
readInfo();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
selector.close();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 从服务器读取消息
*/
public void readInfo() {
try {
int read = selector.select();
if (read > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 向服务器发送消息
*/
public void sendInfo(String msg) {
msg = userName + ": " + msg;
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
try {
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}

public void close() {
try {
selector.close();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class NIOClient {

public static void main(String[] args) throws Exception{
String userName = "小明";
ClientThread client = new ClientThread(userName);
client.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String readLine;
while ((readLine = reader.readLine()) != null) {
if (readLine.equals("bye")) {
client.close();
System.exit(0);
}
client.sendInfo(readLine);
}
}
}