一、阻塞IO與非阻塞IO Linux網路IO模型(5種) (1)阻塞IO模型 所有文件操作都是阻塞的,以套接字介面為例,在進程空間中調用recvfrom,系統調用直到數據包到達且被覆制到應用進程緩衝區或發生錯誤時才返回,期間會一直等待(阻塞)。模型如圖: (2)非阻塞IO模型 recvfrom從應用 ...
Linux提高select/poll,進程通過將一個或多個fd(file descriptor)傳遞給select或poll系統調用,阻塞在select操作上,偵測多個fd是否處於就緒狀態。select/poll順序掃描fd是否就緒,而且支持的fd數量有限。Linux還提供了一個epoll系統調用,使用基於事件驅動的方式代替順序掃描,性能更高。當有fd就緒時,立即回調函數rollback。如圖:
//1.服務端 package com.xbai.io; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import com.xbai.executor.TimeServerHandlerExecutePool; import com.xbai.handler.TimeServerHandler; public class TimeServerExecutor { public static void main(String[] args)throws IOException { int port =8080; if(args !=null && args.length >0){ try { port = Integer.valueOf(args[0]); }catch (Exception e) { // TODO: handle exception } } ServerSocket server =null; try { server =new ServerSocket(port); System.out.println("The time server is started in port : " + port); TimeServerHandlerExecutePool singleExecutor =new TimeServerHandlerExecutePool(50,10000); while(true){ Socket socket = server.accept(); singleExecutor.execute(new TimeServerHandler(socket)); } }finally { if(server !=null){ System.out.println("The time server closed"); server.close(); server =null; } } } }
//2.服務端線程池 package com.xbai.executor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){ executor =new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),maxPoolSize,120L,TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));//線程池要執行的任務阻塞成一個隊列,其內部的機制是等待喚醒生產者和消費者線程,有一個生產就可喚醒一個消費,去看源碼的線程池原理 } public void execute(Runnable task){ executor.execute(task); } }
//3.服務端處理器 package com.xbai.handler; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.sql.Date; public class TimeServerHandler implements Runnable{ private Socketsocket; public TimeServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { // TODO Auto-generated method stub BufferedReader br =null; PrintWriter pw =null; try { br =new BufferedReader(new InputStreamReader(socket.getInputStream())); pw =new PrintWriter(socket.getOutputStream(),true); String curTime =null; String msg =null; while(true){ msg = br.readLine(); if(msg ==null){ break; } System.out.println("The time server received order:" + msg); curTime ="query time order".equalsIgnoreCase(msg) ?new Date( System.currentTimeMillis()).toString() :"bad order"; pw.println(curTime);//這裡不寫println,就無法插入換行符,那邊就不能readLine,一直阻塞,無法獲取數據 } }catch (IOException e) { if(br !=null){ try { br.close(); }catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } if(pw !=null){ pw.close(); pw =null; } if(socket !=null){ try { socket.close(); }catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } socket =null; } } } }
//4.客戶端代碼 package com.xbai.io; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class TimeClient { public static void main(String[] args) { int port =8080; if(args !=null && args.length >0){ try { port = Integer.valueOf(args[0]); }catch (Exception e) { // TODO: handle exception } } Socket socket =null; BufferedReader br =null; PrintWriter pw =null; try { socket =new Socket("localhost",port); br =new BufferedReader(new InputStreamReader(socket.getInputStream())); pw =new PrintWriter(socket.getOutputStream(),true); pw.println("query time order"); System.out.println("send order succeed"); String resp = br.readLine(); System.out.println("Now is :" + resp); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(pw !=null){ pw.close(); pw =null; } if(br !=null){ try { br.close(); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } br =null; } if(socket !=null){ try { socket.close(); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } socket =null; } } } }


