Java NIO 解决半包、粘包问题
NIO socket是非阻塞的通讯模式。与阻塞式IO不同的是,NIO的数据要先通过channel读入一个缓冲区ByteBuffer,然后再从这个缓冲区中取出数据。TCP是面向字节流的协议,本身不保留消息边界,再加上服务端缓冲区大小有限、网速不均匀等原因,服务端一次读到缓冲区中的数据可能不足一个完整的数据包,这就形成了断包(半包)问题;当缓冲区足够大时,一次读到缓冲区中的数据又可能多于一个完整的数据包,由于无法分清数据包之间的界限,就形成了粘包问题。对于NIO的SocketChannel,每次触发OP_READ事件时,发送端不一定只写入了一次;同理,如果发送端一次发送的数据过大,这一次写入也可能被拆分成多次OP_READ事件。所以接收端的OP_READ事件和发送端的写入操作并不是一一对应的。
一、断包、粘包问题的重现
package org.weir.socket.socketPackage;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Demo client used to reproduce the half-packet / sticky-packet problem.
 *
 * <p>It connects to port 8888, writes ten text messages back to back WITHOUT any
 * framing (no length prefix, no delimiter), and then waits for data from the server.
 * Because the messages carry no boundary information, the receiver cannot tell where
 * one message ends and the next one starts.
 */
public class NioSocketClient extends Thread {
    private static final int SERVER_PORT = 8888;
    private static final int MESSAGE_COUNT = 10;
    private static final int READ_BUFFER_SIZE = 1024;

    private SocketChannel socketChannel;
    private Selector selector = null;
    private int clientId;

    public static void main(String args[]) throws IOException {
        NioSocketClient client = new NioSocketClient();
        client.initClient();
        client.start();
    }

    public NioSocketClient() {
    }

    public NioSocketClient(int clientId) {
        this.clientId = clientId;
    }

