從 JDK7 開始,引入了表示非同步通道的 `AsynchronousSockerChannel` 類和 `AsynchronousServerSocketChannel` 類,這兩個類的作用與 `SocketChannel` 類和 `ServerSockelChannel` 相似,區別在於非同步通道的 ...
從 JDK7 開始,引入了表示非同步通道的 AsynchronousSockerChannel
類和 AsynchronousServerSocketChannel
類,這兩個類的作用與 SocketChannel
類和 ServerSockelChannel
相似,區別在於非同步通道的一些方法總是採用非阻塞模式,並且它們的非阻塞方法會立即返回一個 Future
對象,用來存放方法的非同步運算結果
AsynchronousSocketChannel
類有以下非阻塞方法:
// 連接遠程主機
Future<Void> connect(SocketAddress remote);
// 從通道中讀入數據,存放到ByteBuffer中
// Future對象包含了實際從通道中讀到的位元組數
Future<Inleger> read(ByteBuffer dst);
// 把ByteBuffer的數據寫入通道
// Future對象包含了實際寫入通道的位元組數
Future<Integer> write(ByteBuffer src);
AsynchronousServerSocketChannel
類有以下非阻塞方法:
// 接受客戶連接請求
// Future對象包含連接建立成功後創建的AsynchronousSockelChannel對象
Future<AsynchronousSocketChannel> accept();
使用非同步通道,可以使程式並行執行多個非同步操作,例如:
SocketAddress socketAddress = ...;
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
//請求建立連接
Future<Void> connected = client.connect(socketAddress);
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
//執行其他操作
//...
//等待連接完成
connected.get();
//讀取數據
Future<Integer> future = client.read(byteBuffer);
//執行其他操作
//...
//等待從通道讀取數據完成
future.get();
byteBuffer.flip();
WritableByteChannel out = Channels.newChannel(System.out);
out.write(byteBuffer);
下例的代碼演示了非同步通道的用法,它不斷接收用戶輸入的功能變數名稱並嘗試建立連接,最後列印建立連接所花費的時間。如果程式無法連接到指定的主機,就列印相關錯誤信息。如果用戶輸入 bye,就結束程式
//表示連接一個主機的結果
class PingResult {
InetSocketAddress address;
long connectStart; //開始連接時的時間
long connectFinish = 0; //連接成功時的時間
String failure;
Future<Void> connectResult; //連接操作的非同步運算結果
AsynchronousSocketChannel socketChannel;
String host;
final String ERROR = "連接失敗";
PingResult(String host) {
try {
this.host = host;
address = new InetSocketAddress(InetAddress.getByName(host), 80);
} catch (IOException x) {
failure = ERROR;
}
}
//列印連接一個主機的執行結果
public void print() {
String result;
if (connectFinish != 0) {
result = Long.toString(connectFinish - connectStart) + "ms";
} else if (failure != null) {
result = failure;
} else {
result = "Timed out";
}
System,out,println("ping "+ host + "的結果" + ":" + result);
}
public class PingClient {
//存放所有PingResult結果的隊列
private LinkedList<PingResult> pingResults = new Linkedlist<PingResult>();
boolean shutdown = false;
ExecutorService executorService;
public PingClient() throws IOException {
executorService = Executors.newFixedThreadPool(4);
executorService.execute(new Printer());
receivePingAddress();
}
}
public static void main(String args[]) throws IOException {
new PingClient();
}
/*接收用戶輸入的主機地址,由線程池執行PingHandler任務 */
public void receivePingAddress() {
try {
BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
//接收用戶輸入的主機地址
while((msg = localReader.readLine()) != null) {
if(msg.equals("bye")) {
shutdown = true;
executorService.shutdown();
break;
}
executorService.execute(new PingHandler(msg));
}
} catch(IOException e) {}
}
/* 嘗試連接特定主機,生成一個PingResult對象,把它加入PingResults結果隊列中 */
public class PingHandler implements Runnable {
String msg;
public PingHandler(String msg) {
this.msg = msg;
}
public void run() {
if(!msg.equals("bye")) {
PingResult pingResult = new PingResult(msg);
AsynchronousSocketChannel socketChannel = null;
try {
socketChannel = AsynchronousSocketChannel.open();
pingResult.connectStart = System.currentTimeMillis();
synchronized (pingResults) {
//向pingResults隊列加入一個PingResult對象
pingResults.add(pingResult);
pingResults,notify();
}
Future<Void> connectResult = socketChannel.connect(pingResult.address);
pingResult.connectResult = connectResult;
} catch (Exception x) {
if (socketChannel != null) {
try { socketChannel.close();} catch (IOException e) {)
}
pingResult.failure = pingResult.ERROR;
}
}
}
}
/* 列印PingResults結果隊列中已經執行完畢的任務的結果 */
public class Printer implements Runnable {
public void run() {
PingResult pingResult = null;
while(!shutdown) {
synchronized (pingResults) {
while (!shutdown && pingResults.size() == 0 ) {
try {
pingResults.wait(100);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
if(shutdown && pingResults.size() == 0 ) break;
pingResult = pingResults.getFirst();
try {
if(pingResult.connectResult != null) {
pingResult.connectResult.get(500, TimeUnit,MILLISECONDS);
} catch(Exception e) {
pingResult.failure = pingResult.ERROR;
}
}
if(pingResult.connectResult != null && pingResult.connectResult.isDone()) {
pingResult.connectFinish = System.currentTimeMillis();
}
if(pingResult,connectResult != null && pingResult.connectResult.isDone() || || pingResult,failure != null) {
pingResult.print();
pingResults.removeFirst();
try {
pingResult.socketChannel.close();
} catch (IOException e) {}
}
}
}
}
}
}
PingClient
類定義了兩個表示特定任務的內部類:
PingHandler
:負責通過非同步通道去嘗試連接客戶端輸入的主機地址,並且創建一個PingResult
對象,它包含了連接操作的非同步運算結果,再將其加入PingResults
結果隊列Printer
:負責列印PingResults
結果隊列已經執行完畢的任務結果,列印完畢的PingResult
對象會從隊列中刪除