三、非阻塞IO的例子(原生Java NIO,目前有寫半包等問題,懷疑服務端沒有寫出去導致的客戶端Selector的關閉狀態異常)
//1.服務端主程式 package com.xiaobai.nio; public class NIOServer { public static void main(String[] args) { MultiplexerTimeServer timeServer = new MultiplexerTimeServer(); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }
//2.服務端timeServer package com.xiaobai.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; public MultiplexerTimeServer() { try { selector = Selector.open();//建立Selector servChannel = ServerSocketChannel.open();//建立Channel servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(2048), 1024);//ServerSocket綁定 servChannel.register(selector, SelectionKey.OP_ACCEPT);//向Selector註冊ACCEPT事件 System.out.println("The time server is started in port 2048"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { while(!stop){ try { selector.select(1000);//輪詢Channel Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove();//移除它 try { handleInput(key); } catch (Exception e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求 if(key.isAcceptable()){//此前已向Selector註冊,並已open //獲取server channel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //獲取client channel SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //第一次捕捉到的客戶端向Selector註冊READ事件 sc.register(selector, SelectionKey.OP_READ); } //處理已註冊的讀事件 if(key.isReadable()){ //獲取客戶端Channel SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer);//讀到緩衝 if(readBytes > 0){ readBuffer.flip(); // Buffer java.nio.Buffer.flip() // // // Flips this buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded. // // After a sequence of channel-read or put operations, invoke this method to prepare for a sequence of channel-write or relative get operations. For example: // // buf.put(magic); // Prepend header // in.read(buf); // Read data into rest of buffer // buf.flip(); // Flip buffer // out.write(buf); // Write header + data to channel byte[] bytes = new byte[readBuffer.remaining()];//緩衝中有多少個位元組數據 readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(// System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes < 0){ //貴在堅持! //對端鏈路關閉 // key.cancel(); // sc.close(); }else{ ;//讀到0位元組,忽略 } } } } private void doWrite(SocketChannel channel, String response) throws IOException{ if(response != null && response.trim().length() > 0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//根據位元組數組容量創建ByteBuffer writeBuffer.put(bytes);//位元組數組複製到緩衝區 writeBuffer.flip(); channel.write(writeBuffer);//SocketChannel是非同步非阻塞的,不保證一次發送完,出現“寫半包”問題, //這裡缺少註冊寫操作,不斷輪詢Selector將沒有發送完的ByteBuffer發送完畢 //TODO 這裡有問題,沒有寫出去,導致客戶端無法收到消息,顯示Selector關閉狀態異常 } } }
//3.客戶端主程式 package com.xiaobai.nio; public class NIOClient { public static void main(String[] args) { TimeClientHandle timeClientHandle = new TimeClientHandle("",2048); new Thread(timeClientHandle,"NIO-TimeClient-001").start(); } }
//4.客戶端timeClient package com.xiaobai.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host,int port) { this.host = host==null?"":host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (Exception e) { // TODO: handle exception } } @Override public void run() { try { doConnect(); } catch (Exception e) { // TODO: handle exception } while(!stop){ try { selector.select(3000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(selector != null){ try { selector.close(); } catch (Exception e) { // TODO: handle exception } } } } private void handleInput(SelectionKey key) throws Exception{ if(key.isValid()){ //判斷是否連接成功 //連接方法中已有連接不成功註冊連接事件的邏輯,反覆嘗試連接,這裡判斷,如果成功,註冊該客戶連接的read事件準備接收數據 SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector, SelectionKey.OP_READ); doWrite(sc);//本客戶向外寫東西 } } //下麵是從伺服器接收數據 if(key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer);//讀到緩衝 if(readBytes > 0){ readBuffer.flip(); // Buffer java.nio.Buffer.flip() // // // Flips this buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded. // // After a sequence of channel-read or put operations, invoke this method to prepare for a sequence of channel-write or relative get operations. For example: // // buf.put(magic); // Prepend header // in.read(buf); // Read data into rest of buffer // buf.flip(); // Flip buffer // out.write(buf); // Write header + data to channel byte[] bytes = new byte[readBuffer.remaining()];//緩衝中有多少個位元組數據 readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("Now is : " + body); this.stop = true; }else if(readBytes < 0){ //貴在堅持! //對端鏈路關閉 key.cancel(); sc.close(); }else{ ;//讀到0位元組,忽略 } } } } private void doConnect() throws IOException { //如果連接成功,則直接註冊到多路復用器上,發送請求消息,讀應答 if(socketChannel.connect(new InetSocketAddress(host, port))){//非同步連接,直至成功 socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{//註冊連接事件,輪詢直至連接成功 //非同步,到底是什麼概念?底層是什麼原理?TCP/IP層面 socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { //本客戶向外寫東西 byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if(!writeBuffer.hasRemaining()){ System.out.println("Send order 2 server succeed."); } } }

//1.NettyServer package com.xiaobai.server.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class NettyServer { private final int port; public NettyServer(int port) { this.port = port; } public static void main(String[] args) throws InterruptedException { if(args.length != 1) { System.err.println("Usage:" + NettyServer.class.getSimpleName() + " <port>"); return; } int port = Integer.parseInt(args[0]); new NettyServer(port).start(); } private void start() throws InterruptedException { final NettyServerHandler serverHandler = new NettyServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group).channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully().sync(); } } }

//2.NettyServerHandler package com.xiaobai.server.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE);//關閉該Channel } }

//3.NettyClient package com.xiaobai.server.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class NettyClient { private final String host; private final int port; public NettyClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws InterruptedException { if(args.length != 2) { System.err.println("Usage:" + NettyServer.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new NettyClient(host,port).start(); } private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully().sync(); } } }

//4.NettyClientHandler package com.xiaobai.server.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerInvoker; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.EventExecutorGroup; public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8)); } }


//服務端啟動spring配置文件applicationContext.xml <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean id="nettyServer" class="com.xiaobai.netty.server.NettyServer" init-method="start"> <constructor-arg index="0" type="int"> <value>8888</value> </constructor-arg> </bean> </beans>

//服務端啟動類 package com.xiaobai.netty.spring; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SpringStart { public static void main(String[] args) { ApplicationContext application = new ClassPathXmlApplicationContext("com/xiaobai/netty/spring/applicationContext.xml"); } }

//NettyServer package com.xiaobai.netty.server; import com.xiaobai.netty.handlers.ChildChannelHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.log4j.Logger; public class NettyServer { private static final Logger logger = Logger.getLogger(NettyServer.class); //無參 public NettyServer() { } //用於spring管理構造函數初始化bean public NettyServer(int port) { this.port = port; } private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap bootstrap; private int port; public void start() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); try { bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannelHandler()); //