    /**
     * Opens the selector and a non-blocking channel and starts connecting to the server.
     *
     * @throws IOException if the selector or channel cannot be opened or the connect
     *         attempt fails immediately; anything opened so far is closed again
     */
    public void initClient() throws IOException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(SERVER_PORT);
        selector = Selector.open();
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            // A non-blocking connect may complete immediately (typical for local
            // connections); in that case OP_CONNECT would never be reported.
            if (socketChannel.connect(inetSocketAddress)) {
                System.out.println("client finish connect!");
                socketChannel.register(selector, SelectionKey.OP_WRITE);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            closeQuietly(socketChannel);
            closeQuietly(selector);
            throw e;
        }
    }

    /**
     * Event loop: dispatches connect / write / read readiness until the connection is
     * closed, then releases the selector.
     */
    @Override
    public void run() {
        if (selector == null || socketChannel == null) {
            throw new IllegalStateException("initClient() must be called before start()");
        }
        try {
            while (socketChannel.isOpen()) {
                try {
                    if (selector.select() == 0) {
                        continue;
                    }
                } catch (IOException e) {
                    // A failing selector cannot recover; leave the loop instead of spinning.
                    e.printStackTrace();
                    break;
                }
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> iter = keySet.iterator();
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    iter.remove();
                    dispatch(selectionKey);
                }
            }
        } finally {
            closeQuietly(socketChannel);
            closeQuietly(selector);
        }
    }

    /** Routes one selected key to its handler; a handler may close the key, hence the isValid checks. */
    private void dispatch(SelectionKey selectionKey) {
        try {
            if (selectionKey.isValid() && selectionKey.isConnectable()) {
                finishConnect(selectionKey);
            }
            if (selectionKey.isValid() && selectionKey.isWritable()) {
                send(selectionKey);
            }
            if (selectionKey.isValid() && selectionKey.isReadable()) {
                read(selectionKey);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeKey(selectionKey);
        }
    }

    /**
     * Completes the pending connection and switches the key's interest to OP_WRITE.
     * On failure the channel is closed.
     *
     * @param key the key that reported OP_CONNECT
     */
    public void finishConnect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (channel.finishConnect()) {
                System.out.println("client finish connect!");
                // Replace OP_CONNECT; the channel is already registered, so only the
                // interest set has to change.
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeKey(key);
        }
    }

    /**
     * Reads whatever the server sent and prints it. Closes the connection when the
     * server has reached end-of-stream.
     *
     * @param key the key that reported OP_READ
     * @throws IOException if reading from the channel fails
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int len = channel.read(byteBuffer);
        if (len < 0) {
            // End-of-stream: without closing, the key stays readable forever.
            closeKey(key);
            return;
        }
        if (len > 0) {
            byteBuffer.flip();
            byte[] byteArray = new byte[byteBuffer.remaining()];
            byteBuffer.get(byteArray);
            System.out.println("client[" + clientId + "]" + "receive from server:");
            System.out.println(new String(byteArray, StandardCharsets.UTF_8));
        }
    }

    /**
     * Writes ten unframed messages and then switches the key's interest to OP_READ.
     * On a write failure the connection is closed and sending stops.
     *
     * @param key the key that reported OP_WRITE
     */
    public void send(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String ss = i + "Server ,how are you? this is package message from NioSocketClient!";
                ByteBuffer byteBuffer = ByteBuffer.wrap(ss.getBytes(StandardCharsets.UTF_8));
                System.out.println("[client] send:{" + i + "}-- " + ss);
                // Spins if the socket send buffer is full; acceptable for this small demo.
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }
            }
            key.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            // The try block wraps the whole loop so a failed write cannot retry forever.
            e.printStackTrace();
            closeKey(key);
        }
    }

    /** Cancels the key and closes its channel. */
    private static void closeKey(SelectionKey key) {
        key.cancel();
        closeQuietly(key.channel());
    }

    /** Closes a resource during teardown; a failure here leaves nothing further to do. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: the resource is being discarded anyway.
        }
    }

    /**
     * Converts an int to 4 bytes, most significant byte first (big-endian).
     *
     * @param value the value to encode
     * @return a new 4-byte array holding {@code value}
     */
    public static byte[] intToBytes(int value) {
        byte[] result = new byte[4];
        // From the high-order byte to the low-order byte.
        result[0] = (byte) ((value >> 24) & 0xFF);
        result[1] = (byte) ((value >> 16) & 0xFF);
        result[2] = (byte) ((value >> 8) & 0xFF);
        result[3] = (byte) (value & 0xFF);
        return result;
    }
}
package org.weir.socket.socketPackage;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Demo server used to reproduce the half-packet / sticky-packet problem.
 *
 * <p>It listens on port 8888 and, on every read event, prints whatever bytes one
 * {@code read} call delivered into a fixed 100-byte buffer. No framing is applied, so
 * one printout may contain several client messages or only part of one.
 */
public class NioSocketServer extends Thread {
    private static final int SERVER_PORT = 8888;
    /** Size of the per-read buffer; change it (e.g. to 50) to provoke split messages. */
    private static final int READ_BUFFER_SIZE = 100;

    ServerSocketChannel serverSocketChannel = null;
    Selector selector = null;
    SelectionKey selectionKey = null;

