瞭解過 Hex 六邊形架構、Onion 洋蔥架構、Clean 整潔架構的同學可以將本篇文章介紹的實踐方法與自身項目代碼架構對比並互通有無,共同改進。沒瞭解過上述架構的同學可以學習一種新的架構方法,並嘗試將其應用到業務項目中,降低項目維護成本,提高效率。 本文提及的架構主要指項目組織的“代碼架構”,註... ...
前言
這周學習尼恩編著的《Netty、Redis、ZooKeeper高併發實戰》, 這本書寫的很不錯,通過十幾個例子帶領大家去體會高併發如何實現, 這周我看了最基礎的JavaNOI部分,讀書的時候好像明白了作者寫的內容,但是又體會不深,非得自己動手寫一些書上得例子,有時候還要改動下例子,才能體會深刻,得出自己得結論。下麵我們進入例子演示。
Blocking IO
首先我們會來一個Blocking IO的例子,也就是同步阻塞方式,在Java中,預設創建的socket都是阻塞的。
先來看Server端代碼, 寫一個繼承ServerSocket的類
@Slf4j
public class BlockReceiveServer extends ServerSocket {
public BlockReceiveServer() throws Exception
{
super(SERVER_PORT);
}
public void startServer() throws Exception
{
while (true)
{
// server嘗試接收其他Socket的連接請求,server的accept方法是阻塞式的
log.debug("server listen at:" + SERVER_PORT);
Socket socket = this.accept();
/**
* 我們的服務端處理客戶端的連接請求是同步進行的, 每次接收到來自客戶端的連接請求後,
* 都要先跟當前的客戶端通信完之後才能再處理下一個連接請求。 這在併發比較多的情況下會嚴重影響程式的性能,
* 為此,我們可以把它改為如下這種非同步處理與客戶端通信的方式
*/
// 每接收到一個Socket就建立一個新的線程來處理它
new Thread(new Task(socket)).start();
}
}
this.accept()是阻塞的,也就是說沒有請求來的時候,它是停在這裡的,有請求來了後,我們起一個新的線程來出來它。 這裡定義了一個Task來出來客戶端請求。
class Task implements Runnable
{
private Socket socket;
private DataInputStream dis;
public Task(Socket socket)
{
this.socket = socket;
}
@Override
public void run()
{
try
{
dis = new DataInputStream(socket.getInputStream());
log.debug("start receive");
for (int i = 0;i < 1000 ;i++)
{
dis.readUTF();
}
log.debug("finish receive" );
} catch (Exception e)
{
e.printStackTrace();
} finally
{
IOUtil.closeQuietly(dis);
IOUtil.closeQuietly(socket);
}
}
}
書中的例子是傳送文件,我這裡把它改為接收客戶端傳過來的文本,就是客戶端寫一千次,服務端讀一千次,這個也是寫死的,因為阻塞,沒有讀完,你是不能進入下一個環節的,不然會出錯。
客戶端代碼
@Slf4j
public class BlockSendClient extends Socket
{
private Socket client;
private DataOutputStream outputStream;
/**
* 構造函數<br/>
* 與伺服器建立連接
*
* @throws Exception
*/
public BlockSendClient() throws Exception
{
super(SOCKET_SERVER_IP
, SERVER_PORT);
this.client = this;
}
/**
* 向服務端傳輸文件
*
* @throws Exception
*/
public void sendFile() throws Exception
{
try
{
outputStream = new DataOutputStream(client.getOutputStream());
for (int i = 0;i < 1000 ;i++)
{
outputStream.writeUTF(String.valueOf(i));
outputStream.flush();
Thread.sleep(1000);
}
log.debug("======== file transfer success ========");
} catch (Exception e)
{
e.printStackTrace();
} finally
{
IOUtil.closeQuietly(outputStream);
IOUtil.closeQuietly(client);
}
}
客戶端就是繼承了socket,然後往socket裡面發送1000次數字。
為了體現效果,我把客戶端也讓它多線程的去和服務端連接。 這裡啟動50個線程去和服務端連接,如果啟動100個,會報“java.net.ConnectException: Connection refused: connect”, 但是你多啟動幾個客戶端去連接,是不會有問題的。
public static void main(String[] args)
{
for(int i =0;i<50;i++){
new Thread(new Task()).start();
}
}
class Task implements Runnable
{
@Override
public void run() {
try
{
BlockSendClient client = new BlockSendClient(); // 啟動客戶端連接
client.sendFile();
} catch (Exception e)
{
e.printStackTrace();
}
}
}
我們可以看到服務端的輸出
我們打開多少客戶端,那麼服務端就必須要打開50倍的線程數來連接。 而線程能開的數量是有限制的,一個Java線程大約占1M的空間。 對於高併發,這種方式肯定是不可取的。
Java NIO
我們來看下如何用Java NOI 來寫一個和上面一樣的功能。先來看下服務端代碼
@Slf4j
public class NioDiscardServer {
public static void startServer() throws IOException {
// 1、創建一個 Selector選擇器
Selector selector = Selector.open();
// 2、獲取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
// 3.設置為非阻塞
serverSocketChannel.configureBlocking(false);
// 4、綁定監聽埠
serverSocket.bind(new InetSocketAddress(SERVER_PORT));
log.info("server start success");
// 5、將通道註冊到選擇器上,並註冊的IO事件為:“接收新連接”
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// sk.interestOps(SelectionKey.OP_ACCEPT) ;
// 6、輪詢感興趣的I/O就緒事件(選擇鍵集合)
while (selector.select() > 0) {
// 7、獲取選擇鍵集合
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
// 8、獲取單個的選擇鍵,並處理
SelectionKey selectedKey = selectedKeys.next();
// 9、判斷key是具體的什麼事件
if (selectedKey.isAcceptable()) {
log.info("new connection coming" + selectedKey.channel());
ServerSocketChannel server = (ServerSocketChannel) selectedKey.channel();
SocketChannel socketChannel = server.accept();
// 10、若選擇鍵的IO事件是“連接就緒”事件,就獲取客戶端連接
// 11、切換為非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
// 12、將該通道註冊到selector選擇器上
SelectionKey channleSk= socketChannel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
}
if (selectedKey.isWritable()) {
//log.info("write ready:" + selectedKey.channel());
}
if (selectedKey.isConnectable()) {
log.info("client connect success:" + selectedKey.channel());
}
if (selectedKey.isReadable()) {
log.info("read ready:" + selectedKey.channel());
// 13、若選擇鍵的IO事件是“可讀”事件,讀取數據
SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
// 14、讀取數據
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
log.info(new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
if(length == -1)
{
socketChannel.close();
}
}
// 15、移除選擇鍵
selectedKeys.remove();
}
}
// 7、關閉連接
serverSocketChannel.close();
}
public static void main(String[] args) throws IOException {
startServer();
}
}
客戶端代碼如下
@Slf4j
public class NioDiscardClient {
public void startClient(int clientNum) throws IOException {
InetSocketAddress address =
new InetSocketAddress(SOCKET_SERVER_IP,
SERVER_PORT);
// 1、獲取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
// 2、切換成非阻塞模式
socketChannel.configureBlocking(false);
//不斷的自旋、等待連接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
log.info("client connect success");
for(int i = 0;i<1000;i++) {
// 3、分配指定大小的緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put((clientNum + "hello" + i).getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
ThreadUtil.sleepSeconds(1);
}
log.info("client write success");
socketChannel.shutdownOutput();
socketChannel.close();
}
public static void main(String[] args) throws IOException {
for(int i =0;i<50;i++){
new Thread(new NoiTask(i)).start();
}
}
}
class NoiTask implements Runnable
{
private int clientNum;
public NoiTask(int clientNum)
{
this.clientNum = clientNum;
}
@Override
public void run() {
try
{
NioDiscardClient client = new NioDiscardClient(); // 啟動客戶端連接
client.startClient(clientNum); // 傳輸文件
} catch (Exception e)
{
e.printStackTrace();
}
}
}
代碼這裡不解釋了。 這裡服務端和客戶端不需要約定多少內容要發送,
服務端有段代碼需要註意
while ((length = socketChannel.read(byteBuffer)) > 0),這段很容易就退出迴圈,因為read是非同步的,很容易就返回0, 讓迴圈中斷, 開始的時候socketChannel.close()沒有加上條件,服務端列印出幾次後,就close了,客戶端就再寫不進去了。報“java.io.IOException: An established connection was aborted by the software in your host machine”,因為服務端關閉了。
服務端的輸出我們可以看出,它只啟動了一個main線程,不論多少個客戶端連接過來。
我再用CMD啟動多個進行客戶端進行測試的時候,發現它會啟用300多M的記憶體,後面多加幾個客戶端,它也不會增長,保持再這個數字上面。 前面的例子中,記憶體使用的要小,但是增加客戶端後,記憶體使用量會增加,但是也不是很明顯。 這裡面為什麼會用這麼多記憶體,我不是太明白,也許代碼有什麼缺陷在這裡,暫時不細究了。
總結
我在使用NIO的時候,發現它在單機測試性能的時候不比傳統的阻塞式的優秀,甚至還慢一些。它的應用場景是在量大的時候,高併發的時候。具體NIO的原理我也解釋不清楚,只能從應用的層面來用例子實踐一下,後面看能不能再深入下它的原理,書中講述的很好,但是我只能說大致理解,談不上深入。