Java 網路編程 —— 非同步通道和非同步運算結果

来源:https://www.cnblogs.com/Yee-Q/archive/2023/05/21/17418459.html
-Advertisement-
Play Games

從 JDK7 開始,引入了表示非同步通道的 `AsynchronousSockerChannel` 類和 `AsynchronousServerSocketChannel` 類,這兩個類的作用與 `SocketChannel` 類和 `ServerSockelChannel` 相似,區別在於非同步通道的 ...


從 JDK7 開始,引入了表示非同步通道的 AsynchronousSockerChannel 類和 AsynchronousServerSocketChannel 類,這兩個類的作用與 SocketChannel 類和 ServerSockelChannel 相似,區別在於非同步通道的一些方法總是採用非阻塞模式,並且它們的非阻塞方法會立即返回一個 Future 對象,用來存放方法的非同步運算結果

AsynchronousSocketChannel 類有以下非阻塞方法:

// 連接遠程主機
Future<Void> connect(SocketAddress remote);
// 從通道中讀入數據,存放到ByteBuffer中
// Future對象包含了實際從通道中讀到的位元組數
Future<Inleger> read(ByteBuffer dst);
// 把ByteBuffer的數據寫入通道
// Future對象包含了實際寫入通道的位元組數
Future<Integer> write(ByteBuffer src);

AsynchronousServerSocketChannel 類有以下非阻塞方法:

// 接受客戶連接請求
// Future對象包含連接建立成功後創建的AsynchronousSockelChannel對象
Future<AsynchronousSocketChannel> accept();

使用非同步通道,可以使程式並行執行多個非同步操作,例如:

SocketAddress socketAddress = ...;
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

//請求建立連接
Future<Void> connected = client.connect(socketAddress);
ByteBuffer byteBuffer = ByteBuffer.allocate(128);

//執行其他操作
//...

//等待連接完成
connected.get();

//讀取數據
Future<Integer> future = client.read(byteBuffer);

//執行其他操作
//...

//等待從通道讀取數據完成
future.get();

byteBuffer.flip();
WritableByteChannel out = Channels.newChannel(System.out);
out.write(byteBuffer);

下例的代碼演示了非同步通道的用法,它不斷接收用戶輸入的功能變數名稱並嘗試建立連接,最後列印建立連接所花費的時間。如果程式無法連接到指定的主機,就列印相關錯誤信息。如果用戶輸入 bye,就結束程式

//表示連接一個主機的結果
class PingResult {
    
    InetSocketAddress address;
    long connectStart; //開始連接時的時間
    long connectFinish = 0; //連接成功時的時間
    String failure;
    Future<Void> connectResult; //連接操作的非同步運算結果
    AsynchronousSocketChannel socketChannel;
    String host;
    final String ERROR = "連接失敗";
        
    PingResult(String host) {
        try {
            this.host = host;
            address = new InetSocketAddress(InetAddress.getByName(host), 80);
        } catch (IOException x) {
            failure = ERROR;
        }
    }
    
    //列印連接一個主機的執行結果
    public void print() {
        String result;
        if (connectFinish != 0) {
            result = Long.toString(connectFinish - connectStart) + "ms";
        } else if (failure != null) {
			result = failure;
        } else {
            result = "Timed out";
        }
        System,out,println("ping "+ host + "的結果" + ":" + result);
    }
    
    public class PingClient {
        //存放所有PingResult結果的隊列
        private LinkedList<PingResult> pingResults = new Linkedlist<PingResult>();
        boolean shutdown = false;
        ExecutorService executorService;
        
        public PingClient() throws IOException {
            executorService = Executors.newFixedThreadPool(4);
            executorService.execute(new Printer());
            receivePingAddress();
        }
    }
    
    public static void main(String args[]) throws IOException {
        new PingClient();
    }
    
    /*接收用戶輸入的主機地址,由線程池執行PingHandler任務 */
    public void receivePingAddress() {
        try {
            BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = null;
            //接收用戶輸入的主機地址
            while((msg = localReader.readLine()) != null) {
                if(msg.equals("bye")) {
                    shutdown = true;
                    executorService.shutdown();
                    break;
                }
                executorService.execute(new PingHandler(msg));
            }
        } catch(IOException e) {}
    }
    
    /* 嘗試連接特定主機,生成一個PingResult對象,把它加入PingResults結果隊列中 */
    public class PingHandler implements Runnable {
        String msg;
        public PingHandler(String msg) {
            this.msg = msg;
        }
        public void run() {
            if(!msg.equals("bye")) {
                PingResult pingResult = new PingResult(msg);
                AsynchronousSocketChannel socketChannel = null;
                try {
                    socketChannel = AsynchronousSocketChannel.open();
                    pingResult.connectStart = System.currentTimeMillis();
                    synchronized (pingResults) {
                        //向pingResults隊列加入一個PingResult對象
                        pingResults.add(pingResult);
                        pingResults,notify();
                    }
                    Future<Void> connectResult = socketChannel.connect(pingResult.address);
                    pingResult.connectResult = connectResult;
                } catch (Exception x) {
                    if (socketChannel != null) {
                        try { socketChannel.close();} catch (IOException e) {)
                    }
                    pingResult.failure = pingResult.ERROR;
                }
            }
        }
    }
    