    /**
     * Opens the selector, binds the non-blocking server channel and registers it for
     * OP_ACCEPT.
     *
     * @throws IOException if the selector or server channel cannot be opened or bound
     */
    public void initServer() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(SERVER_PORT));
        selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Event loop: dispatches accept / read readiness until the selector fails or the
     * server channel is closed, then releases every registered channel.
     */
    @Override
    public void run() {
        if (selector == null || serverSocketChannel == null) {
            throw new IllegalStateException("initServer() must be called before start()");
        }
        try {
            while (serverSocketChannel.isOpen()) {
                try {
                    if (selector.select() == 0) {
                        continue;
                    }
                } catch (IOException e) {
                    // A failing selector cannot recover; stop instead of spinning.
                    e.printStackTrace();
                    break;
                }
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> iter = keySet.iterator();
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    iter.remove();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        accept(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        read(selectionKey);
                    }
                    // Responses are intentionally disabled in this demo: no channel is
                    // ever registered for OP_WRITE, so write(SelectionKey) is not called.
                }
            }
        } finally {
            closeServer();
        }
    }

    /**
     * Accepts a pending connection and registers it for OP_READ.
     *
     * @param key the key of the server channel that reported OP_ACCEPT
     */
    public void accept(SelectionKey key) {
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // In non-blocking mode accept() returns null when no connection is pending.
                return;
            }
            System.out.println("is acceptable");
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs a single read into a fresh buffer and prints the received bytes as text.
     * The client connection (not the server) is closed on end-of-stream or I/O error.
     *
     * @param selectionKey the key of the client channel that reported OP_READ
     */
    public void read(SelectionKey selectionKey) {
        System.out.println("read事件");
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int len = channel.read(byteBuffer);
            if (len < 0) {
                // Peer closed the connection; otherwise the key stays readable forever.
                closeClient(selectionKey);
                return;
            }
            if (len > 0) {
                byteBuffer.flip();
                byte[] byteArray = new byte[byteBuffer.remaining()];
                byteBuffer.get(byteArray);
                System.out.println("NioSocketServer receive from client:"
                        + new String(byteArray, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeClient(selectionKey);
        }
    }

    /**
     * Sends a minimal HTTP response to the client and then closes that connection.
     *
     * @param selectionKey the key of the client channel to respond on
     */
    public void write(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        String body = "Hello World!";
        // Content-Length must match the real body size, so it is computed.
        String httpResponse = "HTTP/1.1 200 OK\r\n"
                + "Content-Length: " + body.getBytes(StandardCharsets.UTF_8).length + "\r\n"
                + "Content-Type: text/html\r\n"
                + "\r\n"
                + body;
        System.out.println("response from server to client");
        try {
            ByteBuffer byteBuffer = ByteBuffer.wrap(httpResponse.getBytes(StandardCharsets.UTF_8));
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Cancelling the key alone would leak the socket.
            closeClient(selectionKey);
        }
    }

    /** Cancels a client key and closes its channel, leaving the server running. */
    private static void closeClient(SelectionKey key) {
        key.cancel();
        closeQuietly(key.channel());
    }

    /** Closes every channel registered with the selector, then the selector itself. */
    private void closeServer() {
        if (selector != null && selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                closeQuietly(key.channel());
            }
            closeQuietly(selector);
        }
        closeQuietly(serverSocketChannel);
    }

    /** Closes a resource during teardown; a failure here leaves nothing further to do. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: the resource is being discarded anyway.
        }
    }

    /**
     * Decodes the first 4 bytes of the array as a big-endian int.
     *
     * @param bytes array holding at least 4 bytes, most significant byte first
     * @return the decoded value
     */
    public static int byteArrayToInt(byte[] bytes) {
        int value = 0;
        // From the high-order byte to the low-order byte.
        for (int i = 0; i < 4; i++) {
            int shift = (4 - 1 - i) * 8;
            value |= (bytes[i] & 0x000000FF) << shift;
        }
        return value;
    }

    public static void main(String args[]) throws IOException {
        NioSocketServer server = new NioSocketServer();
        server.initServer();
        server.start();
    }
}
// Article caption (it was fused into the code by text extraction):
// running these two classes produces the following result:
由于server端的ByteBuffer大小为100,而client发送的每条消息只有67字节,一次读取可能包含不止一条消息,所以会发生粘包问题。当把server端的ByteBuffer大小改为50时,运行结果如下:
这种情况下就形成了断包问题,接收到的数据都是不完整的数据
二、断包、粘包问题的解决
解决思路是封装自己的包协议:包 = 包内容长度(4 byte)+ 包内容
对于粘包问题先读出包头即包体长度n,然后再读取长度为n的包内容,这样数据包之间的边界就清楚了。
对于断包问题,先读出包头即包体长度n;如果此次读到缓冲区中的包体数据不足n字节,就需要先把这部分内容缓存起来,等下次read事件到来时再拼接成完整的数据包。
由于把channel中的数据读到ByteBuffer时受ByteBuffer大小的限制,client的一次write不一定对应server的一次read事件,所以需要一个能够跨多次read事件保留的缓存(例如成员变量)来保存这部分不完整的数据包。
代码如下:
package org.weir.socket.socketPackage;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Server that solves the half-packet / sticky-packet problem with a length-prefixed
 * protocol: frame = header (4 bytes, big-endian body length) + body.
 *
 * <p>Bytes of an incomplete frame are kept in a buffer attached to the connection's
 * {@link SelectionKey}, so they survive until the next read event and are never mixed
 * with bytes from another connection.
 */
