1.Socket鏈接的建立 java程式(server) 在應用空間中運行,當建立一個socket鏈接時,會向內核空間中的內核程式(sc)發送指令,內核程式中一定會執行Socket(AF_UNIX,SOCK_STAM,0) -> fd(文件標識符)6 --傳遞-> bind(6,9999) 綁定埠 ...
1.Socket鏈接的建立
java程式(server) 在應用空間中運行,當建立一個socket鏈接時,會向內核空間中的內核程式(sc)發送指令,內核程式中一定會執行Socket(AF_UNIX,SOCK_STAM,0) -> fd(文件標識符)6 --傳遞-> bind(6,9999) 綁定埠和文件標識符 --監聽-->listen(6)
2.BIO (Blocking I/O)
概念:同步阻塞型IO,對於客戶端的每個鏈接都將開啟一個新的線程處理.
流程:在BIO中apccet的阻塞是在內核程式中阻塞,在使用strace 監控線程時可以看到 accept(6, 後停止列印,直到有鏈接建立然後才會繼續剩餘代碼的執行. IO的阻塞在strace中可以看到是recv(5 停止列印直到有數據傳輸
優點:可以與多個客戶端建立鏈接
缺點:一鏈接一線程,導致記憶體浪費.鏈接建立的越多,CPU線程切換更加頻繁.
代碼:
1 2 3 import java.io.*; 4 import java.net.ServerSocket; 5 import java.net.Socket; 6 import java.util.concurrent.*; 7 8 /** 9 * @author baiyang 10 * @version 1.0 11 * @date 2020/6/9 11:10 下午 12 */ 13 public class Server { 14 15 public static void main(String[] args) throws Exception { 16 17 ExecutorService executorService = Executors.newFixedThreadPool(50); 18 final ServerSocket serverSocket = new ServerSocket(9999); 19 while (true){ 20 Socket socket = serverSocket.accept(); // 阻塞 21 new Thread(() ->{ 22 BufferedReader bufferedReader = null; 23 int port = socket.getPort(); 24 System.out.println(port); 25 try { 26 bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream()))); // IO阻塞 27 String clientMessage = bufferedReader.readLine(); 28 System.out.println(clientMessage); 29 30 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new BufferedOutputStream(socket.getOutputStream()))); 31 bufferedWriter.write("this is service\n"); 32 bufferedWriter.flush(); 33 bufferedWriter.close(); 34 bufferedReader.close(); 35 } catch (IOException e) { 36 e.printStackTrace(); 37 } 38 }).start(); 39 40 } 41 } 42 }
1 package com.diandian.client.bio; 2 3 import java.io.*; 4 import java.net.InetSocketAddress; 5 import java.net.Socket; 6 7 /** 8 * @author baiyang 9 * @version 1.0 10 * @date 2020/6/9 11:18 下午 11 */ 12 public class Client { 13 14 public static void main(String[] args) throws Exception { 15 Socket socket = new Socket(); 16 socket.connect(new InetSocketAddress(9090)); 17 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new BufferedOutputStream(socket.getOutputStream()))); 18 bufferedWriter.write("this is client\n"); 19 bufferedWriter.flush(); 20 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream()))); 21 String clientMessage = bufferedReader.readLine(); 22 bufferedReader.close(); 23 bufferedWriter.close(); 24 System.out.println(clientMessage); 25 26 27 } 28 }View Code
3.NIO(java -> New I/O 操作系統 -> NONBLOCKING)
概念:在java中NIO 指的是new IO 1.4版本之後新的一個包出現
在操作系統中的體現是NONBLOCKING的支持
同步非阻塞
流程:在NIO中,ServerSocketChannel類中調用方法configureBlocking(false),accept將不在阻塞.當沒有鏈接時accept() = -1,當客戶端沒有發送信息時recv() = -1.BIO中的阻塞就解決了
優點: 解決了BIO多線程的問題,解決了C10K問題
缺點: 迴圈遍歷已鏈接的客戶端,實現監控是否有數據寫入. ---> 每一次的迴圈將會向內核程式發送一條指令,假設有1W個鏈接建立且只有1個客戶端向伺服器發送指令,那麼需要向內核發送1W次指令,無效指令數9999次.
代碼:
1 package com.diandian.server.nio; 2 3 import java.net.InetSocketAddress; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.ServerSocketChannel; 6 import java.nio.channels.SocketChannel; 7 import java.util.ArrayList; 8 9 /** 10 * @author baiyang 11 * @version 1.0 12 * @date 2020/6/11 11:32 下午 13 */ 14 public class NioServer { 15 16 public static void main(String[] args) { 17 // 保存已鏈接的客戶端 18 ArrayList<SocketChannel> clients = new ArrayList<>(); 19 try { 20 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 21 serverSocketChannel.bind(new InetSocketAddress(9999)); 22 // 設置false不阻塞 調用 OS NONBLOCKING 23 serverSocketChannel.configureBlocking(false); 24 while (true) { 25 // 可有可無方便測試 26 Thread.sleep(2000); 27 SocketChannel client = serverSocketChannel.accept(); 28 if (null == client) { 29 System.out.println("沒有客戶端建立鏈接"); 30 } else { 31 client.configureBlocking(false); 32 clients.add(client); 33 System.out.println(client.socket().getPort()); 34 } 35 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); 36 for (SocketChannel s : clients){ 37 int read = s.read(byteBuffer); 38 if(read > 0){ 39 byteBuffer.flip(); 40 byte[] bytes = new byte[byteBuffer.limit()]; 41 byteBuffer.get(bytes); 42 System.out.println("收到客戶端信息:"+new String(bytes)); 43 byteBuffer.clear(); 44 } 45 } 46 47 } 48 } catch (Exception e) { 49 e.printStackTrace(); 50 } 51 } 52 }
4.多路復用
本質上select poll epoll 都是IO同步的,讀寫操作在就緒後都將由自己進行讀寫,所以讀寫操作是阻塞的
1 package com.diandian.service.nio; 2 3 import java.net.InetSocketAddress; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.Selector; 7 import java.nio.channels.ServerSocketChannel; 8 import java.nio.channels.SocketChannel; 9 import java.util.Iterator; 10 import java.util.Set; 11 12 /** 13 * @author baiyang 14 * @version 1.0 15 * @date 2020/6/12 9:46 上午 16 */ 17 public class NioSelect { 18 19 public static void main(String[] args) { 20 try { 21 ServerSocketChannel server = ServerSocketChannel.open(); 22 server.configureBlocking(false); 23 server.bind(new InetSocketAddress(9999)); 24 // 開啟selector 25 Selector selector = Selector.open(); 26 // 將accept註冊到selector 27 server.register(selector,SelectionKey.OP_ACCEPT); 28 while (true){ 29 System.out.println( " key Sizes:" + selector.selectedKeys().size()); 30 while (selector.select(1000) > 0){ 31 Set<SelectionKey> keys = selector.selectedKeys(); 32 Iterator<SelectionKey> iterator = keys.iterator(); 33 while (iterator.hasNext()){ 34 SelectionKey next = iterator.next(); 35 // 移除防止重覆遍歷 36 iterator.remove(); 37 // 是否有新的鏈接進來 38 if(next.isAcceptable()){ 39 ServerSocketChannel channel = (ServerSocketChannel) next.channel(); 40 SocketChannel cline = channel.accept(); 41 // 設置非阻塞 42 cline.configureBlocking(false); 43 System.out.println(cline.socket().getPort()); 44 ByteBuffer buffer = ByteBuffer.allocateDirect(1024); 45 // 將read事件註冊到selector中 46 cline.register(selector,SelectionKey.OP_READ,buffer); 47 }else if (next.isReadable()){ 48 SocketChannel client = (SocketChannel) next.channel(); 49 ByteBuffer buffer = ByteBuffer.allocateDirect(1024); 50 buffer.clear(); 51 int read; 52 while (true) { 53 read = client.read(buffer); 54 if (read > 0) { 55 buffer.flip(); 56 while (buffer.hasRemaining()) { 57 client.write(buffer); 58 } 59 buffer.clear(); 60 } else if (read == 0) { 61 break; 62 } else { 63 client.close(); 64 break; 65 } 66 } 67 } 68 69 } 70 } 71 } 72 73 } catch (Exception e) { 74 e.printStackTrace(); 75 } 76 } 77 }
4.1 select poll
概念:NIO中解決多次向內核程式發送無效指令問題
流程: select(fd) --全量遍歷--> 用戶空間
優點:減少了向內核程式發送指令次數,一次將所有鏈接的fd發送給內核,由內核遍歷
缺點:每次都將全量的fd進行發送,內核將進行全量遍歷,只有一個selector來來進行監控accept() IO操作
select和poll的區別: 本質上是一致的, poll用鏈表存儲無鏈接限制. select 監聽fd有鏈接數量限制,LINUX32位預設 1024 64位預設 2048
4.2 epoll (even poll)
概念:會將有發生IO操作的fd通知到應用程式
優點:不再每次進行全量的遍歷 複雜度降低至O(1)可使用多個selector進行監控
流程: 建立鏈接後 epoll_create(256) -> 7 ----> epoll_crl(7,ADD,6(scoket建立時返回的文件標識符),accept) 將6的accept註冊到7空間(紅黑樹保存fd)中 ----> epoll_wait() 阻塞的等待客戶端的鏈接 O(1) -accept(6)->8 與客戶端建立鏈接8 -> epoll_crl(7,ADD,8,read)將讀事件註冊到7空間 ----> epoll_wait(6,8)