Java 網路編程 —— 實現非阻塞式的伺服器

来源:https://www.cnblogs.com/Yee-Q/archive/2023/05/20/17416992.html
-Advertisement-
Play Games

## 創建阻塞的伺服器 當 `ServerSocketChannel` 與 `SockelChannel` 採用預設的阻塞模式時,為了同時處理多個客戶的連接,必須使用多線程 ```java public class EchoServer { private int port = 8000; priv ...


創建阻塞的伺服器

ServerSocketChannelSockelChannel 採用預設的阻塞模式時,為了同時處理多個客戶的連接,必須使用多線程

public class EchoServer {
    
	private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService; //線程池
    private static final int POOL_MULTIPLE = 4; //線程池中工作線程的數目
    
    public EchoServer() throws IOException {
        //創建一個線程池
        executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
        //創建一個ServerSocketChannel對象
        serverSocketChannel = ServerSocketChannel.open();
        //使得在同一個主機上關閉了伺服器程式,緊接著再啟動該伺服器程式時,可以順利綁定相同的埠
        serverSocketChannel.socket().setReuseAddress(true);
        //把伺服器進程與一個本地埠綁定
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("伺服器啟動");
    }
    
    public void service() {
        while (true) {
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                //處理客戶連接
                executorService.execute(new Handler(socketChannel));
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String args[])throws IOException {
        new EchoServer().service();
    }
    
    //處理客戶連按
    class Handler implements Runnable {

        private SocketChannel socketChannel;
		
        public Handler(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }
        
        public void run() {
            handle(socketChannel);
        }
        
        public void handle(SocketChannel socketChannel) {
            try {
                //獲得與socketChannel關聯的Socket對象
                Socket socket = socketChannel.socket();
                System.out.println("接收到客戶連接,來自:" + socket.getInetAddress() + ":" + socket.getPort());
                
                BufferedReader br = getReader(socket);
                PrintWriter pw = getWriter(socket);
                
                String msg = null;
                while ((msg = br.readLine()) != null) {
                    System.out.println(msg);
                    pw.println(echo(msg));
                    if (msg.equals("bye")) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if(socketChannel != null) {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    } 
    
    private PrintWriter getWriter(Socket socket) throws IOException {
        OutputStream socketOut = socket.getOutputStream();
        return new PrintWriter(socketOut,true);
    }
    
    private BufferedReader getReader(Socket socket) throws IOException {
        InputStream socketIn = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(socketIn));
    }
    
    public String echo(String msg) {
        return "echo:" + msg;
    }
}

創建非阻塞的伺服器

在非阻塞模式下,EchoServer 只需要啟動一個主線程,就能同時處理三件事:

  • 接收客戶的連接
  • 接收客戶發送的數據
  • 向客戶發迴響應數據

EchoServer 委托 Selector 來負責監控接收連接就緒事件、讀就緒事件和寫就緒事件如果有特定事件發生,就處理該事件

// 創建一個Selector對象
selector = Selector.open();
//創建一個ServerSocketChannel對象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一個主機上關閉了伺服器程式,緊接著再啟動該伺服器程式時
//可以順利綁定到相同的埠
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作於非阻塞模式
serverSocketChannel.configureBlocking(false):
//把伺服器進程與一個本地埠綁定
serverSocketChannelsocket().bind(new InetSocketAddress(port));

EchoServer 類的 service() 方法負責處理本節開頭所說的三件事,體現其主要流程的代碼如下:

public void service() throws IOException {
    serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
    //第1層while迴圈
    while(selector.select() > 0) {
        //獲得Selector的selected-keys集合
        Set readyKeys = selector.selectedKeys();
        Iterator it = readyKeys.iterator();
        //第2層while迴圈
        while (it.hasNext()) {
            SelectionKey key = null;
            //處理SelectionKey
            try {
                //取出一個SelectionKey
                key = (SelectionKey) it.next();
                //把 SelectionKey從Selector 的selected-key 集合中刪除
                it.remove();
                1f (key.isAcceptable()) { 處理接收連接就緒事件; }
                if (key.isReadable()) { 處理讀就緒水件; }
                if (key.isWritable()) { 處理寫就緒事件; }
            } catch(IOException e) {
                e.printStackTrace();
                try {
                    if(key != null) {
                        //使這個SelectionKey失效
                        key.cancel();
                        //關閉與這個SelectionKey關聯的SocketChannel
                        key.channel().close();
                    }
                } catch(Exception ex) { 
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 首先由 ServerSocketChannelSelector 註冊接收連接就緒事件,如果 Selector 監控到該事件發生,就會把相應的 SelectionKey 對象加入 selected-keys 集合
  • 第一層 while 迴圈,不斷詢問 Selector 已經發生的事件,select() 方法返回當前相關事件已經發生的 SelectionKey 的個數,如果當前沒有任何事件發生,該方法會阻塞下去,直到至少有一個事件發生。SelectorselectedKeys() 方法返回 selected-keys 集合,它存放了相關事件已經發生的 SelectionKey 對象
  • 第二層 while 迴圈,從 selected-keys 集合中依次取出每個 SelectionKey 對象並從集合中刪除,,然後調用 isAcceptable()isReadable()isWritable() 方法判斷到底是哪種事件發生了,從而做出相應的處理

1. 處理接收連接就緒事件

if (key.isAcceptable()) {
    //獲得與SelectionKey關聯的ServerSocketChannel
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    //獲得與客戶連接的SocketChannel
    SocketChannel socketChannel = (SocketChannel) ssc.accept();
    //把Socketchannel設置為非阻塞模式
    socketChannel.configureBlocking(false);
    //創建一個用於存放用戶發送來的數據的級沖區
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //Socketchannel向Selector註冊讀就緒事件和寫就緒事件
    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}

2. 處理讀就緒事件

public void receive(SelectionKey key) throws IOException {
    //獲得與SelectionKey關聯的附件
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    //獲得與SelectionKey關聯的Socketchannel
    SocketChannel socketChannel = (SocketChannel)key.channel();
    //創建一個ByteBuffer用於存放讀到的數據
    ByteBuffer readBuff = ByteBuffer.allocate(32);
    socketChannel.read(readBuff);
    readBuff.flip();
    //把buffer的極限設為容量
    buffer.limit(buffer.capacity());
    //把readBuff中的內容拷貝到buffer
    buffer.put(readBuff);
}

3. 處理寫就緒事件

public void send(SelectionKey key) throws IOException {
    //獲得與SelectionKey關聯的ByteBuffer
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    //獲得與SelectionKey關聯的SocketChannel
    SocketChannel socketChannel = (SocketChannel) key.channel();
    buffer.flip();
    //按照GBK編碼把buffer中的位元組轉換為字元串
    String data = decode(buffer);
    //如果還沒有讀到一行數據就返回
    if(data.indexOf("\r\n") == -1)
        return;
    //截取一行數據
    String outputData = data.substring(0, data.indexOf("\n") + 1);
    //把輸出的字元串按照GBK編碼轉換為位元組,把它放在outputBuffer中
    ByteBuffer outputBuffer = encode("echo:" + outputData);
    //輸出outputBuffer的所有位元組
    while(outputBuffer,hasRemaining())
        socketChannel.write(outputBuffer);
    //把outputData字元審按照GBK編碼,轉換為位元組,把它放在ByteBuffer
    ByteBuffer temp = encode(outputData);
    //把buffer的位置設為temp的極限
    buffer.position(temp.limit()):
    //刪除buffer已經處理的數據
    buffer.compact();
    //如果已經輸出了字元串“bye\r\n”,就使SelectionKey失效,並關閉SocketChannel
    if(outputData.equals("bye\r\n")) {
        key.cancel();
        socketChannel.close();
    }
}

完整代碼如下:

public class EchoServer {
    
	private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector;
    private Charset charset = Charset.forName("GBK");

	public EchoServer() throws IOException {
        // 創建一個Selector對象
        selector = Selector.open();
        //創建一個ServerSocketChannel對象
        serverSocketChannel = ServerSocketChannel.open();
        //使得在同一個主機上關閉了伺服器程式,緊接著再啟動該伺服器程式時
        //可以順利綁定到相同的埠
        serverSocketChannel.socket().setReuseAddress(true);
        //使ServerSocketChannel工作於非阻塞模式
        serverSocketChannel.configureBlocking(false):
        //把伺服器進程與一個本地埠綁定
        serverSocketChannelsocket().bind(new InetSocketAddress(port));
    }
    
    public void service() throws IOException {
        serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
        //第1層while迴圈
        while(selector.select() > 0) {
            //獲得Selector的selected-keys集合
            Set readyKeys = selector.selectedKeys();
            Iterator it = readyKeys.iterator();
            //第2層while迴圈
            while (it.hasNext()) {
                SelectionKey key = null;
                //處理SelectionKey
                try {
                    //取出一個SelectionKey
                    key = (SelectionKey) it.next();
                    //把 SelectionKey從Selector 的selected-key 集合中刪除
                    it.remove();
                    1f (key.isAcceptable()) {
                         //獲得與SelectionKey關聯的ServerSocketChannel
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        //獲得與客戶連接的SocketChannel
                        SocketChannel socketChannel = (SocketChannel) ssc.accept();
                        //把Socketchannel設置為非阻塞模式
                        socketChannel.configureBlocking(false);
                        //創建一個用於存放用戶發送來的數據的級沖區
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //Socketchannel向Selector註冊讀就緒事件和寫就緒事件
                        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                    }
                    if (key.isReadable()) { receive(key); }
                    if (key.isWritable()) { send(key); }
                } catch(IOException e) {
                    e.printStackTrace();
                    try {
                        if(key != null) {
                            //使這個SelectionKey失效
                            key.cancel();
                            //關閉與這個SelectionKey關聯的SocketChannel
                            key.channel().close();
                        }
                    } catch(Exception ex) { 
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    public void receive(SelectionKey key) throws IOException {
        //獲得與SelectionKey關聯的附件
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        //獲得與SelectionKey關聯的Socketchannel
        SocketChannel socketChannel = (SocketChannel)key.channel();
        //創建一個ByteBuffer用於存放讀到的數據
        ByteBuffer readBuff = ByteBuffer.allocate(32);
        socketChannel.read(readBuff);
        readBuff.flip();
        //把buffer的極限設為容量
        buffer.limit(buffer.capacity());
        //把readBuff中的內容拷貝到buffer
        buffer.put(readBuff);
    }
    
    public void send(SelectionKey key) throws IOException {
        //獲得與SelectionKey關聯的ByteBuffer
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        //獲得與SelectionKey關聯的SocketChannel
        SocketChannel socketChannel = (SocketChannel) key.channel();
        buffer.flip();
        //按照GBK編碼把buffer中的位元組轉換為字元串
        String data = decode(buffer);
        //如果還沒有讀到一行數據就返回
        if(data.indexOf("\r\n") == -1)
            return;
        //截取一行數據
        String outputData = data.substring(0, data.indexOf("\n") + 1);
        //把輸出的字元串按照GBK編碼轉換為位元組,把它放在outputBuffer中
        ByteBuffer outputBuffer = encode("echo:" + outputData);
        //輸出outputBuffer的所有位元組
        while(outputBuffer,hasRemaining())
            socketChannel.write(outputBuffer);
        //把outputData字元審按照GBK編碼,轉換為位元組,把它放在ByteBuffer
        ByteBuffer temp = encode(outputData);
        //把buffer的位置設為temp的極限
        buffer.position(temp.limit()):
        //刪除buffer已經處理的數據
        buffer.compact();
        //如果已經輸出了字元串“bye\r\n”,就使SelectionKey失效,並關閉SocketChannel
        if(outputData.equals("bye\r\n")) {
            key.cancel();
            socketChannel.close();
        }
    }
    
    //解碼
    public String decode(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);
        return charBuffer.toStrinq();
    }
    
    //編碼
    public ByteBuffer encode(String str) {
        return charset.encode(str);
    }
    
    public static void main(String args[])throws Exception {
        EchoServer server = new EchoServer();
        server.service();
    }
}

阻塞模式與非阻塞模式混合使用

使用非阻塞模式時,ServerSocketChannel 以及 SocketChannel 都被設置為非阻塞模式,這使得接收連接、接收數據和發送數據的操作都採用非阻塞模式,EchoServer 採用一個線程同時完成這些操作

假如有許多客戶請求連接,可以把接收客戶連接的操作單獨由一個線程完成,把接收數據和發送數據的操作由另一個線程完成,這可以提高伺服器的併發性能

負責接收客戶連接的線程按照阻塞模式工作,如果收到客戶連接,就向 Selector 註冊讀就緒和寫就緒事件,否則進入阻塞狀態,直到接收到了客戶的連接。負責接收數據和發送數據的線程按照非阻塞模式工作,只有在讀就緒或寫就緒事件發生時,才執行相應的接收數據和發送數據操作

public class EchoServer {
    
	private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;
    private Charset charset = Charset.forName("GBK");

	public EchoServer() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannelsocket().bind(new InetSocketAddress(port));
    }
    
    public void accept() {
        while(true) {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                synchronized(gate) {
                    selector.wakeup();
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                }
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    private Object gate=new Object();
    
    public void service() throws IOException {
        while(true) {
            synchronized(gate){}
            int n = selector.select();
            if(n == 0) continue;
            Set readyKeys = selector.selectedKeys();
            Iterator it = readyKeys.iterator();
            while (it.hasNext()) {
                SelectionKey key = null;
                try {
    				it.remove();
                    if (key.isReadable()) {
                        receive(key);
                    }
                    if (key.isWritable()) {
                        send(key);
                    }
                } catch(IOException e) {
                    e.printStackTrace();
                    try {
                        if(key != null) {
                            key.cancel();
                            key.channel().close();
                        }
                    } catch(Exception ex) { e.printStackTrace(); }
                }
            }
        }
    }
    
    public void receive(SelectionKey key) throws IOException {
        ...
    }
    
    public void send(SelectionKey key) throws IOException {
        ...
    }
    
    public String decode(ByteBuffer buffer) {
        ...
    }
    
    public ByteBuffer encode(String str) {
        ...
    }
    
    public static void main(String args[])throws Exception {
        final EchoServer server = new EchoServer();
        Thread accept = new Thread() {
            public void run() {
                server.accept();
            }
        };
        accept.start();
		server.service();
    }
}

註意一點:主線程的 selector select() 方法和 Accept 線程的 register(...) 方法都會造成阻塞,因為他們都會操作 Selector 對象的共用資源 all-keys 集合,這有可能會導致死鎖

導致死鎖的具體情形是:Selector 中尚沒有任何註冊的事件,即 all-keys 集合為空,主線程執行 selector.select() 方法時將進入阻塞狀態,只有當 Accept 線程向 Selector 註冊了事件,並且該事件發生後,主線程才會從 selector.select() 方法返回。然而,由於主線程正在 selector.select() 方法中阻塞,這使得 Acccept 線程也在 register() 方法中阻塞。Accept 線程無法向 Selector 註冊事件,而主線程沒有任何事件可以監控,所以這兩個線程將永遠阻塞下去

為了避免對共用資源的競爭,同步機制使得一個線程執行 register() 時,不允許另一個線程同時執行 select() 方法,反之亦然



您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 關於JWT,可以說是分散式系統下的一個利器,我在我的很多項目實踐中,認證系統的第一選擇都是JWT。它的優勢會讓你欲罷不能,就像你領優惠券一樣。 ...
  • ## 創建阻塞的 EchoClient 客戶程式一般不需要同時建立與伺服器的多個連接,因此用一個線程,按照阻塞模式運行就能滿足需求 ```java public class EchoClient { private SocketChannel socketChannel = null; public ...
  • ## 1. 安裝Django 在命令行中輸入以下命令安裝Django ```shell pip install django ``` ## 2. 創建Django項目 在命令行中輸入以下命令創建一個名為myblog的Django項目 ```shell django-admin startprojec ...
  • ## 文章首發 [【重學C++】01| C++ 如何進行記憶體資源管理?](https://mp.weixin.qq.com/s/ZhRhN07wjypnkWXcu_Lz3g) ## 前言 大家好,我是只講技術乾貨的會玩code,今天是【重學C++】的第一講,我們來學習下C++的記憶體管理。 與java ...
  • 前言: 最近生產環境系統發現一個疑難雜症,看了很久的問題但是始終無法定位到問題並處理,然後查閱了相關資料也是定位不到問題,不過資料查閱卻給了個新的思路,以此為跳板最終解決了問題。 一、問題描述 功能介紹: “主計劃拆分子計劃”是APS系統很常見的功能,功能大概意思是用戶可選多個主計劃一次性進行“展開 ...
  • 本文圍繞 Spring Boot 中如何讓你的 bean 在其他 bean 之前完成載入展開討論。 問題 今天有個小伙伴給我出了一個難題:在 SpringBoot 中如何讓自己的某個指定的 Bean 在其他 Bean 前完成被 Spring 載入?我聽到這個問題的第一反應是,為什麼會有這樣奇怪的需求 ...
  • IDEA常用設置(提高開發效率) IDEA是一款當前比較主流的編譯器,我個人也用的比較多,但是有時出於各種原因,比如更換設備等等,IDEA總是需要重新安裝配置。這就讓我比較苦惱,因為總是記不全自己之前都修改了哪些地方(原諒腦子不好使hh),所以就以此篇文章記錄一下目前我的IDEA的設置情況。可能依舊 ...
  • # AOP概念 AOP(Aspect Oriented Programming),即面向切麵編程,可以說是OOP(Object Oriented Programming,面向對象編程)的補充和完善。OOP引入封裝、繼承、多態等概念來建立一種對象層次結構,用於模擬公共行為的一個集合。不過OOP允許開發 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...