public class NioSocketServer extends Thread {
    private static final int SERVER_PORT = 8888;
    /** Length of the frame header that carries the body length. */
    private static final int HEADER_LENGTH = 4;
    /** Initial size of a connection's buffer; it grows when a frame needs more room. */
    private static final int INITIAL_BUFFER_SIZE = 100;
    /** Upper bound for a declared body length; larger (or negative) values are rejected. */
    private static final int MAX_BODY_LENGTH = 1024 * 1024;

    ServerSocketChannel serverSocketChannel = null;
    Selector selector = null;
    SelectionKey selectionKey = null;

    /**
     * Opens the selector, binds the non-blocking server channel and registers it for
     * OP_ACCEPT.
     *
     * @throws IOException if the selector or server channel cannot be opened or bound
     */
    public void initServer() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(SERVER_PORT));
        selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Event loop: dispatches accept / read readiness until the selector fails or the
     * server channel is closed, then releases every registered channel.
     */
    @Override
    public void run() {
        if (selector == null || serverSocketChannel == null) {
            throw new IllegalStateException("initServer() must be called before start()");
        }
        try {
            while (serverSocketChannel.isOpen()) {
                try {
                    if (selector.select() == 0) {
                        continue;
                    }
                } catch (IOException e) {
                    // A failing selector cannot recover; stop instead of spinning.
                    e.printStackTrace();
                    break;
                }
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> iter = keySet.iterator();
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    iter.remove();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        accept(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        read(selectionKey);
                    }
                    // Responses are intentionally disabled in this demo: no channel is
                    // ever registered for OP_WRITE, so write(SelectionKey) is not called.
                }
            }
        } finally {
            closeServer();
        }
    }

    /**
     * Accepts a pending connection and registers it for OP_READ together with its own
     * reassembly buffer.
     *
     * @param key the key of the server channel that reported OP_ACCEPT
     */
    public void accept(SelectionKey key) {
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // In non-blocking mode accept() returns null when no connection is pending.
                return;
            }
            System.out.println("is acceptable");
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ,
                    ByteBuffer.allocate(INITIAL_BUFFER_SIZE));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads available bytes into the connection's buffer and prints every complete
     * frame found in it. A trailing incomplete frame stays in the buffer for the next
     * read event.
     *
     * <p>One client write does not necessarily map to one read event, so a frame may
     * arrive split across reads or together with other frames.
     *
     * <p>The client connection (not the server) is closed on end-of-stream, on an I/O
     * error, or when a header declares a negative or oversized body length.
     *
     * @param selectionKey the key of the client channel that reported OP_READ
     */
    public void read(SelectionKey selectionKey) {
        System.out.println("read事件");
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        byte[] headByte = new byte[HEADER_LENGTH];
        try {
            ByteBuffer pending = (ByteBuffer) selectionKey.attachment();
            if (pending == null) {
                // Key was registered without a buffer; create it lazily.
                pending = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
                selectionKey.attach(pending);
            }

            int len = channel.read(pending);
            if (len < 0) {
                // Every earlier read already consumed all complete frames, so whatever
                // is left can never be completed.
                closeClient(selectionKey);
                return;
            }

            pending.flip(); // write mode -> read mode
            // Bytes the next frame needs in total; decides whether the buffer must grow.
            int required = HEADER_LENGTH;
            while (pending.remaining() >= HEADER_LENGTH) {
                pending.mark(); // remember the frame start in case the body is incomplete
                pending.get(headByte);
                int bodyLen = byteArrayToInt(headByte);
                if (bodyLen < 0 || bodyLen > MAX_BODY_LENGTH) {
                    System.err.println("invalid body length " + bodyLen + ", closing connection");
                    closeClient(selectionKey);
                    return;
                }
                if (pending.remaining() < bodyLen) {
                    // Half packet: rewind to the header and wait for more bytes.
                    pending.reset();
                    required = HEADER_LENGTH + bodyLen;
                    break;
                }
                byte[] bodyByte = new byte[bodyLen];
                pending.get(bodyByte);
                System.out.println("receive from clien content is:"
                        + new String(bodyByte, StandardCharsets.UTF_8));
            }

            if (required > pending.capacity()) {
                // The pending frame does not fit; move the leftover into a buffer that
                // can hold the whole frame.
                ByteBuffer larger = ByteBuffer.allocate(required);
                larger.put(pending);
                selectionKey.attach(larger);
            } else {
                // Shift the leftover to the front and return to write mode.
                pending.compact();
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeClient(selectionKey);
        }
    }

    /**
     * Sends a minimal HTTP response to the client and then closes that connection.
     *
     * @param selectionKey the key of the client channel to respond on
     */
    public void write(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        String body = "Hello World!";
        // Content-Length must match the real body size, so it is computed.
        String httpResponse = "HTTP/1.1 200 OK\r\n"
                + "Content-Length: " + body.getBytes(StandardCharsets.UTF_8).length + "\r\n"
                + "Content-Type: text/html\r\n"
                + "\r\n"
                + body;
        System.out.println("response from server to client");
        try {
            ByteBuffer byteBuffer = ByteBuffer.wrap(httpResponse.getBytes(StandardCharsets.UTF_8));
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Cancelling the key alone would leak the socket.
            closeClient(selectionKey);
        }
    }

    /** Cancels a client key and closes its channel, leaving the server running. */
    private static void closeClient(SelectionKey key) {
        key.cancel();
        closeQuietly(key.channel());
    }

    /** Closes every channel registered with the selector, then the selector itself. */
    private void closeServer() {
        if (selector != null && selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                closeQuietly(key.channel());
            }
            closeQuietly(selector);
        }
        closeQuietly(serverSocketChannel);
    }

    /** Closes a resource during teardown; a failure here leaves nothing further to do. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: the resource is being discarded anyway.
        }
    }

    /**
     * Decodes the first 4 bytes of the array as a big-endian int.
     *
     * @param bytes array holding at least 4 bytes, most significant byte first
     * @return the decoded value
     */
    public static int byteArrayToInt(byte[] bytes) {
        int value = 0;
        // From the high-order byte to the low-order byte.
        for (int i = 0; i < 4; i++) {
            int shift = (4 - 1 - i) * 8;
            value |= (bytes[i] & 0x000000FF) << shift;
        }
        return value;
    }

    public static void main(String args[]) throws IOException {
        NioSocketServer server = new NioSocketServer();
        server.initServer();
        server.start();
    }
}
package org.weir.socket.socketPackage;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Client for the length-prefixed protocol: frame = header (4 bytes, big-endian body
 * length) + body.
 *
 * <p>It connects to port 8888, sends ten framed text messages, and then waits for data
 * from the server.
 */
