ps做汽车网站下载,计算机学院网站建设,网站开发寻找潜在客户的途径,上海发布一. NIO 基础
non-blocking io 非阻塞 IO
1. 三大组件
1.1 Channel Buffer
channel 有一点类似于 stream#xff0c;它就是读写数据的双向通道#xff0c;可以从 channel 将数据读入 buffer#xff0c;也可以将 buffer 的数据写入 channel#xff0c;而之前的 st…一. NIO 基础
non-blocking io 非阻塞 IO
1. 三大组件
1.1 Channel Buffer
channel 有一点类似于 stream它就是读写数据的双向通道可以从 channel 将数据读入 buffer也可以将 buffer 的数据写入 channel而之前的 stream 要么是输入要么是输出channel 比 stream 更为底层 #mermaid-svg-sYWm7vItnlpt0k6a {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-sYWm7vItnlpt0k6a .error-icon{fill:#552222;}#mermaid-svg-sYWm7vItnlpt0k6a .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-sYWm7vItnlpt0k6a .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-sYWm7vItnlpt0k6a .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-sYWm7vItnlpt0k6a .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-sYWm7vItnlpt0k6a .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-sYWm7vItnlpt0k6a .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-sYWm7vItnlpt0k6a .marker{fill:#333333;stroke:#333333;}#mermaid-svg-sYWm7vItnlpt0k6a .marker.cross{stroke:#333333;}#mermaid-svg-sYWm7vItnlpt0k6a svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-sYWm7vItnlpt0k6a .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-sYWm7vItnlpt0k6a .cluster-label text{fill:#333;}#mermaid-svg-sYWm7vItnlpt0k6a .cluster-label span{color:#333;}#mermaid-svg-sYWm7vItnlpt0k6a .label text,#mermaid-svg-sYWm7vItnlpt0k6a span{fill:#333;color:#333;}#mermaid-svg-sYWm7vItnlpt0k6a .node rect,#mermaid-svg-sYWm7vItnlpt0k6a .node circle,#mermaid-svg-sYWm7vItnlpt0k6a .node ellipse,#mermaid-svg-sYWm7vItnlpt0k6a .node polygon,#mermaid-svg-sYWm7vItnlpt0k6a .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-sYWm7vItnlpt0k6a .node .label{text-align:center;}#mermaid-svg-sYWm7vItnlpt0k6a .node.clickable{cursor:pointer;}#mermaid-svg-sYWm7vItnlpt0k6a .arrowheadPath{fill:#333333;}#mermaid-svg-sYWm7vItnlpt0k6a .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-sYWm7vItnlpt0k6a .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-sYWm7vItnlpt0k6a .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-sYWm7vItnlpt0k6a .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-sYWm7vItnlpt0k6a .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-sYWm7vItnlpt0k6a .cluster text{fill:#333;}#mermaid-svg-sYWm7vItnlpt0k6a .cluster span{color:#333;}#mermaid-svg-sYWm7vItnlpt0k6a div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-sYWm7vItnlpt0k6a :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} channel buffer 常见的 Channel 有
FileChannelDatagramChannelSocketChannelServerSocketChannel
buffer 则用来缓冲读写数据常见的 buffer 有
ByteBuffer MappedByteBufferDirectByteBufferHeapByteBuffer ShortBufferIntBufferLongBufferFloatBufferDoubleBufferCharBuffer
1.2 Selector
selector 单从字面意思不好理解需要结合服务器的设计演化来理解它的用途
多线程版设计 #mermaid-svg-rSKldgHTr1duJauJ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-rSKldgHTr1duJauJ .error-icon{fill:#552222;}#mermaid-svg-rSKldgHTr1duJauJ .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-rSKldgHTr1duJauJ .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-rSKldgHTr1duJauJ .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-rSKldgHTr1duJauJ .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-rSKldgHTr1duJauJ .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-rSKldgHTr1duJauJ .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-rSKldgHTr1duJauJ .marker{fill:#333333;stroke:#333333;}#mermaid-svg-rSKldgHTr1duJauJ .marker.cross{stroke:#333333;}#mermaid-svg-rSKldgHTr1duJauJ svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-rSKldgHTr1duJauJ .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-rSKldgHTr1duJauJ .cluster-label text{fill:#333;}#mermaid-svg-rSKldgHTr1duJauJ .cluster-label span{color:#333;}#mermaid-svg-rSKldgHTr1duJauJ .label text,#mermaid-svg-rSKldgHTr1duJauJ span{fill:#333;color:#333;}#mermaid-svg-rSKldgHTr1duJauJ .node rect,#mermaid-svg-rSKldgHTr1duJauJ .node circle,#mermaid-svg-rSKldgHTr1duJauJ .node ellipse,#mermaid-svg-rSKldgHTr1duJauJ .node polygon,#mermaid-svg-rSKldgHTr1duJauJ .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-rSKldgHTr1duJauJ .node .label{text-align:center;}#mermaid-svg-rSKldgHTr1duJauJ .node.clickable{cursor:pointer;}#mermaid-svg-rSKldgHTr1duJauJ .arrowheadPath{fill:#333333;}#mermaid-svg-rSKldgHTr1duJauJ .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-rSKldgHTr1duJauJ .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-rSKldgHTr1duJauJ .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-rSKldgHTr1duJauJ .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-rSKldgHTr1duJauJ .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-rSKldgHTr1duJauJ .cluster text{fill:#333;}#mermaid-svg-rSKldgHTr1duJauJ .cluster span{color:#333;}#mermaid-svg-rSKldgHTr1duJauJ div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-rSKldgHTr1duJauJ :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 多线程版 socket1 thread socket2 thread socket3 thread ⚠️ 多线程版缺点
内存占用高线程上下文切换成本高只适合连接数少的场景
线程池版设计 #mermaid-svg-t7UbX9JneXA61PzW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-t7UbX9JneXA61PzW .error-icon{fill:#552222;}#mermaid-svg-t7UbX9JneXA61PzW .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-t7UbX9JneXA61PzW .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-t7UbX9JneXA61PzW .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-t7UbX9JneXA61PzW .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-t7UbX9JneXA61PzW .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-t7UbX9JneXA61PzW .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-t7UbX9JneXA61PzW .marker{fill:#333333;stroke:#333333;}#mermaid-svg-t7UbX9JneXA61PzW .marker.cross{stroke:#333333;}#mermaid-svg-t7UbX9JneXA61PzW svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-t7UbX9JneXA61PzW .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-t7UbX9JneXA61PzW .cluster-label text{fill:#333;}#mermaid-svg-t7UbX9JneXA61PzW .cluster-label span{color:#333;}#mermaid-svg-t7UbX9JneXA61PzW .label text,#mermaid-svg-t7UbX9JneXA61PzW span{fill:#333;color:#333;}#mermaid-svg-t7UbX9JneXA61PzW .node rect,#mermaid-svg-t7UbX9JneXA61PzW .node circle,#mermaid-svg-t7UbX9JneXA61PzW .node ellipse,#mermaid-svg-t7UbX9JneXA61PzW .node polygon,#mermaid-svg-t7UbX9JneXA61PzW .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-t7UbX9JneXA61PzW .node .label{text-align:center;}#mermaid-svg-t7UbX9JneXA61PzW .node.clickable{cursor:pointer;}#mermaid-svg-t7UbX9JneXA61PzW .arrowheadPath{fill:#333333;}#mermaid-svg-t7UbX9JneXA61PzW .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-t7UbX9JneXA61PzW .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-t7UbX9JneXA61PzW .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-t7UbX9JneXA61PzW .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-t7UbX9JneXA61PzW .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-t7UbX9JneXA61PzW .cluster text{fill:#333;}#mermaid-svg-t7UbX9JneXA61PzW .cluster span{color:#333;}#mermaid-svg-t7UbX9JneXA61PzW div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-t7UbX9JneXA61PzW :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 线程池版 socket1 thread socket2 thread socket3 socket4 ⚠️ 线程池版缺点
阻塞模式下线程仅能处理一个 socket 连接仅适合短连接场景
selector 版设计
selector 的作用就是配合一个线程来管理多个 channel获取这些 channel 上发生的事件这些 channel 工作在非阻塞模式下不会让线程吊死在一个 channel 上。适合连接数特别多但流量低的场景low traffic #mermaid-svg-Uymq3dKVQcq1cRGR {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR .error-icon{fill:#552222;}#mermaid-svg-Uymq3dKVQcq1cRGR .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-Uymq3dKVQcq1cRGR .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-Uymq3dKVQcq1cRGR .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-Uymq3dKVQcq1cRGR .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-Uymq3dKVQcq1cRGR .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-Uymq3dKVQcq1cRGR .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-Uymq3dKVQcq1cRGR .marker{fill:#333333;stroke:#333333;}#mermaid-svg-Uymq3dKVQcq1cRGR .marker.cross{stroke:#333333;}#mermaid-svg-Uymq3dKVQcq1cRGR svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-Uymq3dKVQcq1cRGR .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR .cluster-label text{fill:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR .cluster-label span{color:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR .label text,#mermaid-svg-Uymq3dKVQcq1cRGR span{fill:#333;color:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR .node rect,#mermaid-svg-Uymq3dKVQcq1cRGR .node circle,#mermaid-svg-Uymq3dKVQcq1cRGR .node ellipse,#mermaid-svg-Uymq3dKVQcq1cRGR .node polygon,#mermaid-svg-Uymq3dKVQcq1cRGR .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-Uymq3dKVQcq1cRGR .node .label{text-align:center;}#mermaid-svg-Uymq3dKVQcq1cRGR .node.clickable{cursor:pointer;}#mermaid-svg-Uymq3dKVQcq1cRGR .arrowheadPath{fill:#333333;}#mermaid-svg-Uymq3dKVQcq1cRGR .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-Uymq3dKVQcq1cRGR .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-Uymq3dKVQcq1cRGR .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-Uymq3dKVQcq1cRGR .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-Uymq3dKVQcq1cRGR .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-Uymq3dKVQcq1cRGR .cluster text{fill:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR .cluster span{color:#333;}#mermaid-svg-Uymq3dKVQcq1cRGR div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-Uymq3dKVQcq1cRGR :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} selector 版 selector thread channel channel channel 调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件这些事件发生select 方法就会返回这些事件交给 thread 来处理
2. ByteBuffer
有一普通文本文件 data.txt内容为
1234567890abcd使用 FileChannel 来读取文件内容
Slf4j
public class ChannelDemo1 {public static void main(String[] args) {try (RandomAccessFile file new RandomAccessFile(helloword/data.txt, rw)) {FileChannel channel file.getChannel();ByteBuffer buffer ByteBuffer.allocate(10);do {// 向 buffer 写入int len channel.read(buffer);log.debug(读到字节数{}, len);if (len -1) {break;}// 切换 buffer 读模式buffer.flip();while(buffer.hasRemaining()) {log.debug({}, (char)buffer.get());}// 切换 buffer 写模式buffer.clear();} while (true);} catch (IOException e) {e.printStackTrace();}}
}输出
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数-12.1 ByteBuffer 正确使用姿势
向 buffer 写入数据例如调用 channel.read(buffer)调用 flip() 切换至读模式从 buffer 读取数据例如调用 buffer.get()调用 clear() 或 compact() 切换至写模式重复 1~4 步骤
2.2 ByteBuffer 结构
ByteBuffer 有以下重要属性
capacitypositionlimit
一开始 写模式下position 是写入位置limit 等于容量下图表示写入了 4 个字节后的状态 flip 动作发生后position 切换为读取位置limit 切换为读取限制 读取 4 个字节后状态 clear 动作发生后状态 compact 方法是把未读完的部分向前压缩然后切换至写模式 调试工具类
public class ByteBufferUtil {private static final char[] BYTE2CHAR new char[256];private static final char[] HEXDUMP_TABLE new char[256 * 4];private static final String[] HEXPADDING new String[16];private static final String[] HEXDUMP_ROWPREFIXES new String[65536 4];private static final String[] BYTE2HEX new String[256];private static final String[] BYTEPADDING new String[16];static {final char[] DIGITS 0123456789abcdef.toCharArray();for (int i 0; i 256; i) {HEXDUMP_TABLE[i 1] DIGITS[i 4 0x0F];HEXDUMP_TABLE[(i 1) 1] DIGITS[i 0x0F];}int i;// Generate the lookup table for hex dump paddingsfor (i 0; i HEXPADDING.length; i) {int padding HEXPADDING.length - i;StringBuilder buf new StringBuilder(padding * 3);for (int j 0; j padding; j) {buf.append( );}HEXPADDING[i] buf.toString();}// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i 0; i HEXDUMP_ROWPREFIXES.length; i) {StringBuilder buf new StringBuilder(12);buf.append(NEWLINE);buf.append(Long.toHexString(i 4 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, |);buf.append(|);HEXDUMP_ROWPREFIXES[i] buf.toString();}// Generate the lookup table for byte-to-hex-dump conversionfor (i 0; i BYTE2HEX.length; i) {BYTE2HEX[i] StringUtil.byteToHexStringPadded(i);}// Generate the lookup table for byte dump paddingsfor (i 0; i BYTEPADDING.length; i) {int padding BYTEPADDING.length - i;StringBuilder buf new StringBuilder(padding);for (int j 0; j padding; j) {buf.append( );}BYTEPADDING[i] buf.toString();}// Generate the lookup table for byte-to-char conversionfor (i 0; i BYTE2CHAR.length; i) {if (i 0x1f || i 0x7f) {BYTE2CHAR[i] .;} else {BYTE2CHAR[i] (char) i;}}}/*** 打印所有内容* param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println(---------------------------- all ----------------------------------------);System.out.printf(position: [%d], limit: [%d]\n, buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}/*** 打印可读取内容* param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println(---------------------------- read ---------------------------------------);System.out.printf(position: [%d], limit: [%d]\n, buffer.position(), buffer.limit());System.out.println(builder);}private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException(expected: 0 offset( offset ) offset length( length ) buf.capacity( buf.capacity() ));}if (length 0) {return;}dump.append( ------------------------------------------------- NEWLINE | 0 1 2 3 4 5 6 7 8 9 a b c d e f | NEWLINE -------------------------------------------------------------------------);final int startIndex offset;final int fullRows length 4;final int remainder length 0xF;// Dump the rows which have 16 bytes.for (int row 0; row fullRows; row) {int rowStartIndex (row 4) startIndex;// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);// Hex dumpint rowEndIndex rowStartIndex 16;for (int j rowStartIndex; j rowEndIndex; j) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append( |);// ASCII dumpfor (int j rowStartIndex; j rowEndIndex; j) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(|);}// Dump the last row which has less than 16 bytes.if (remainder ! 0) {int rowStartIndex (fullRows 4) startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);// Hex dumpint rowEndIndex rowStartIndex remainder;for (int j rowStartIndex; j rowEndIndex; j) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append( |);// Ascii dumpfor (int j rowStartIndex; j rowEndIndex; j) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append(|);}dump.append(NEWLINE -------------------------------------------------------------------------);}private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(NEWLINE);dump.append(Long.toHexString(rowStartIndex 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, |);dump.append(|);}}public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) 0xFF);}
}2.3 ByteBuffer 常见方法
分配空间
可以使用 allocate 方法为 ByteBuffer 分配空间其它 buffer 类也有该方法
Bytebuffer buf ByteBuffer.allocate(16);向 buffer 写入数据
有两种办法
调用 channel 的 read 方法调用 buffer 自己的 put 方法
int readBytes channel.read(buf);和
buf.put((byte)127);从 buffer 读取数据
同样有两种办法
调用 channel 的 write 方法调用 buffer 自己的 get 方法
int writeBytes channel.write(buf);和
byte b buf.get();get 方法会让 position 读指针向后走如果想重复读取数据
可以调用 rewind 方法将 position 重新置为 0或者调用 get(int i) 方法获取索引 i 的内容它不会移动读指针
mark 和 reset
mark 是在读取时做一个标记即使 position 改变只要调用 reset 就能回到 mark 的位置 注意 rewind 和 flip 都会清除 mark 位置 字符串与 ByteBuffer 互转
ByteBuffer buffer1 StandardCharsets.UTF_8.encode(你好);
ByteBuffer buffer2 Charset.forName(utf-8).encode(你好);debug(buffer1);
debug(buffer2);CharBuffer buffer3 StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());输出 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| e4 bd a0 e5 a5 bd |...... |
--------------------------------------------------------------------------------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| e4 bd a0 e5 a5 bd |...... |
-------------------------------------------------------------------------
class java.nio.HeapCharBuffer
你好⚠️ Buffer 的线程安全 Buffer 是非线程安全的 2.4 Scattering Reads
分散读取有一个文本文件 3parts.txt
onetwothree使用如下方式读取可以将数据填充至多个 buffer
try (RandomAccessFile file new RandomAccessFile(helloword/3parts.txt, rw)) {FileChannel channel file.getChannel();ByteBuffer a ByteBuffer.allocate(3);ByteBuffer b ByteBuffer.allocate(3);ByteBuffer c ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{a, b, c});a.flip();b.flip();c.flip();debug(a);debug(b);debug(c);
} catch (IOException e) {e.printStackTrace();
}结果 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 6f 6e 65 |one |
--------------------------------------------------------------------------------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 74 77 6f |two |
--------------------------------------------------------------------------------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 74 68 72 65 65 |three |
-------------------------------------------------------------------------2.5 Gathering Writes
使用如下方式写入可以将多个 buffer 的数据填充至 channel
try (RandomAccessFile file new RandomAccessFile(helloword/3parts.txt, rw)) {FileChannel channel file.getChannel();ByteBuffer d ByteBuffer.allocate(4);ByteBuffer e ByteBuffer.allocate(4);channel.position(11);d.put(new byte[]{f, o, u, r});e.put(new byte[]{f, i, v, e});d.flip();e.flip();debug(d);debug(e);channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {e.printStackTrace();
}输出 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 66 6f 75 72 |four |
--------------------------------------------------------------------------------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 66 69 76 65 |five |
-------------------------------------------------------------------------文件内容
onetwothreefourfive2.6 练习
网络上有多条数据发送给服务端数据之间使用 \n 进行分隔 但由于某种原因这些数据在接收时被进行了重新组合例如原始数据有3条为
Hello,world\nI’m zhangsan\nHow are you?\n
变成了下面的两个 byteBuffer (黏包半包)
Hello,world\nI’m zhangsan\nHow are you?\n
现在要求你编写程序将错乱的数据恢复成原始的按 \n 分隔的数据
public static void main(String[] args) {ByteBuffer source ByteBuffer.allocate(32);// 11 24source.put(Hello,world\nIm zhangsan\nHo.getBytes());split(source);source.put(w are you?\nhaha!\n.getBytes());split(source);
}private static void split(ByteBuffer source) {source.flip();int oldLimit source.limit();for (int i 0; i oldLimit; i) {if (source.get(i) \n) {System.out.println(i);ByteBuffer target ByteBuffer.allocate(i 1 - source.position());// 0 ~ limitsource.limit(i 1);target.put(source); // 从source 读向 target 写debugAll(target);source.limit(oldLimit);}}source.compact();
}3. 文件编程
3.1 FileChannel
⚠️ FileChannel 工作模式 FileChannel 只能工作在阻塞模式下 获取
不能直接打开 FileChannel必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel它们都有 getChannel 方法
通过 FileInputStream 获取的 channel 只能读通过 FileOutputStream 获取的 channel 只能写通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
读取
会从 channel 读取数据填充 ByteBuffer返回值表示读到了多少字节-1 表示到达了文件的末尾
int readBytes channel.read(buffer);写入
写入的正确姿势如下 SocketChannel
ByteBuffer buffer ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式while(buffer.hasRemaining()) {channel.write(buffer);
}在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel
关闭
channel 必须关闭不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法
位置
获取当前位置
long pos channel.position();设置当前位置
long newPos ...;
channel.position(newPos);设置当前位置时如果设置为文件的末尾
这时读取会返回 -1这时写入会追加内容但要注意如果 position 超过了文件末尾再写入时在新内容和原末尾之间会有空洞00
大小
使用 size 方法获取文件的大小
强制写入
操作系统出于性能的考虑会将数据缓存不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据文件的权限等信息立刻写入磁盘
3.2 两个 Channel 传输数据
String FROM helloword/data.txt;
String TO helloword/to.txt;
long start System.nanoTime();
try (FileChannel from new FileInputStream(FROM).getChannel();FileChannel to new FileOutputStream(TO).getChannel();) {from.transferTo(0, from.size(), to);
} catch (IOException e) {e.printStackTrace();
}
long end System.nanoTime();
System.out.println(transferTo 用时 (end - start) / 1000_000.0);输出
transferTo 用时8.2011超过 2g 大小的文件传输
public class TestFileChannelTransferTo {public static void main(String[] args) {try (FileChannel from new FileInputStream(data.txt).getChannel();FileChannel to new FileOutputStream(to.txt).getChannel();) {// 效率高底层会利用操作系统的零拷贝进行优化long size from.size();// left 变量代表还剩余多少字节for (long left size; left 0; ) {System.out.println(position: (size - left) left: left);left - from.transferTo((size - left), left, to);}} catch (IOException e) {e.printStackTrace();}}
}实际传输一个超大文件
position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:13274972193.3 Path
jdk7 引入了 Path 和 Paths 类
Path 用来表示文件路径Paths 是工具类用来获取 Path 实例
Path source Paths.get(1.txt); // 相对路径 使用 user.dir 环境变量来定位 1.txtPath source Paths.get(d:\\1.txt); // 绝对路径 代表了 d:\1.txtPath source Paths.get(d:/1.txt); // 绝对路径 同样代表了 d:\1.txtPath projects Paths.get(d:\\data, projects); // 代表了 d:\data\projects. 代表了当前路径.. 代表了上一级路径
例如目录结构如下
d:|- data|- projects|- a|- b代码
Path path Paths.get(d:\\data\\projects\\a\\..\\b);
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径会输出
d:\data\projects\a\..\b
d:\data\projects\b3.4 Files
检查文件是否存在
Path path Paths.get(helloword/data.txt);
System.out.println(Files.exists(path));创建一级目录
Path path Paths.get(helloword/d1);
Files.createDirectory(path);如果目录已存在会抛异常 FileAlreadyExistsException不能一次创建多级目录否则会抛异常 NoSuchFileException
创建多级目录用
Path path Paths.get(helloword/d1/d2);
Files.createDirectories(path);拷贝文件
Path source Paths.get(helloword/data.txt);
Path target Paths.get(helloword/target.txt);Files.copy(source, target);如果文件已存在会抛异常 FileAlreadyExistsException
如果希望用 source 覆盖掉 target需要用 StandardCopyOption 来控制
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);移动文件
Path source Paths.get(helloword/data.txt);
Path target Paths.get(helloword/data.txt);Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性
删除文件
Path target Paths.get(helloword/target.txt);Files.delete(target);如果文件不存在会抛异常 NoSuchFileException
删除目录
Path target Paths.get(helloword/d1);Files.delete(target);如果目录还有内容会抛异常 DirectoryNotEmptyException
遍历目录文件
public static void main(String[] args) throws IOException {Path path Paths.get(C:\\Program Files\\Java\\jdk1.8.0_91);AtomicInteger dirCount new AtomicInteger();AtomicInteger fileCount new AtomicInteger();Files.walkFileTree(path, new SimpleFileVisitorPath(){Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println(dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println(dirCount); // 133System.out.println(fileCount); // 1479
}统计 jar 的数目
Path path Paths.get(C:\\Program Files\\Java\\jdk1.8.0_91);
AtomicInteger fileCount new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitorPath(){Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {if (file.toFile().getName().endsWith(.jar)) {fileCount.incrementAndGet();}return super.visitFile(file, attrs);}
});
System.out.println(fileCount); // 724删除多级目录
Path path Paths.get(d:\\a);
Files.walkFileTree(path, new SimpleFileVisitorPath(){Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return super.visitFile(file, attrs);}Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.delete(dir);return super.postVisitDirectory(dir, exc);}
});⚠️ 删除很危险 删除是危险操作确保要递归删除的文件夹没有重要内容 拷贝多级目录
long start System.currentTimeMillis();
String source D:\\Snipaste-1.16.2-x64;
String target D:\\Snipaste-1.16.2-x64aaa;Files.walk(Paths.get(source)).forEach(path - {try {String targetName path.toString().replace(source, target);// 是目录if (Files.isDirectory(path)) {Files.createDirectory(Paths.get(targetName));}// 是普通文件else if (Files.isRegularFile(path)) {Files.copy(path, Paths.get(targetName));}} catch (IOException e) {e.printStackTrace();}
});
long end System.currentTimeMillis();
System.out.println(end - start);4. 网络编程
4.1 非阻塞 vs 阻塞
阻塞
阻塞模式下相关方法都会导致线程暂停 ServerSocketChannel.accept 会在没有连接建立时让线程暂停SocketChannel.read 会在没有数据可读时让线程暂停阻塞的表现其实就是线程暂停了暂停期间不会占用 cpu但线程相当于闲置 单线程下阻塞方法之间相互影响几乎不能正常工作需要多线程支持但多线程下有新的问题体现在以下方面 32 位 jvm 一个线程 320k64 位 jvm 一个线程 1024k如果连接数过多必然导致 OOM并且线程太多反而会因为频繁上下文切换导致性能降低可以采用线程池技术来减少线程数和线程上下文切换但治标不治本如果有很多连接建立但长时间 inactive会阻塞线程池中所有线程因此不适合长连接只适合短连接
服务器端
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc ServerSocketChannel.open();// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));// 3. 连接集合
ListSocketChannel channels new ArrayList();
while (true) {// 4. accept 建立与客户端连接 SocketChannel 用来与客户端之间通信log.debug(connecting...);SocketChannel sc ssc.accept(); // 阻塞方法线程停止运行log.debug(connected... {}, sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据log.debug(before read... {}, channel);channel.read(buffer); // 阻塞方法线程停止运行buffer.flip();debugRead(buffer);buffer.clear();log.debug(after read...{}, channel);}
}客户端
SocketChannel sc SocketChannel.open();
sc.connect(new InetSocketAddress(localhost, 8080));
System.out.println(waiting...);非阻塞
非阻塞模式下相关方法都会不会让线程暂停 在 ServerSocketChannel.accept 在没有连接建立时会返回 null继续运行SocketChannel.read 在没有数据可读时会返回 0但线程不必阻塞可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept写数据时线程只是等待数据写入 Channel 即可无需等 Channel 通过网络把数据发送出去 但非阻塞模式下即使没有连接建立和可读数据线程仍然在不断运行白白浪费了 cpu数据复制过程中线程实际还是阻塞的AIO 改进的地方
服务器端客户端代码不变
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
ListSocketChannel channels new ArrayList();
while (true) {// 4. accept 建立与客户端连接 SocketChannel 用来与客户端之间通信SocketChannel sc ssc.accept(); // 非阻塞线程还会继续运行如果没有连接建立但sc是nullif (sc ! null) {log.debug(connected... {}, sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据int read channel.read(buffer);// 非阻塞线程仍然会继续运行如果没有读到数据read 返回 0if (read 0) {buffer.flip();debugRead(buffer);buffer.clear();log.debug(after read...{}, channel);}}
}多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控这称之为多路复用
多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用如果不用 Selector 的非阻塞模式线程大部分时间都在做无用功而 Selector 能够保证 有可连接事件时才去连接有可读事件才去读取有可写事件才去写入 限于网络传输能力Channel 未必时时可写一旦 Channel 可写会触发 Selector 的可写事件
4.2 Selector #mermaid-svg-UNfh11PbCSTHB8Ok {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok .error-icon{fill:#552222;}#mermaid-svg-UNfh11PbCSTHB8Ok .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-UNfh11PbCSTHB8Ok .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-UNfh11PbCSTHB8Ok .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-UNfh11PbCSTHB8Ok .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-UNfh11PbCSTHB8Ok .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-UNfh11PbCSTHB8Ok .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-UNfh11PbCSTHB8Ok .marker{fill:#333333;stroke:#333333;}#mermaid-svg-UNfh11PbCSTHB8Ok .marker.cross{stroke:#333333;}#mermaid-svg-UNfh11PbCSTHB8Ok svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-UNfh11PbCSTHB8Ok .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok .cluster-label text{fill:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok .cluster-label span{color:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok .label text,#mermaid-svg-UNfh11PbCSTHB8Ok span{fill:#333;color:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok .node rect,#mermaid-svg-UNfh11PbCSTHB8Ok .node circle,#mermaid-svg-UNfh11PbCSTHB8Ok .node ellipse,#mermaid-svg-UNfh11PbCSTHB8Ok .node polygon,#mermaid-svg-UNfh11PbCSTHB8Ok .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-UNfh11PbCSTHB8Ok .node .label{text-align:center;}#mermaid-svg-UNfh11PbCSTHB8Ok .node.clickable{cursor:pointer;}#mermaid-svg-UNfh11PbCSTHB8Ok .arrowheadPath{fill:#333333;}#mermaid-svg-UNfh11PbCSTHB8Ok .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-UNfh11PbCSTHB8Ok .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-UNfh11PbCSTHB8Ok .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-UNfh11PbCSTHB8Ok .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-UNfh11PbCSTHB8Ok .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-UNfh11PbCSTHB8Ok .cluster text{fill:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok .cluster span{color:#333;}#mermaid-svg-UNfh11PbCSTHB8Ok div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-UNfh11PbCSTHB8Ok :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} selector 版 selector thread channel channel channel 好处
一个线程配合 selector 就可以监控多个 channel 的事件事件发生线程才去处理。避免非阻塞模式下所做无用功让这个线程能够被充分利用节约了线程的数量减少了线程上下文切换
创建
Selector selector Selector.open();绑定 Channel 事件
也称之为注册事件绑定的事件 selector 才会关心
channel.configureBlocking(false);
SelectionKey key channel.register(selector, 绑定事件);channel 必须工作在非阻塞模式FileChannel 没有非阻塞模式因此不能配合 selector 一起使用绑定的事件类型可以有 connect - 客户端连接成功时触发accept - 服务器端成功接受连接时触发read - 数据可读入时触发有因为接收能力弱数据暂不能读入的情况write - 数据可写出时触发有因为发送能力弱数据暂不能写出的情况
监听 Channel 事件
可以通过下面三种方法来监听是否有事件发生方法的返回值代表有多少 channel 发生了事件
方法1阻塞直到绑定事件发生
int count selector.select();方法2阻塞直到绑定事件发生或是超时时间单位为 ms
int count selector.select(long timeout);方法3不会阻塞也就是不管有没有事件立刻返回自己根据返回值检查是否有事件
int count selector.selectNow();select 何时不阻塞 事件发生时 客户端发起连接请求会触发 accept 事件客户端发送数据过来客户端正常、异常关闭时都会触发 read 事件另外如果发送的数据大于 buffer 缓冲区会触发多次读取事件channel 可写会触发 write 事件在 linux 下 nio bug 发生时 调用 selector.wakeup()调用 selector.close()selector 所在线程 interrupt 4.3 处理 accept 事件
客户端代码为
public class Client {public static void main(String[] args) {try (Socket socket new Socket(localhost, 8080)) {System.out.println(socket);socket.getOutputStream().write(world.getBytes());System.in.read();} catch (IOException e) {e.printStackTrace();}}
}服务器端代码为
Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count selector.select();
// int count selector.selectNow();log.debug(select count: {}, count);
// if(count 0) {
// continue;
// }// 获取所有事件SetSelectionKey keys selector.selectedKeys();// 遍历所有事件逐一处理IteratorSelectionKey iter keys.iterator();while (iter.hasNext()) {SelectionKey key iter.next();// 判断事件类型if (key.isAcceptable()) {ServerSocketChannel c (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc c.accept();log.debug({}, sc);}// 处理完毕必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}事件发生后能否不处理 事件发生后要么处理要么取消cancel不能什么都不做否则下次该事件仍会触发这是因为 nio 底层使用的是水平触发 4.4 处理 read 事件
Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count selector.select();
// int count selector.selectNow();log.debug(select count: {}, count);
// if(count 0) {
// continue;
// }// 获取所有事件SetSelectionKey keys selector.selectedKeys();// 遍历所有事件逐一处理IteratorSelectionKey iter keys.iterator();while (iter.hasNext()) {SelectionKey key iter.next();// 判断事件类型if (key.isAcceptable()) {ServerSocketChannel c (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);log.debug(连接已建立: {}, sc);} else if (key.isReadable()) {SocketChannel sc (SocketChannel) key.channel();ByteBuffer buffer ByteBuffer.allocate(128);int read sc.read(buffer);if(read -1) {key.cancel();sc.close();} else {buffer.flip();debug(buffer);}}// 处理完毕必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}开启两个客户端修改一下发送文字输出
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local/127.0.0.1:8080 remote/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 68 65 6c 6c 6f |hello |
-------------------------------------------------------------------------
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local/127.0.0.1:8080 remote/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 77 6f 72 6c 64 |world |
-------------------------------------------------------------------------为何要 iter.remove() 因为 select 在事件发生后就会将相关的 key 放入 selectedKeys 集合但不会在处理完后从 selectedKeys 集合中移除需要我们自己编码删除。例如 第一次触发了 ssckey 上的 accept 事件没有移除 ssckey第二次触发了 sckey 上的 read 事件但这时 selectedKeys 中还有上次的 ssckey 在处理时因为没有真正的 serverSocket 连上了就会导致空指针异常 cancel 的作用 cancel 会取消注册在 selector 上的 channel并从 keys 集合中删除 key 后续不会再监听事件 ⚠️ 不处理边界的问题
以前有同学写过这样的代码思考注释中两个问题以 bio 为例其实 nio 道理是一样的
public class Server {public static void main(String[] args) throws IOException {ServerSocket ssnew ServerSocket(9000);while (true) {Socket s ss.accept();InputStream in s.getInputStream();// 这里这么写有没有问题byte[] arr new byte[4];while(true) {int read in.read(arr);// 这里这么写有没有问题if(read -1) {break;}System.out.println(new String(arr, 0, read));}}}
}客户端
public class Client {public static void main(String[] args) throws IOException {Socket max new Socket(localhost, 9000);OutputStream out max.getOutputStream();out.write(hello.getBytes());out.write(world.getBytes());out.write(你好.getBytes());max.close();}
}输出
hell
owor
ld
好
为什么
处理消息的边界 一种思路是固定消息长度数据包大小一样服务器按预定长度读取缺点是浪费带宽另一种思路是按分隔符拆分缺点是效率低TLV 格式即 Type 类型、Length 长度、Value 数据类型和长度已知的情况下就可以方便获取消息大小分配合适的 buffer缺点是 buffer 需要提前分配如果内容过大则影响 server 吞吐量 Http 1.1 是 TLV 格式Http 2.0 是 LTV 格式 #mermaid-svg-4JDlXmHnDU3PVhlZ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ .error-icon{fill:#552222;}#mermaid-svg-4JDlXmHnDU3PVhlZ .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-4JDlXmHnDU3PVhlZ .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-4JDlXmHnDU3PVhlZ .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-4JDlXmHnDU3PVhlZ .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-4JDlXmHnDU3PVhlZ .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-4JDlXmHnDU3PVhlZ .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-4JDlXmHnDU3PVhlZ .marker{fill:#333333;stroke:#333333;}#mermaid-svg-4JDlXmHnDU3PVhlZ .marker.cross{stroke:#333333;}#mermaid-svg-4JDlXmHnDU3PVhlZ svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-4JDlXmHnDU3PVhlZ .actor{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-4JDlXmHnDU3PVhlZ text.actortspan{fill:black;stroke:none;}#mermaid-svg-4JDlXmHnDU3PVhlZ .actor-line{stroke:grey;}#mermaid-svg-4JDlXmHnDU3PVhlZ .messageLine0{stroke-width:1.5;stroke-dasharray:none;stroke:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ .messageLine1{stroke-width:1.5;stroke-dasharray:2,2;stroke:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ #arrowhead path{fill:#333;stroke:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ .sequenceNumber{fill:white;}#mermaid-svg-4JDlXmHnDU3PVhlZ #sequencenumber{fill:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ #crosshead path{fill:#333;stroke:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ .messageText{fill:#333;stroke:#333;}#mermaid-svg-4JDlXmHnDU3PVhlZ .labelBox{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-4JDlXmHnDU3PVhlZ .labelText,#mermaid-svg-4JDlXmHnDU3PVhlZ .labelTexttspan{fill:black;stroke:none;}#mermaid-svg-4JDlXmHnDU3PVhlZ .loopText,#mermaid-svg-4JDlXmHnDU3PVhlZ .loopTexttspan{fill:black;stroke:none;}#mermaid-svg-4JDlXmHnDU3PVhlZ .loopLine{stroke-width:2px;stroke-dasharray:2,2;stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);}#mermaid-svg-4JDlXmHnDU3PVhlZ .note{stroke:#aaaa33;fill:#fff5ad;}#mermaid-svg-4JDlXmHnDU3PVhlZ .noteText,#mermaid-svg-4JDlXmHnDU3PVhlZ .noteTexttspan{fill:black;stroke:none;}#mermaid-svg-4JDlXmHnDU3PVhlZ .activation0{fill:#f4f4f4;stroke:#666;}#mermaid-svg-4JDlXmHnDU3PVhlZ .activation1{fill:#f4f4f4;stroke:#666;}#mermaid-svg-4JDlXmHnDU3PVhlZ .activation2{fill:#f4f4f4;stroke:#666;}#mermaid-svg-4JDlXmHnDU3PVhlZ .actorPopupMenu{position:absolute;}#mermaid-svg-4JDlXmHnDU3PVhlZ .actorPopupMenuPanel{position:absolute;fill:#ECECFF;box-shadow:0px 8px 16px 0px rgba(0,0,0,0.2);filter:drop-shadow(3px 5px 2px rgb(0 0 0 / 0.4));}#mermaid-svg-4JDlXmHnDU3PVhlZ .actor-man line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-4JDlXmHnDU3PVhlZ .actor-man circle,#mermaid-svg-4JDlXmHnDU3PVhlZ line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;stroke-width:2px;}#mermaid-svg-4JDlXmHnDU3PVhlZ :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 客户端1 服务器 ByteBuffer1 ByteBuffer2 发送 01234567890abcdef3333\r 第一次 read 存入 01234567890abcdef 扩容 拷贝 01234567890abcdef 第二次 read 存入 3333\r 01234567890abcdef3333\r 客户端1 服务器 ByteBuffer1 ByteBuffer2 服务器端
private static void split(ByteBuffer source) {source.flip();for (int i 0; i source.limit(); i) {// 找到一条完整消息if (source.get(i) \n) {int length i 1 - source.position();// 把这条完整消息存入新的 ByteBufferByteBuffer target ByteBuffer.allocate(length);// 从 source 读向 target 写for (int j 0; j length; j) {target.put(source.get());}debugAll(target);}}source.compact(); // 0123456789abcdef position 16 limit 16
}public static void main(String[] args) throws IOException {// 1. 创建 selector, 管理多个 channelSelector selector Selector.open();ServerSocketChannel ssc ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立 selector 和 channel 的联系注册// SelectionKey 就是将来事件发生后通过它可以知道事件和哪个channel的事件SelectionKey sscKey ssc.register(selector, 0, null);// key 只关注 accept 事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug(sscKey:{}, sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select 方法, 没有事件发生线程阻塞有事件线程才会恢复运行// select 在事件未处理时它不会阻塞, 事件发生后要么处理要么取消不能置之不理selector.select();// 4. 处理事件, selectedKeys 内部包含了所有发生的事件IteratorSelectionKey iter selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key iter.next();// 处理key 时要从 selectedKeys 集合中删除否则下次处理就会有问题iter.remove();log.debug(key: {}, key);// 5. 区分事件类型if (key.isAcceptable()) { // 如果是 acceptServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel sc channel.accept();sc.configureBlocking(false);ByteBuffer buffer ByteBuffer.allocate(16); // attachment// 将一个 byteBuffer 作为附件关联到 selectionKey 上SelectionKey scKey sc.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);log.debug({}, sc);log.debug(scKey:{}, scKey);} else if (key.isReadable()) { // 如果是 readtry {SocketChannel channel (SocketChannel) key.channel(); // 拿到触发事件的channel// 获取 selectionKey 上关联的附件ByteBuffer buffer (ByteBuffer) key.attachment();int read channel.read(buffer); // 如果是正常断开read 的方法的返回值是 -1if(read -1) {key.cancel();} else {split(buffer);// 需要扩容if (buffer.position() buffer.limit()) {ByteBuffer newBuffer ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer); // 0123456789abcdef3333\nkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel(); // 因为客户端断开了,因此需要将 key 取消从 selector 的 keys 集合中真正删除 key}}}}
}客户端
SocketChannel sc SocketChannel.open();
sc.connect(new InetSocketAddress(localhost, 8080));
SocketAddress address sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode(hello\nworld\n));
sc.write(Charset.defaultCharset().encode(0123\n456789abcdef));
sc.write(Charset.defaultCharset().encode(0123456789abcdef3333\n));
System.in.read();ByteBuffer 大小分配
每个 channel 都需要记录可能被切分的消息因为 ByteBuffer 不能被多个 channel 共同使用因此需要为每个 channel 维护一个独立的 ByteBufferByteBuffer 不能太大比如一个 ByteBuffer 1Mb 的话要支持百万连接就要 1Tb 内存因此需要设计大小可变的 ByteBuffer 一种思路是首先分配一个较小的 buffer例如 4k如果发现数据不够再分配 8k 的 buffer将 4k buffer 内容拷贝至 8k buffer优点是消息连续容易处理缺点是数据拷贝耗费性能参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html另一种思路是用多个数组组成 buffer一个数组不够把多出来的内容写入新的数组与前面的区别是消息存储不连续解析复杂优点是避免了拷贝引起的性能损耗
4.5 处理 write 事件
一次无法写完例子
非阻塞模式下无法保证把 buffer 中所有数据都写入 channel因此需要追踪 write 方法的返回值代表实际写入字节数用 selector 监听所有 channel 的可写事件每个 channel 都需要一个 key 来跟踪 buffer但这样又会导致占用内存过多就有两阶段策略 当消息处理器第一次写入消息时才将 channel 注册到 selector 上selector 检查 channel 上的可写事件如果所有的数据写完了就取消 channel 的注册如果不取消会每次可写均会触发 write 事件
public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8080));Selector selector Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select();IteratorSelectionKey iter selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc ssc.accept();sc.configureBlocking(false);SelectionKey sckey sc.register(selector, SelectionKey.OP_READ);// 1. 向客户端发送内容StringBuilder sb new StringBuilder();for (int i 0; i 3000000; i) {sb.append(a);}ByteBuffer buffer Charset.defaultCharset().encode(sb.toString());int write sc.write(buffer);// 3. write 表示实际写了多少字节System.out.println(实际写入字节: write);// 4. 如果有剩余未读字节才需要关注写事件if (buffer.hasRemaining()) {// read 1 write 4// 在原有关注事件的基础上多关注 写事件sckey.interestOps(sckey.interestOps() SelectionKey.OP_WRITE);// 把 buffer 作为附件加入 sckeysckey.attach(buffer);}} else if (key.isWritable()) {ByteBuffer buffer (ByteBuffer) key.attachment();SocketChannel sc (SocketChannel) key.channel();int write sc.write(buffer);System.out.println(实际写入字节: write);if (!buffer.hasRemaining()) { // 写完了key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}}}
}客户端
public class WriteClient {public static void main(String[] args) throws IOException {Selector selector Selector.open();SocketChannel sc SocketChannel.open();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);sc.connect(new InetSocketAddress(localhost, 8080));int count 0;while (true) {selector.select();IteratorSelectionKey iter selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key iter.next();iter.remove();if (key.isConnectable()) {System.out.println(sc.finishConnect());} else if (key.isReadable()) {ByteBuffer buffer ByteBuffer.allocate(1024 * 1024);count sc.read(buffer);buffer.clear();System.out.println(count);}}}}
}write 为何要取消
只要向 channel 发送数据时socket 缓冲可写这个事件会频繁触发因此应当只在 socket 缓冲区写不下时再关注可写事件数据写完之后再取消关注
4.6 更进一步 利用多线程优化 现在都是多核 cpu设计时要充分考虑别让 cpu 的力量被白白浪费 前面的代码只有一个选择器没有充分利用多核 cpu如何改进呢
分两组选择器
单线程配一个选择器专门处理 accept 事件创建 cpu 核心数的线程每个线程配一个选择器轮流处理 read 事件
public class ChannelDemo7 {public static void main(String[] args) throws IOException {new BossEventLoop().register();}Slf4jstatic class BossEventLoop implements Runnable {private Selector boss;private WorkerEventLoop[] workers;private volatile boolean start false;AtomicInteger index new AtomicInteger();public void register() throws IOException {if (!start) {ServerSocketChannel ssc ServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.configureBlocking(false);boss Selector.open();SelectionKey ssckey ssc.register(boss, 0, null);ssckey.interestOps(SelectionKey.OP_ACCEPT);workers initEventLoops();new Thread(this, boss).start();log.debug(boss start...);start true;}}public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops new EventLoop[Runtime.getRuntime().availableProcessors()];WorkerEventLoop[] workerEventLoops new WorkerEventLoop[2];for (int i 0; i workerEventLoops.length; i) {workerEventLoops[i] new WorkerEventLoop(i);}return workerEventLoops;}Overridepublic void run() {while (true) {try {boss.select();IteratorSelectionKey iter boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key iter.next();iter.remove();if (key.isAcceptable()) {ServerSocketChannel c (ServerSocketChannel) key.channel();SocketChannel sc c.accept();sc.configureBlocking(false);log.debug({} connected, sc.getRemoteAddress());workers[index.getAndIncrement() % workers.length].register(sc);}}} catch (IOException e) {e.printStackTrace();}}}}Slf4jstatic class WorkerEventLoop implements Runnable {private Selector worker;private volatile boolean start false;private int index;private final ConcurrentLinkedQueueRunnable tasks new ConcurrentLinkedQueue();public WorkerEventLoop(int index) {this.index index;}public void register(SocketChannel sc) throws IOException {if (!start) {worker Selector.open();new Thread(this, worker- index).start();start true;}tasks.add(() - {try {SelectionKey sckey sc.register(worker, 0, null);sckey.interestOps(SelectionKey.OP_READ);worker.selectNow();} catch (IOException e) {e.printStackTrace();}});worker.wakeup();}Overridepublic void run() {while (true) {try {worker.select();Runnable task tasks.poll();if (task ! null) {task.run();}SetSelectionKey keys worker.selectedKeys();IteratorSelectionKey iter keys.iterator();while (iter.hasNext()) {SelectionKey key iter.next();if (key.isReadable()) {SocketChannel sc (SocketChannel) key.channel();ByteBuffer buffer ByteBuffer.allocate(128);try {int read sc.read(buffer);if (read -1) {key.cancel();sc.close();} else {buffer.flip();log.debug({} message:, sc.getRemoteAddress());debugAll(buffer);}} catch (IOException e) {e.printStackTrace();key.cancel();sc.close();}}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}
}如何拿到 cpu 个数 Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下因为容器不是物理隔离的会拿到物理 cpu 个数而不是容器申请时的个数这个问题直到 jdk 10 才修复使用 jvm 参数 UseContainerSupport 配置 默认开启 4.7 UDP
UDP 是无连接的client 发送数据不会管 server 是否开启server 这边的 receive 方法会将接收到的数据存入 byte buffer但如果数据报文超过 buffer 大小多出来的数据会被默默抛弃
首先启动服务器端
public class UdpServer {public static void main(String[] args) {try (DatagramChannel channel DatagramChannel.open()) {channel.socket().bind(new InetSocketAddress(9999));System.out.println(waiting...);ByteBuffer buffer ByteBuffer.allocate(32);channel.receive(buffer);buffer.flip();debug(buffer);} catch (IOException e) {e.printStackTrace();}}
}输出
waiting...运行客户端
public class UdpClient {public static void main(String[] args) {try (DatagramChannel channel DatagramChannel.open()) {ByteBuffer buffer StandardCharsets.UTF_8.encode(hello);InetSocketAddress address new InetSocketAddress(localhost, 9999);channel.send(buffer, address);} catch (Exception e) {e.printStackTrace();}}
}接下来服务器端输出 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 68 65 6c 6c 6f |hello |
-------------------------------------------------------------------------5. NIO vs BIO
5.1 stream vs channel
stream 不会自动缓冲数据channel 会利用系统提供的发送缓冲区、接收缓冲区更为底层stream 仅支持阻塞 APIchannel 同时支持阻塞、非阻塞 API网络 channel 可配合 selector 实现多路复用二者均为全双工即读写可以同时进行
5.2 IO 模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞没有此情况、异步非阻塞
同步线程自己去获取结果一个线程异步线程自己不去获取结果而是由其它线程送结果至少两个线程
当调用一次 channel.read 或 stream.read 后会切换至操作系统内核态来完成真正数据读取而读取又分为两个阶段分别为
等待数据阶段复制数据阶段 阻塞 IO 非阻塞 IO 多路复用 信号驱动 异步 IO 阻塞 IO vs 多路复用 参考
UNIX 网络编程 - 卷 I
5.3 零拷贝
传统 IO 问题
传统的 IO 将一个文件通过 socket 写出
File f new File(helloword/data.txt);
RandomAccessFile file new RandomAccessFile(file, r);byte[] buf new byte[(int)f.length()];
file.read(buf);Socket socket ...;
socket.getOutputStream().write(buf);内部工作流程是这样的 java 本身并不具备 IO 读写能力因此 read 方法调用后要从 java 程序的用户态切换至内核态去调用操作系统Kernel的读能力将数据读入内核缓冲区。这期间用户线程阻塞操作系统使用 DMADirect Memory Access来实现文件读其间也不会使用 cpu DMA 也可以理解为硬件单元用来解放 cpu 完成文件 IO 从内核态切换回用户态将数据从内核缓冲区读入用户缓冲区即 byte[] buf这期间 cpu 会参与拷贝无法利用 DMA 调用 write 方法这时将数据从用户缓冲区byte[] buf写入 socket 缓冲区cpu 会参与拷贝 接下来要向网卡写数据这项能力 java 又不具备因此又得从用户态切换至内核态调用操作系统的写能力使用 DMA 将 socket 缓冲区的数据写入网卡不会使用 cpu
可以看到中间环节较多java 的 IO 实际不是物理设备级别的读写而是缓存的复制底层的真正读写是操作系统来完成的
用户态与内核态的切换发生了 3 次这个操作比较重量级数据拷贝了共 4 次
NIO 优化
通过 DirectByteBuf
ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存 大部分步骤与优化前相同不再赘述。唯有一点java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
这块内存不受 jvm 垃圾回收的影响因此内存地址固定有助于 IO 读写java 中的 DirectByteBuf 对象仅维护了此内存的虚引用内存回收分成两步 DirectByteBuf 对象被垃圾回收将虚引用加入引用队列通过专门线程访问引用队列根据虚引用释放堆外内存 减少了一次数据拷贝用户态与内核态的切换次数没有减少
进一步优化底层采用了 linux 2.1 后提供的 sendFile 方法java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据 java 调用 transferTo 方法后要从 java 程序的用户态切换至内核态使用 DMA将数据读入内核缓冲区不会使用 cpu数据从内核缓冲区传输到 socket 缓冲区cpu 会参与拷贝最后使用 DMA 将 socket 缓冲区的数据写入网卡不会使用 cpu
可以看到
只发生了一次用户态与内核态的切换数据拷贝了 3 次
进一步优化linux 2.4 java 调用 transferTo 方法后要从 java 程序的用户态切换至内核态使用 DMA将数据读入内核缓冲区不会使用 cpu只会将一些 offset 和 length 信息拷入 socket 缓冲区几乎无消耗使用 DMA 将 内核缓冲区的数据写入网卡不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换数据拷贝了 2 次。所谓的【零拷贝】并不是真正无拷贝而是在不会拷贝重复数据到 jvm 内存中零拷贝的优点有
更少的用户态与内核态的切换不利用 cpu 计算减少 cpu 缓存伪共享零拷贝适合小文件传输
5.3 AIO
AIO 用来解决数据复制阶段的阻塞问题
同步意味着在进行读写操作时线程需要等待结果还是相当于闲置异步意味着在进行读写操作时线程不必等待结果而是将来由操作系统来通过回调方式由另外的线程来获得结果 异步模型需要底层操作系统Kernel提供支持 Windows 系统通过 IOCP 实现了真正的异步 IOLinux 系统异步 IO 在 2.6 版本引入但其底层实现还是用多路复用模拟了异步 IO性能没有优势 文件 AIO
先来看看 AsynchronousFileChannel
Slf4j
public class AioDemo1 {public static void main(String[] args) throws IOException {try{AsynchronousFileChannel s AsynchronousFileChannel.open(Paths.get(1.txt), StandardOpenOption.READ);ByteBuffer buffer ByteBuffer.allocate(2);log.debug(begin...);s.read(buffer, 0, null, new CompletionHandlerInteger, ByteBuffer() {Overridepublic void completed(Integer result, ByteBuffer attachment) {log.debug(read completed...{}, result);buffer.flip();debug(buffer);}Overridepublic void failed(Throwable exc, ByteBuffer attachment) {log.debug(read failed...);}});} catch (IOException e) {e.printStackTrace();}log.debug(do other things...);System.in.read();}
}输出
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 61 0d |a. |
-------------------------------------------------------------------------可以看到
响应文件读取成功的是另一个线程 Thread-5主线程并没有 IO 操作阻塞 守护线程
默认文件 AIO 使用的线程都是守护线程所以最后要执行 System.in.read() 以避免守护线程意外结束
网络 AIO
public class AioServer {public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel ssc AsynchronousServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.accept(null, new AcceptHandler(ssc));System.in.read();}private static void closeChannel(AsynchronousSocketChannel sc) {try {System.out.printf([%s] %s close\n, Thread.currentThread().getName(), sc.getRemoteAddress());sc.close();} catch (IOException e) {e.printStackTrace();}}private static class ReadHandler implements CompletionHandlerInteger, ByteBuffer {private final AsynchronousSocketChannel sc;public ReadHandler(AsynchronousSocketChannel sc) {this.sc sc;}Overridepublic void completed(Integer result, ByteBuffer attachment) {try {if (result -1) {closeChannel(sc);return;}System.out.printf([%s] %s read\n, Thread.currentThread().getName(), sc.getRemoteAddress());attachment.flip();System.out.println(Charset.defaultCharset().decode(attachment));attachment.clear();// 处理完第一个 read 时需要再次调用 read 方法来处理下一个 read 事件sc.read(attachment, attachment, this);} catch (IOException e) {e.printStackTrace();}}Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}}private static class WriteHandler implements CompletionHandlerInteger, ByteBuffer {private final AsynchronousSocketChannel sc;private WriteHandler(AsynchronousSocketChannel sc) {this.sc sc;}Overridepublic void completed(Integer result, ByteBuffer attachment) {// 如果作为附件的 buffer 还有内容需要再次 write 写出剩余内容if (attachment.hasRemaining()) {sc.write(attachment);}}Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();closeChannel(sc);}}private static class AcceptHandler implements CompletionHandlerAsynchronousSocketChannel, Object {private final AsynchronousServerSocketChannel ssc;public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc ssc;}Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf([%s] %s connected\n, Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer ByteBuffer.allocate(16);// 读事件由 ReadHandler 处理sc.read(buffer, buffer, new ReadHandler(sc));// 写事件由 WriteHandler 处理sc.write(Charset.defaultCharset().encode(server hello!), ByteBuffer.allocate(16), new WriteHandler(sc));// 处理完第一个 accpet 时需要再次调用 accept 方法来处理下一个 accept 事件ssc.accept(null, this);}Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}}
}achment.clear(); // 处理完第一个 read 时需要再次调用 read 方法来处理下一个 read 事件 sc.read(attachment, attachment, this); } catch (IOException e) { e.printStackTrace(); } } Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}
}private static class WriteHandler implements CompletionHandlerInteger, ByteBuffer {private final AsynchronousSocketChannel sc;private WriteHandler(AsynchronousSocketChannel sc) {this.sc sc;}Overridepublic void completed(Integer result, ByteBuffer attachment) {// 如果作为附件的 buffer 还有内容需要再次 write 写出剩余内容if (attachment.hasRemaining()) {sc.write(attachment);}}Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();closeChannel(sc);}
}private static class AcceptHandler implements CompletionHandlerAsynchronousSocketChannel, Object {private final AsynchronousServerSocketChannel ssc;public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc ssc;}Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf([%s] %s connected\n, Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer ByteBuffer.allocate(16);// 读事件由 ReadHandler 处理sc.read(buffer, buffer, new ReadHandler(sc));// 写事件由 WriteHandler 处理sc.write(Charset.defaultCharset().encode(server hello!), ByteBuffer.allocate(16), new WriteHandler(sc));// 处理完第一个 accpet 时需要再次调用 accept 方法来处理下一个 accept 事件ssc.accept(null, this);}Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}
}}