【Java Nio】 从Buffer到Channel
这篇文章主要整理近期我看到的java.nio高效编程中Buffer和Channel的知识点。
一、java.nio
两个基础术语:缓冲区buffer和通道channel。缓冲区用于容纳数据,通道表示打开的到I/O设备(例如文件或套接字)的连接。
二、Buffer
1、Buffer的要素:
1)position:当前位置,缓冲区下一次发生读写操作的索引。
2)limit:当前界限,缓冲区最后一个有效位置之后下一个位置的索引值。
3)capacity:容量
4)mark:标记。临时记录位置。buffer.mark(buffer.position()) -> buffer.postion(6) -> buffer.reset() 使得 position = mark
2、Buffer基础方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Object array() 返回缓冲区(基于数组)的引用 int arrayOffset() 返回缓冲区(基于数组)第一个元素的索引 Boolean hasArray() 返回缓冲区是否基于数组,是则true,否则false -------------------------------------------------------------------------------- int capacity() 返回缓冲区的元素容量 int limit() 返回缓冲区的界限 int position() 返回当前位置 Buffer position(int n) 设定当前位置 Buffer mark(int n) 设定临时标记,mark只能小于等于position Buffer reset() 与设定标记一起使用,恢复标记位置 Buffer clear() 清空缓冲区 Buffer rewind() 将缓冲区位置设置为0 Boolean hasRemaining() 返回缓冲区是否还有剩余元素,是则true,否则false int remaining() 返回缓冲区剩下元素的数量 Boolean isDirect() 返回缓冲区是否可以直接进行I/O操作 |
Buffer并没有提供get、put方法,由派生类提供。派生类有:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、MappedByteBuffer、ShortBuffer。
3、Buffer重点方法:
1 2 3 4 5 6 7 8 9 10 11 12 |
Buffer allocate(int n) 创建buffer Buffer compact() 清空已读的元素,将未读的元素复制到buffer开始的位置0。 position = limit - position limit = capacity Buffer flip() 写模式转成读模式。将缓冲区的界限设置为当前位置,将当前位置设为0,从头读。 limit = position position = 0 Buffer rewind() position = 0 Buffer clear() position = 0 limit = capacity 不改变任何元素 int compareTo() buffer1.compareTo(buffer2) < 0 buffer2的缓冲区小于buffer1,比较是针对每个缓冲区你剩余数据(从 position 到 limit) Buffer wrap(buffer Array) 将一个数组包装为缓冲区 Buffer slice() 创建缓冲区的子缓冲区,共用数据底层。 buffer.position(2);buffer.limit(7);buffer.slice(); |
4、ByteBuffer基础方法
1 2 3 4 5 6 7 8 9 10 11 |
byte get() 返回当前位置的字节 byte get(int idx) 返回idx位置的字节 ByteBuffer get(byte vals[]) 将缓冲区复制到vals数组中 ByteBuffer get(byte vals[], int start, int num) 将缓冲区start后num个元素复制到vals数组中 ByteBuffer put(byte b) 将b复制到缓冲区当前位置 ByteBuffer put(int idx, byte b) 将b复制到idx位置 ByteBuffer put(byte vals[]) 将vals数组中的元素复制到缓冲区 ByteBuffer put(byte vals[], int start, int num) 将vals数组中start后num个元素复制到缓冲区 ByteBuffer put(ByteBuffer bb) 将bb中的元素复制到缓冲区 |
三、Channel
1、通道表示到I/O源或目标的打开的连接。支持:
DatagramSocket 通过UDP读写网络数据
FileInputStream、FileOutputStream、RandomAccessFile 文件输入输出流
ServerSocket 监听新进来的TCP连接,每一个连接创建一个新的SocketChannel
Socket 通过TCP读写网络中的数据
2、Channel基础方法
1 2 3 4 5 6 7 8 9 |
getChannel() 返回特定类型的通道。 new FileOutputStream("C://test.txt").getChannel() newByteChannel() 静态方法返回字节通道 int read(ByteBuffer bb) 调用通道读取字节到bb,直到缓冲区已满或者不再有输入内容为止。返回实际读取的字节数,或者-1。 int read(ByteBuffer bb, long start) 从文件start位置开始,从调用通道读取字节到bb。 int write(ByteBuffer bb) 将bb的内容写入调用通道,从当前位置开始。返回写入的字节数。 int write(ByteBuffer bb, long start) 从文件start位置开始,将bb中的内容写入调用通道。 |
3、FileChannel用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
//用FileChannel读取和写入文件 try (FileChannel inChannel = FileChannel.open(Paths.get("src/a.txt"),StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("src/b.txt"),StandardOpenOption.WRITE);) { ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); while (bytesRead != -1) { buf.flip(); while (buf.hasRemaining()) { outChannel.write(buf); } buf.clear(); bytesRead = inChannel.read(buf); } inChannel.close(); } //FileChannel快速复制文件 //第二个参数表示,数据转移的起始位置,第三个参数表示转移的长度,channel.size()表示通道的长度 outChannel.transferFrom(inChannel, 0, inChannel.size()); //以下方式也可 inChannel.transferTo(0, inChannel.size(), outChannel); |
4、ServerSocketChannel用法
1 2 3 4 5 6 7 8 9 10 11 12 13 |
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); //设置为非阻塞,accept可以立即返回,若无连接则为null serverSocketChannel.configureBlocking(false); //轮询监听 while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); if(socketChannel != null){ //do something with socketChannel... } } |
5、I/O非阻塞方法
1)FileChannel的非阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
/* 将来式: * 这种方式是由主线程发起I/O操作并轮询等待结果。这里用了java.util.concurrent.Future接口,它的能力是不让当前线程阻塞。 * 通过将I/O操作转移到另一线程上,并在完成时返回结果,来达到异步的目的。 */ try (AsynchronousFileChannel inChannel = AsynchronousFileChannel.open( Paths.get("src/a.txt"), StandardOpenOption.READ);) { ByteBuffer buffer = ByteBuffer.allocate(1024); //read的第二个参数指定了channel的起始位置 Future<Integer> result = inChannel.read(buffer, 0); //一直轮询I/O操作是否完成 while (!result.isDone()) { // do something } buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } } /* 回调式 * 这种方式是预先制定好I/O成功或失败时的应对策略,等待I/O操作完成后就自动执行该策略。 * 所以必须得重写两个方法completionHandler.completed()和completionHandler.failed(). */ try (AsynchronousFileChannel inChannel = AsynchronousFileChannel.open( Paths.get("src/a.txt"), StandardOpenOption.READ);) { ByteBuffer buffer = ByteBuffer.allocate(1024); //completed(Integer result, A attachment)的第一个参数是完成了多少个字节 //failed(Throwable exc, A attachment)的第一个参数是引起失败的异常类型 inChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { public void completed(Integer result, ByteBuffer attachment) { System.out.println(result); attachment.flip(); while (attachment.hasRemaining()) { System.out.print((char) attachment.get()); } } public void failed(Throwable exception, ByteBuffer attachment) { System.out.println("failed" + exception.getMessage()); } }); // do something } |
2)ServerSocketChannel非阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
// 服务端TestReadServer.java import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TestReadServer { /*标识数字*/ private int flag = 0; /*缓冲区大小*/ private int BLOCK = 1024*1024*10; /*接受数据缓冲区*/ private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK); /*发送数据缓冲区*/ private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK); private Selector selector; public TestReadServer(int port) throws IOException { // 打开服务器套接字通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 服务器配置为非阻塞 serverSocketChannel.configureBlocking(false); // 检索与此通道关联的服务器套接字 ServerSocket serverSocket = serverSocketChannel.socket(); // 进行服务的绑定 serverSocket.bind(new InetSocketAddress(port)); // 通过open()方法找到Selector selector = Selector.open(); // 注册到selector,等待连接 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Server Start----"+port+":"); } // 监听 private void listen() throws IOException { while (true) { // 选择一组键,并且相应的通道已经打开 selector.select(); // 返回此选择器的已选择键集。 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); handleKey(selectionKey); } } } // 处理请求 private void handleKey(SelectionKey selectionKey) throws IOException { // 接受请求 ServerSocketChannel server = null; SocketChannel client = null; String receiveText; String sendText; int count=0; // 测试此键的通道是否已准备好接受新的套接字连接。 if (selectionKey.isAcceptable()) { // 返回为之创建此键的通道。 server = (ServerSocketChannel) selectionKey.channel(); // 接受到此通道套接字的连接。 // 此方法返回的套接字通道(如果有)将处于阻塞模式。 client = server.accept(); // 配置为非阻塞 client.configureBlocking(false); // 注册到selector,等待连接 client.register(selector, SelectionKey.OP_READ); } else if (selectionKey.isReadable()) { // 返回为之创建此键的通道。 client = (SocketChannel) selectionKey.channel(); //将缓冲区清空以备下次读取 receivebuffer.clear(); //读取服务器发送来的数据到缓冲区中 System.out.println(System.currentTimeMillis()); count = client.read(receivebuffer); System.out.println(System.currentTimeMillis() + "~"+count); } } /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // TODO Auto-generated method stub int port = 1234; TestReadServer server = new TestReadServer(port); server.listen(); } } // 客户端TestReadClient.java import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TestReadClient { /*标识数字*/ private static int flag = 0; /*缓冲区大小*/ private static int BLOCK = 1024*1024*10; /*接受数据缓冲区*/ private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK); /*发送数据缓冲区*/ private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK); /*服务器端地址*/ private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress( "localhost", 1234); public static void main(String[] args) throws IOException { // TODO Auto-generated method stub // 打开socket通道 SocketChannel socketChannel = SocketChannel.open(); // 设置为非阻塞方式 socketChannel.configureBlocking(false); // 打开选择器 Selector selector = Selector.open(); // 注册连接服务端socket动作 socketChannel.register(selector, SelectionKey.OP_CONNECT); // 连接 socketChannel.connect(SERVER_ADDRESS); // 分配缓冲区大小内存 Set<SelectionKey> selectionKeys; Iterator<SelectionKey> iterator; SelectionKey selectionKey; SocketChannel client; String receiveText; String sendText; int count=0; while (true) { //选择一组键,其相应的通道已为 I/O 操作准备就绪。 //此方法执行处于阻塞模式的选择操作。 selector.select(); //返回此选择器的已选择键集。 selectionKeys = selector.selectedKeys(); //System.out.println(selectionKeys.size()); iterator = selectionKeys.iterator(); while (iterator.hasNext()) { selectionKey = iterator.next(); if (selectionKey.isConnectable()) { System.out.println("client connect"); client = (SocketChannel) selectionKey.channel(); // 判断此通道上是否正在进行连接操作。 // 完成套接字通道的连接过程。 if (client.isConnectionPending()) { client.finishConnect(); System.out.println("完成连接!"); sendbuffer.clear(); BufferedInputStream br = new BufferedInputStream(new FileInputStream(new File("D:\\BigData.zip"))); byte[] b = new byte[BLOCK]; br.read(b); sendbuffer.put(b); sendbuffer.flip(); System.out.println(System.currentTimeMillis()); client.write(sendbuffer); System.out.println(System.currentTimeMillis()); } client.register(selector, SelectionKey.OP_READ); } else if (selectionKey.isReadable()) { client = (SocketChannel) selectionKey.channel(); //将缓冲区清空以备下次读取 receivebuffer.clear(); //读取服务器发送来的数据到缓冲区中 count=client.read(receivebuffer); if(count>0){ receiveText = new String( receivebuffer.array(),0,count); System.out.println("客户端接受服务器端数据--:"+receiveText); client.register(selector, SelectionKey.OP_WRITE); } } } selectionKeys.clear(); } } } |