public class NioSocketClient extends Thread {
    private static final int SERVER_PORT = 8888;
    private static final int MESSAGE_COUNT = 10;
    private static final int READ_BUFFER_SIZE = 1024;
    /** Length of the frame header that carries the body length. */
    private static final int HEADER_LENGTH = 4;

    private SocketChannel socketChannel;
    private Selector selector = null;
    private int clientId;

    public static void main(String args[]) throws IOException {
        NioSocketClient client = new NioSocketClient();
        client.initClient();
        client.start();
    }

    public NioSocketClient() {
    }

    public NioSocketClient(int clientId) {
        this.clientId = clientId;
    }

    /**
     * Opens the selector and a non-blocking channel and starts connecting to the server.
     *
     * @throws IOException if the selector or channel cannot be opened or the connect
     *         attempt fails immediately; anything opened so far is closed again
     */
    public void initClient() throws IOException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(SERVER_PORT);
        selector = Selector.open();
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            // A non-blocking connect may complete immediately (typical for local
            // connections); in that case OP_CONNECT would never be reported.
            if (socketChannel.connect(inetSocketAddress)) {
                System.out.println("client finish connect!");
                socketChannel.register(selector, SelectionKey.OP_WRITE);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            closeQuietly(socketChannel);
            closeQuietly(selector);
            throw e;
        }
    }

    /**
     * Event loop: dispatches connect / write / read readiness until the connection is
     * closed, then releases the selector.
     */
    @Override
    public void run() {
        if (selector == null || socketChannel == null) {
            throw new IllegalStateException("initClient() must be called before start()");
        }
        try {
            while (socketChannel.isOpen()) {
                try {
                    if (selector.select() == 0) {
                        continue;
                    }
                } catch (IOException e) {
                    // A failing selector cannot recover; leave the loop instead of spinning.
                    e.printStackTrace();
                    break;
                }
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> iter = keySet.iterator();
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    iter.remove();
                    dispatch(selectionKey);
                }
            }
        } finally {
            closeQuietly(socketChannel);
            closeQuietly(selector);
        }
    }

    /** Routes one selected key to its handler; a handler may close the key, hence the isValid checks. */
    private void dispatch(SelectionKey selectionKey) {
        try {
            if (selectionKey.isValid() && selectionKey.isConnectable()) {
                finishConnect(selectionKey);
            }
            if (selectionKey.isValid() && selectionKey.isWritable()) {
                send(selectionKey);
            }
            if (selectionKey.isValid() && selectionKey.isReadable()) {
                read(selectionKey);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeKey(selectionKey);
        }
    }

    /**
     * Completes the pending connection and switches the key's interest to OP_WRITE.
     * On failure the channel is closed.
     *
     * @param key the key that reported OP_CONNECT
     */
    public void finishConnect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (channel.finishConnect()) {
                System.out.println("client finish connect!");
                // Replace OP_CONNECT; the channel is already registered, so only the
                // interest set has to change.
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeKey(key);
        }
    }

    /**
     * Reads whatever the server sent and prints it. Closes the connection when the
     * server has reached end-of-stream.
     *
     * @param key the key that reported OP_READ
     * @throws IOException if reading from the channel fails
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int len = channel.read(byteBuffer);
        if (len < 0) {
            // End-of-stream: without closing, the key stays readable forever.
            closeKey(key);
            return;
        }
        if (len > 0) {
            byteBuffer.flip();
            byte[] byteArray = new byte[byteBuffer.remaining()];
            byteBuffer.get(byteArray);
            System.out.println("client[" + clientId + "]" + "receive from server:");
            System.out.println(new String(byteArray, StandardCharsets.UTF_8));
        }
    }

    /**
     * Writes ten length-prefixed messages and then switches the key's interest to
     * OP_READ. On a write failure the connection is closed and sending stops.
     *
     * @param key the key that reported OP_WRITE
     */
    public void send(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String ss = i + "Server ,how are you? this is package message from NioSocketClient!";
                // Encode once so the header length always matches the bytes actually sent.
                byte[] body = ss.getBytes(StandardCharsets.UTF_8);
                int head = body.length;
                ByteBuffer byteBuffer = ByteBuffer.allocate(HEADER_LENGTH + head);
                byteBuffer.put(intToBytes(head));
                byteBuffer.put(body);
                byteBuffer.flip();
                System.out.println("[client] send:" + i + "-- " + head + ss);
                // Spins if the socket send buffer is full; acceptable for this small demo.
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }
            }
            key.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            // The try block wraps the whole loop so a failed write cannot retry forever.
            e.printStackTrace();
            closeKey(key);
        }
    }

    /** Cancels the key and closes its channel. */
    private static void closeKey(SelectionKey key) {
        key.cancel();
        closeQuietly(key.channel());
    }

    /** Closes a resource during teardown; a failure here leaves nothing further to do. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: the resource is being discarded anyway.
        }
    }

    /**
     * Converts an int to 4 bytes, most significant byte first (big-endian).
     *
     * @param value the value to encode
     * @return a new 4-byte array holding {@code value}
     */
    public static byte[] intToBytes(int value) {
        byte[] result = new byte[4];
        // From the high-order byte to the low-order byte.
        result[0] = (byte) ((value >> 24) & 0xFF);
        result[1] = (byte) ((value >> 16) & 0xFF);
        result[2] = (byte) ((value >> 8) & 0xFF);
        result[3] = (byte) (value & 0xFF);
        return result;
    }
}
// Article caption (it was fused into the code by text extraction):
// the run result is as follows:
断包、粘包问题解决了,接下来写下关于ByteBuffer这个类的使用。