    /* 列印PingResults結果隊列中已經執行完畢的任務的結果 */
    public class Printer implements Runnable {
        public void run() {
            PingResult pingResult = null;
            while(!shutdown) {
                synchronized (pingResults) {
                    while (!shutdown && pingResults.size() == 0 ) {
                        try {
                            pingResults.wait(100);
                        } catch(InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if(shutdown && pingResults.size() == 0 ) break;
                    pingResult = pingResults.getFirst();
                    
                    try {
                        if(pingResult.connectResult != null) {
                            pingResult.connectResult.get(500, TimeUnit,MILLISECONDS);
                        } catch(Exception e) {
                            pingResult.failure = pingResult.ERROR;
                        }
                    }
                    
                    if(pingResult.connectResult != null && pingResult.connectResult.isDone()) {
                        pingResult.connectFinish = System.currentTimeMillis();
                    }
                    
                    if(pingResult,connectResult != null && pingResult.connectResult.isDone() || || pingResult,failure != null) {
                        pingResult.print();
                        pingResults.removeFirst();
                        try {
                            pingResult.socketChannel.close();
                        } catch (IOException e) {}
                    }
                }
            }
        }
    }
}

PingClient 類定義了兩個表示特定任務的內部類:

  • PingHandler:負責通過非同步通道去嘗試連接客戶端輸入的主機地址,並且創建一個 PingResult 對象,它包含了連接操作的非同步運算結果,再將其加入 PingResults 結果隊列
  • Printer:負責列印 PingResults 結果隊列已經執行完畢的任務結果,列印完畢的 PingResult 對象會從隊列中刪除


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • > 本文首發於公眾號:Hunter後端 > 原文鏈接:[es筆記四之中文分詞插件安裝與使用](https://mp.weixin.qq.com/s/aQuwrUzLZDKLv_K8dKeVzw) 前面我們介紹的操作及演示都是基於英語單詞的分詞,但我們大部分使用的肯定都是中文,所以如果需要使用分詞的操 ...
  • 在上一篇博客中,我們介紹了用Python對來實現一個Scheme求值器。然而,我們跳過了部分特殊形式(special forms)和基本過程(primitive procedures)實現的介紹,如特殊形式中的delay、cons-stream,基本過程中的force、streawn-car、str... ...
  • ## 1.自定義starter的作用 在我們的日常開發工作中,經常會有一些獨立於業務之外的配置模塊,比如阿裡雲oss存儲的時候,我們需要一個工具類進行文件上傳。我們經常將其放到一個特定的包下,然後如果另一個工程需要復用這塊功能的時候,需要將代碼硬拷貝到另一個工程,重新集成一遍,這樣會非常麻煩。如果我 ...
  • # 使用 Async Rust 構建簡單的 P2P 節點 ### P2P 簡介 - P2P:peer-to-peer - P2P 是一種網路技術,可以在不同的電腦之間共用各種計算資源,如 CPU、網路帶寬和存儲。 - P2P 是當今用戶線上共用文件(如音樂、圖像和其他數字媒體)的一種非常常用的方法 ...
  • ## 1.1 為什麼要學 Qt Qt是一個跨平臺的 C++ 圖形用戶界面應用程式框架 Qt 為應用程式開發者提供建立藝術級圖形界面所需的所有功能 Qt 是完全面向對象的,很容易擴展,並且允許真正的組件編程 (1)Qt 發展史 在講解學習 Qt 的必要性之前, 先來瞭解下 Qt 的發展歷史: 1991 ...
  • 用go設計開發一個自己的輕量級登錄庫/框架吧(拓展篇),給自己的庫/框架拓展一下吧,主庫:https://github.com/weloe/token-go ...
  • ### 1.0 匿名對象的基本知識 * 匿名對象 顧名思義,匿名對象指的就是沒有名字的對象,在使用中理解為實例化一個類對象,但是並不把它賦給一個對應的類變數,而是直接使用。在理解匿名對象前,我們先創建一個類便於後面的使用。 * 匿名對象具有以下特征: 語法上:只創建對象,但不用變數來接收,例如:假設 ...
  • Groovy是一種基於Java平臺的動態編程語言,它結合了Python、Ruby和Smalltalk等語言的特性,同時與Java無縫集成。 ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...