一、概念 1、同步和异步 同步:用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行
异步:用户线程发起I/O请求后仍需要继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数
2、阻塞和非阻塞讨论的是参与通信双方的工作机制,是否需要互相等待对方的执行
阻塞:在通信过程中, 一方在处理通信, 另一方要等待对方执行并返回信息不能去做其他无关的事
非阻塞:在通信过程中, 一方在处理通信, 另一方可以不用等待执行并返回信息而可以去做其他无关的事 直到对方处理通信完成 再在适合的时候继续处理通信过程
二、BIO (同步阻塞) 代码示例服务端业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;public class ServerHandler implements Runnable { private Socket socket; public ServerHandler (Socket socket) { this .socket = socket; } @Override public void run () { BufferedReader in = null ; PrintWriter out = null ; try { in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true ); String expression; String result; while (true ){ if ((expression = in.readLine())==null ) { break ; } System.out.println("server阻塞测试" ); System.out.println("服务器收到消息:" + expression); try { result = Calculator.cal(expression).toString(); }catch (Exception e){ result = "计算错误:" + e.getMessage(); } out.println(result); } }catch (Exception e){ e.printStackTrace(); }finally { if (in != null ){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null ; } if (out != null ){ out.close(); out = null ; } if (socket != null ){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null ; } } } } final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript" ); public static Object cal (String expression) throws ScriptException { return jse.eval(expression); } }
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ServerNormal { private static int DEFAULT_PORT = 12344 ; private static ServerSocket server; private static ExecutorService executorService = Executors.newFixedThreadPool(1 ); public static void start () throws IOException { start(DEFAULT_PORT); } public synchronized static void start (int port) throws IOException { if (server != null ) return ; try { server = new ServerSocket(port); System.out.println("服务器已启动,端口号:" + port); while (true ){ Socket socket = server.accept(); executorService.execute(new ServerHandler(socket)); } }finally { if (server != null ){ System.out.println("服务器已关闭。" ); server.close(); server = null ; } } } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;public class Client { private static int DEFAULT_SERVER_PORT = 12344 ; private static String DEFAULT_SERVER_IP = "127.0.0.1" ; public static void send (String expression) { send(DEFAULT_SERVER_PORT,expression); } public static void send (int port,String expression) { System.out.println("算术表达式为:" + expression); Socket socket = null ; BufferedReader in = null ; PrintWriter out = null ; try { socket = new Socket(DEFAULT_SERVER_IP,port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true ); out.println(expression); System.out.println("___结果为:" + in.readLine()); System.out.println("client阻塞测试" ); }catch (Exception e){ e.printStackTrace(); }finally { if (in != null ){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null ; } if (out != null ){ out.close(); out = null ; } if (socket != null ){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null ; } } } }
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import java.io.IOException;import java.util.Random;public class BIOTest { public static void main (String[] args) throws InterruptedException { new Thread(() -> { try { ServerNormal.start(); } catch (IOException e) { e.printStackTrace(); } }).start(); Thread.sleep(100 ); char operators[] = {'+' ,'-' ,'*' ,'/' }; Random random = new Random(System.currentTimeMillis()); new Thread(() -> { while (true ){ String expression = random.nextInt(10 )+"" +operators[random.nextInt(4 )]+(random.nextInt(10 )+1 ); Client.send(expression); try { Thread.sleep(random.nextInt(1000 )); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
问题同步阻塞式I/O创建的Server
结构图
1、BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能,甚至罢工) 1、限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对Socket的输入流进行读取时,会一直阻塞所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。而NIO,就能解决这个难题
三、NIO 代码示例服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class Server { private static int DEFAULT_PORT = 12345 ; private static ServerHandle serverHandle; public static void start () { start(DEFAULT_PORT); } public static synchronized void start (int port) { if (serverHandle!=null ) { serverHandle.stop(); } serverHandle = new ServerHandle(port); new Thread(serverHandle,"Server" ).start(); } } class ServerHandle implements Runnable { private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; public ServerHandle (int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false ); serverChannel.socket().bind(new InetSocketAddress(port),1024 ); serverChannel.register(selector, SelectionKey.OP_ACCEPT); started = true ; System.out.println("服务器已启动,端口号:" + port); }catch (IOException e){ e.printStackTrace(); System.exit(1 ); } } public void stop () { started = false ; } @Override public void run () { while (started){ try { selector.selectNow(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null ; while (it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); }catch (Exception e){ if (key != null ){ key.cancel(); if (key.channel() != null ){ key.channel().close(); } } } } }catch (Throwable t){ t.printStackTrace(); } } if (selector != null ) { try { selector.close(); }catch (Exception e) { e.printStackTrace(); } } } private void handleInput (SelectionKey key) throws IOException { if (key.isValid()){ if (key.isAcceptable()){ ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false ); sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); int readBytes = sc.read(buffer); if (readBytes>0 ){ buffer.flip(); byte [] bytes = new byte [buffer.remaining()]; buffer.get(bytes); String expression = new String(bytes,"UTF-8" ); System.out.println("服务器收到消息:" + expression); String result = null ; try { result = Calculator.cal(expression).toString(); }catch (Exception e){ result = "计算错误:" + e.getMessage(); } doWrite(sc,result); } else if (readBytes<0 ){ key.cancel(); sc.close(); } } } } private void doWrite (SocketChannel channel,String response) throws IOException { byte [] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript" ); public static Object cal (String expression) throws ScriptException { return jse.eval(expression); } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class Client { private static String DEFAULT_HOST = "127.0.0.1" ; private static int DEFAULT_PORT = 12345 ; private static ClientHandle clientHandle; public static void start () { start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start (String ip,int port) { if (clientHandle!=null ) clientHandle.stop(); clientHandle = new ClientHandle(ip,port); new Thread(clientHandle,"Server" ).start(); } public static boolean sendMsg (String msg) throws Exception { if (msg.equals("q" )) return false ; clientHandle.sendMsg(msg); return true ; } } class ClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public ClientHandle (String ip,int port) { this .host = ip; this .port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false ); started = true ; }catch (IOException e){ e.printStackTrace(); System.exit(1 ); } } public void stop () { started = false ; } @Override public void run () { try { doConnect(); }catch (IOException e){ e.printStackTrace(); System.exit(1 ); } while (started){ try { selector.select(1000 ); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null ; while (it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); }catch (Exception e){ if (key != null ){ key.cancel(); if (key.channel() != null ){ key.channel().close(); } } } } }catch (Exception e){ e.printStackTrace(); System.exit(1 ); } } if (selector != null ) try { selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput (SelectionKey key) throws IOException { if (key.isValid()){ SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()){ if (sc.finishConnect()); else System.exit(1 ); } if (key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(1024 ); int readBytes = sc.read(buffer); if (readBytes>0 ){ buffer.flip(); byte [] bytes = new byte [buffer.remaining()]; buffer.get(bytes); String result = new String(bytes,"UTF-8" ); System.out.println("客户端收到消息:" + result); } else if (readBytes<0 ){ key.cancel(); sc.close(); } } } } private void doWrite (SocketChannel channel,String request) throws IOException { byte [] bytes = request.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } private void doConnect () throws IOException { if (socketChannel.connect(new InetSocketAddress(host,port))); else socketChannel.register(selector, SelectionKey.OP_CONNECT); } public void sendMsg (String msg) throws Exception { socketChannel.register(selector, SelectionKey.OP_WRITE); doWrite(socketChannel, msg); } }
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.Random;public class NIOTest { public static void main (String[] args) throws Exception { Server.start(); Thread.sleep(1000 ); Client.start(); Thread.sleep(3000 ); Random random = new Random(System.currentTimeMillis()); char operators[] = {'+' ,'-' ,'*' ,'/' }; while ( Client.sendMsg(random.nextInt(10 )+"" +operators[random.nextInt(4 )]+(random.nextInt(10 )+1 ))){ Thread.sleep(random.nextInt(1000 )); }; } }
nio 结构
服务端打开ServerSocketChannel,监听客户端连接 绑定监听端口,设置连接为非阻塞模式 创建Reactor线程,创建多路复用器并启动线程 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件 Selector轮询准备就绪的key Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,建立链路 客户端设置客户端链路为非阻塞模式 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息 异步读取客户端消息到缓冲区 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端 Selector(多路复用器|选择某个通道器)选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。 通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态, 当这么做的时候,可以选择将被激发的线程挂起直到有就绪的通道。 使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
SelectionKey表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。 key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。 key.channel(); // 返回该SelectionKey对应的channel。 key.selector(); // 返回该SelectionKey对应的Selector。 key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。
事件名 对应值 服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16) 客户端连接服务端事件 SelectionKey.OP_CONNECT(8) 读事件 SelectionKey.OP_READ(1) 写事件 SelectionKey.OP_WRITE(4)
buffer(解决bio中数据不可重复读的问题)存储基本类型数组数据:ByteBuffer、CharBuffer、FloatBuffer、ShortBuffer、StringCharBuffer等等 这些方法中大部分是对mark、position、limit、capacity的操作。 对于数组来说,需要以下一些重要元素,比如数组大小(capacity) 此时如果是对数组的读取操作时,需要表明当前读到了哪个位置(position),总共可以读到哪个位置(limit),也就是当前数组中有几个元素。 此时如果是写操作,那么需要知道现在写到了哪个位置(position),最大可以写到哪个位置(limit) 最后为了实现可重复读,产生一个备忘位置,即标记(mark)。 源码(只截取部分):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 public abstract class Buffer { private int mark = -1 ; private int position = 0 ; private int limit; private int capacity; Buffer(int mark, int pos, int lim, int cap) { if (cap < 0 ) throw new IllegalArgumentException("Negative capacity: " + cap); this .capacity = cap; limit(lim); position(pos); if (mark >= 0 ) { if (mark > pos) throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")" ); this .mark = mark; } } public final int capacity () { return capacity; } public final int position () { return position; } public final Buffer position (int newPosition) { if ((newPosition > limit) || (newPosition < 0 )) throw new IllegalArgumentException(); position = newPosition; if (mark > position) mark = -1 ; return this ; } public final int limit () { return limit; } public final Buffer limit (int newLimit) { if ((newLimit > capacity) || (newLimit < 0 )) throw new IllegalArgumentException(); limit = newLimit; if (position > limit) position = limit; if (mark > limit) mark = -1 ; return this ; } public final Buffer mark () { mark = position; return this ; } public final Buffer reset () { int m = mark; if (m < 0 ) throw new InvalidMarkException(); position = m; return this ; } public final Buffer clear () { position = 0 ; limit = capacity; mark = -1 ; return this ; } public final Buffer flip () { limit = position; position = 0 ; mark = -1 ; return this ; } public final Buffer rewind () { position = 0 ; mark = -1 ; return this ; } }
直接缓冲区与非直接缓冲区(ByteBuffer):
非直接缓冲区: 优点:在虚拟机内存中创建,易回收 缺点:但占用虚拟机内存开销,处理中有复制过程。 直接缓冲区: 优点:在虚拟机内存外,开辟的内存,IO操作直接进行,没有再次复制 缺点:创建和销毁开销大,没有管理权(基于系统的物理内存没有分代回收机制)
用通俗的话讲就是,比如你是个小组长(jvm堆内存),你管理者你底下的人, 但是你的领导(内核[物理空间])要知道你的情况,你需要把你的组内的情况汇报给他(复制),而他自己本身只知道你的情况,你下面人的情况他是不了解的也是不关心的,相当于他把这块区域分配给你,至于你要干什么,他是不管的
JVM创建一个缓冲区的时候,实际上做了如下几件事:
JVM确保Heap区域内的空间足够,如果不够则使用触发GC在内的方法获得空间; 获得空间之后会找一组堆内的连续地址分配数组, 这里需要注意的是,在物理内存上,这些字节是不一定连续的; 对于不涉及到IO的操作,这样的处理没有任何问题,但是当进行IO操作的时候就会出现一点性能问题. 所有的IO操作都需要操作系统进入内核态才行,而JVM进程属于用户态进程, 当JVM需要把一个缓冲区写到某个Channel或Socket的时候,需要切换到内核态. 而内核态由于并不知道JVM里面这个缓冲区存储在物理内存的什么地址,并且这些物理地址并不一定是连续的(或者说不一定是IO操作需要的块结构), 所以在切换之前JVM需要把缓冲区复制到物理内存一块连续的内存上, 然后由内核去读取这块物理内存,整合成连续的、分块的内存. 三、AIO 代码示例:服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.concurrent.CountDownLatch;public class Server { private static int DEFAULT_PORT = 12345 ; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0 ; public static void start () { start(DEFAULT_PORT); } public static synchronized void start (int port) { if (serverHandle!=null ) return ; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server" ).start(); } public static void main (String[] args) { Server.start(); } } class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsynchronousServerSocketChannel channel; public AsyncServerHandler (int port) { try { channel = AsynchronousServerSocketChannel.open(); channel.bind(new InetSocketAddress(port)); System.out.println("服务器已启动,端口号:" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run () { latch = new CountDownLatch(1 ); channel.accept(this ,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } class AcceptHandler implements CompletionHandler <AsynchronousSocketChannel , AsyncServerHandler > { @Override public void completed (AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { Server.clientCount++; System.out.println("连接的客户端数:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this ); ByteBuffer buffer = ByteBuffer.allocate(1024 ); channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed (Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } } class ReadHandler implements CompletionHandler <Integer , ByteBuffer > { private AsynchronousSocketChannel channel; public ReadHandler (AsynchronousSocketChannel channel) { this .channel = channel; } @Override public void completed (Integer result, ByteBuffer attachment) { attachment.flip(); byte [] message = new byte [attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8" ); System.out.println("服务器收到消息: " + expression); String calrResult = null ; try { calrResult = Calculator.cal(expression).toString(); }catch (Exception e){ calrResult = "计算错误:" + e.getMessage(); } doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite (String result) { byte [] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed (Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) channel.write(buffer, buffer, this ); else { ByteBuffer readBuffer = ByteBuffer.allocate(1024 ); channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed (Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed (Throwable exc, ByteBuffer attachment) { try { this .channel.close(); } catch (IOException e) { e.printStackTrace(); } } } final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript" ); public static Object cal (String expression) throws ScriptException { return jse.eval(expression); } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 import java.io.IOException;import java.io.UnsupportedEncodingException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.Scanner;import java.util.concurrent.CountDownLatch;public class Client { private static String DEFAULT_HOST = "127.0.0.1" ; private static int DEFAULT_PORT = 12345 ; private static AsyncClientHandler clientHandle; public static void start () { start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start (String ip,int port) { if (clientHandle!=null ) return ; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client" ).start(); } public static boolean sendMsg (String msg) throws Exception { if (msg.equals("q" )) return false ; clientHandle.sendMsg(msg); return true ; } @SuppressWarnings("resource") public static void main (String[] args) throws Exception { Client.start(); System.out.println("请输入请求消息:" ); Scanner scanner = new Scanner(System.in); while (Client.sendMsg(scanner.nextLine())); } } class AsyncClientHandler implements CompletionHandler <Void , AsyncClientHandler >, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler (String host, int port) { this .host = host; this .port = port; try { clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run () { latch = new CountDownLatch(1 ); clientChannel.connect(new InetSocketAddress(host, port), this , this ); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed (Void result, AsyncClientHandler attachment) { System.out.println("客户端成功连接到服务器..." ); } @Override public void failed (Throwable exc, AsyncClientHandler attachment) { System.err.println("连接服务器失败..." ); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } public void sendMsg (String msg) { System.out.println("算术表达式为:" + msg); byte [] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } } class WriteHandler implements CompletionHandler <Integer , ByteBuffer > { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler (AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this .clientChannel = clientChannel; this .latch = latch; } @Override public void completed (Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this ); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024 ); clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(clientChannel, latch)); } } @Override public void failed (Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..." ); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } class ClientReadHandler implements CompletionHandler <Integer , ByteBuffer > { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ClientReadHandler (AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this .clientChannel = clientChannel; this .latch = latch; } @Override public void completed (Integer result,ByteBuffer buffer) { buffer.flip(); byte [] bytes = new byte [buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8" ); System.out.println("客户端收到结果:" + body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed (Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..." ); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.Random;import java.util.Scanner;public class AIOTest { public static void main (String[] args) throws Exception { Server.start(); Thread.sleep(1000 ); Client.start(); Thread.sleep(3000 ); Random random = new Random(System.currentTimeMillis()); char operators[] = {'+' ,'-' ,'*' ,'/' }; while (Client.sendMsg(random.nextInt(10 )+"" +operators[random.nextInt(4 )]+(random.nextInt(10 )+1 ))){ Thread.sleep(random.nextInt(1000 )); } } }
四、总结 同步和异步我认为的网络层面io的同步和异步描述的是一种消息通知的机制,主动等待消息返回 还是被动接受消息 同步io:指的是调用方通过主动等待获取调用返回的结果来获取消息通知。 异步io:指的是被调用方通过某种方式(如,回调函数)来通知调用方获取消息。
阻塞和非阻塞NIO、AIO为什么被称为非阻塞?
BIO在发起读请求以后,会一直等待,一直到拿到结果 NIO在发起读请求以后,不会立即拿到结果 AIO通过回调方法,被动的接受 BIO(blocking IO):同步阻塞式IO 面向流 操作字节或字符 单向传输数据
NIO(non blocking IO):同步非阻塞式IO 面向通道 操作缓冲区 双向传输数据
AIO(async IO):同步非阻塞式IO 大量使用回调函数 异步处理通信过程 异步的双向传输数据