1.1 基於TCP協議的RPC 1.1.1 RPC名詞解釋 RPC的全稱是Remote Process Call,即遠程過程調用,RPC的實現包括客戶端和服務端,即服務調用方和服務提供方。服務調用方發送RPC請求到服務提供方,服務提供方根據請求的參數執行請求方法,並將結果返回給服務調用方,一次RPC ...
1.1 基於TCP協議的RPC
1.1.1 RPC名詞解釋
RPC的全稱是Remote Process Call,即遠程過程調用,RPC的實現包括客戶端和服務端,即服務調用方和服務提供方。服務調用方發送RPC請求到服務提供方,服務提供方根據請求的參數執行請求方法,並將結果返回給服務調用方,一次RPC調用完成。
1.1.2 對象的序列化
在網路上傳輸的數據,無論何種類型,最終都需要轉化為二進位流。在面向對象的程式設計中,客戶端將對象轉化為二進位流發送給服務端,服務端接收數據後將二進位流轉化為對象,java中將這兩種轉化方式稱為對象的序列化和反序列化。下麵介紹java內置的序列化方式和基於java的Hessian序列化方式:
java內置的序列化和反序列化關鍵代碼:
1 //序列化操作 2 Person person = new Person(); 3 ByteArrayOutputStream os = new ByteArrayOutputStream(); 4 ObjectOutputStream out = new ObjectOutputStream(os); 5 out.writeObject(person); 6 byte[] byteArray = os.toByteArray(); 7 8 //反序列化操作 9 ByteArrayInputStream is = new ByteArrayInputStream(byteArray); 10 ObjectInputStream in = new ObjectInputStream(is); 11 Person newPerson = new Person(); 12 newPerson = (Person) in.readObject();
基於java的Hessian序列化和反序列化關鍵代碼:
1 //序列化操作 2 ByteArrayOutputStream osH = new ByteArrayOutputStream(); 3 HessianOutput outH = new HessianOutput(osH); 4 outH.writeObject(person); 5 byte[] byteArrayH = osH.toByteArray(); 6 7 //反序列化操作 8 ByteArrayInputStream isH = new ByteArrayInputStream(byteArrayH); 9 HessianInput inH = new HessianInput(isH); 10 newPerson = (Person) inH.readObject();
1.1.3 基於TCP協議實現RPC
我們利用java的SocketAPI實現一個簡單的RPC調用,服務的介面和實現比較簡單,根據傳入的參數來判斷返回"hello" or "bye bye"。
1 public interface SayHelloService { 2 3 public String sayHello(String arg); 4 } 5 6 public class SayHelloServiceImpl implements SayHelloService { 7 8 public String sayHello(String arg) { 9 return "hello".equals(arg) ? "hello" : "bye bye"; 10 } 11 12 }
服務消費者Consumer類:
1 /** 2 * 基於TCP協議實現RPC -- 服務消費者 3 * @author admin 4 * 5 */ 6 public class Consumer { 7 8 public static void main(String[] args) throws Exception { 9 //介面名稱 10 String interfaceName = SayHelloService.class.getName(); 11 //需要執行遠程的方法 12 Method method = SayHelloService.class.getMethod("sayHello", String.class); 13 //傳遞到遠程的參數 14 Object [] arguments = {"hello"}; 15 Socket socket = new Socket("127.0.0.1", 1234); 16 //將方法名和參數傳遞到遠端 17 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); 18 out.writeUTF(interfaceName);//介面名稱 19 out.writeUTF(method.getName());//方法名稱 20 out.writeObject(method.getParameterTypes());//方法參數類型 21 out.writeObject(arguments);//傳遞的參數 22 System.out.println("發送信息到服務端,發送的信息為:" + arguments[0]); 23 //從遠端讀取返回結果 24 ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); 25 String result = (String) in.readObject(); 26 System.out.println("服務返回的結果為:" + result); 27 } 28 }
服務提供者Provider類:
1 /** 2 * 基於TCP協議實現RPC -- 服務提供者 3 * @author admin 4 * 5 */ 6 public class Provider { 7 8 public static void main(String[] args) throws Exception { 9 ServerSocket server = new ServerSocket(1234); 10 Map<Object, Object> services = new HashMap<Object, Object>(); 11 services.put(SayHelloService.class.getName(), new SayHelloServiceImpl()); 12 while(true) { 13 System.out.println("服務提供者啟動,等待客戶端調用…………"); 14 Socket socket = server.accept(); 15 //讀取服務信息 16 ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); 17 String interfaceName = in.readUTF(); 18 String methodName = in.readUTF(); 19 Class<?>[] parameterTypes = (Class<?>[]) in.readObject(); 20 Object [] arguments = (Object[]) in.readObject(); 21 System.out.println("客戶端調用服務端介面" + interfaceName + "的" + methodName + "方法"); 22 //執行調用 23 Class serviceClass = Class.forName(interfaceName);//得到介面的class 24 Object service = services.get(interfaceName);//取得服務實現的對象 25 Method method = serviceClass.getMethod(methodName, parameterTypes);//獲得要調用的方法 26 Object result = method.invoke(service, arguments); 27 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); 28 out.writeObject(result); 29 System.out.println("服務端返回結果為:" + result); 30 } 31 } 32 }
在真實的生產環境中往往是多個客戶端同時請求服務端,服務端則需要同時接收和處理多個客戶端請求消息,涉及併發處理、服務路由、負載均衡等現實問題,以上代碼顯然不能完成。
1.2 基於HTTP協議的RPC
1.2.1 HTTP協議棧
HTTP的全稱是HyperText Transfer Protocol,即超文本傳輸協議,當今普遍採用的版本是HTTP1.1。HTTP協議屬於應用層協議,它構建在TCP和IP協議之上,處於TCP/IP架構的頂端,為了更好的理解HTTP協議,我們基於java的SocketAPI設計一個簡單的應用層通信協議,來窺探協議實現的一些過程與細節。
客戶端向服務端發送一條命令,服務端接收到命令後,會判斷命令是否為"HELLO",若是則返回客戶端"hello!",否則返回客戶端"bye bye"。
1 /** 2 * 協議請求 3 * 4 * @author admin 5 * 6 */ 7 public class Request { 8 9 /** 10 * 協議編碼 0:GBK;1:UTF-8 11 */ 12 private byte encode; 13 /** 14 * 命令 15 */ 16 private String command; 17 /** 18 * 命令長度 19 */ 20 private int commandLength; 21 22 public byte getEncode() { 23 return encode; 24 } 25 26 public void setEncode(byte encode) { 27 this.encode = encode; 28 } 29 30 public String getCommand() { 31 return command; 32 } 33 34 public void setCommand(String command) { 35 this.command = command; 36 } 37 38 public int getCommandLength() { 39 return commandLength; 40 } 41 42 public void setCommandLength(int commandLength) { 43 this.commandLength = commandLength; 44 } 45 46 }
1 /** 2 * 協議響應 3 * 4 * @author admin 5 * 6 */ 7 public class Response { 8 /** 9 * 編碼 10 */ 11 private byte encode; 12 /** 13 * 響應 14 */ 15 private String response; 16 /** 17 * 響應長度 18 */ 19 private int responseLength; 20 21 public byte getEncode() { 22 return encode; 23 } 24 25 public void setEncode(byte encode) { 26 this.encode = encode; 27 } 28 29 public String getResponse() { 30 return response; 31 } 32 33 public void setResponse(String response) { 34 this.response = response; 35 } 36 37 public int getResponseLength() { 38 return responseLength; 39 } 40 41 public void setResponseLength(int responseLength) { 42 this.responseLength = responseLength; 43 } 44 45 @Override 46 public String toString() { 47 return "Response [encode=" + encode + ", response=" + response + ", responseLength=" + responseLength + "]"; 48 } 49 50 }
客戶端發送以及服務端響應處理代碼:
1 /** 2 * 服務端 3 * @author admin 4 * 5 */ 6 public class Server { 7 8 public static void main(String[] args) throws Exception { 9 ServerSocket server = new ServerSocket(1234); 10 while(true) { 11 Socket client = server.accept(); 12 //讀取請求數據 13 Request request = ProtocolUtil.readRequest(client.getInputStream()); 14 //封裝響應數據 15 Response response = new Response(); 16 response.setEncode(Encode.UTF8.getValue()); 17 response.setResponse(request.getCommand().equals("HELLO") ? "hello!" : "bye bye"); 18 response.setResponseLength(response.getResponse().length()); 19 //響應到客戶端 20 ProtocolUtil.writeResponse(client.getOutputStream(), response); 21 } 22 } 23 } 24 25 /** 26 * 客戶端 27 * @author admin 28 * 29 */ 30 public class Client { 31 32 public static void main(String[] args) throws Exception { 33 //組裝請求數據 34 Request request = new Request(); 35 request.setCommand("HELLO"); 36 request.setCommandLength(request.getCommand().length()); 37 request.setEncode(Encode.UTF8.getValue()); 38 Socket client = new Socket("127.0.0.1", 1234); 39 //發送請求 40 ProtocolUtil.writeRequest(client.getOutputStream(), request); 41 //讀取相應 42 Response response = ProtocolUtil.readResponse(client.getInputStream()); 43 System.out.println(response); 44 } 45 }
ProtocolUtil 類:
1 public class ProtocolUtil { 2 3 public static void writeRequest(OutputStream out, Request request) { 4 try { 5 out.write(request.getEncode()); 6 //write一個int值會截取其低8位傳輸,丟棄其高24位,因此需要將基本類型轉化為位元組流 7 //java採用Big Endian位元組序,而所有的網路協議也都是以Big Endian位元組序來進行傳輸,所以再進行數據的傳輸和接收時,需要先將數據轉化成Big Endian位元組序 8 //out.write(request.getCommandLength()); 9 out.write(int2ByteArray(request.getCommandLength())); 10 out.write(Encode.GBK.getValue() == request.getEncode() ? request.getCommand().getBytes("GBK") : request.getCommand().getBytes("UTF8")); 11 out.flush(); 12 } catch (Exception e) { 13 System.err.println(e.getMessage()); 14 } 15 } 16 17 /** 18 * 將響應輸出到客戶端 19 * @param os 20 * @param response 21 */ 22 public static void writeResponse(OutputStream out, Response response) { 23 try { 24 out.write(response.getEncode()); 25 out.write(int2ByteArray(response.getResponseLength())); 26 out.write(Encode.GBK.getValue() == response.getEncode() ? response.getResponse().getBytes("GBK") : response.getResponse().getBytes("UTF8")); 27 out.flush(); 28 } catch (Exception e) { 29 System.err.println(e.getMessage()); 30 } 31 } 32 33 public static Request readRequest(InputStream is) { 34 Request request = new Request(); 35 try { 36 //讀取編碼 37 byte [] encodeByte = new byte[1]; 38 is.read(encodeByte); 39 byte encode = encodeByte[0]; 40 //讀取命令長度 41 byte [] commandLengthByte = new byte[4];//緩衝區 42 is.read(commandLengthByte); 43 int commandLength = byte2Int(commandLengthByte); 44 //讀取命令 45 byte [] commandByte = new byte[commandLength]; 46 is.read(commandByte); 47 String command = Encode.GBK.getValue() == encode ? new String(commandByte, "GBK") : new String(commandByte, "UTF8"); 48 //組裝請求返回 49 request.setEncode(encode); 50 request.setCommand(command); 51 request.setCommandLength(commandLength); 52 } catch (Exception e) { 53 System.err.println(e.getMessage()); 54 } 55 return request; 56 } 57 58 public static Response readResponse(InputStream is) { 59 Response response = new Response(); 60 try { 61 byte [] encodeByte = new byte[1]; 62 is.read(encodeByte); 63 byte encode = encodeByte[0]; 64 byte [] responseLengthByte = new byte[4]; 65 is.read(responseLengthByte); 66 int commandLength = byte2Int(responseLengthByte); 67 byte [] responseByte = new byte[commandLength]; 68 is.read(responseByte); 69 String resContent = Encode.GBK.getValue() == encode ? new String(responseByte, "GBK") : new String(responseByte, "UTF8"); 70 response.setEncode(encode); 71 response.setResponse(resContent); 72 response.setResponseLength(commandLength); 73 } catch (Exception e) { 74 System.err.println(e.getMessage()); 75 } 76 return response; 77 } 78 79 public static int byte2Int(byte [] bytes) { 80 int num = bytes[3] & 0xFF; 81 num |= ((bytes[2] << 8) & 0xFF00); 82 num |= ((bytes[1] << 16) & 0xFF0000); 83 num |= ((bytes[0] << 24) & 0xFF000000); 84 return num; 85 } 86 87 public static byte[] int2ByteArray(int i) { 88 byte [] result = new byte[4]; 89 result[0] = (byte) ((i >> 24) & 0xFF); 90 result[1] = (byte) ((i >> 16) & 0xFF); 91 result[2] = (byte) ((i >> 8) & 0xFF); 92 result[3] = (byte) (i & 0xFF); 93 return result; 94 } 95 96 }
1.2.2 HTTP請求與響應
下圖是HTTP請求與響應的過程步驟,在此不詳細贅述。
1.2.3 通過HttpClient發送HTTP請求
HttpClient對HTTP協議通信的過程進行了封裝,下麵是簡單的通過HttpClient發送HTTP GET請求,並獲取服務端響應的代碼:
1 //url前加上http協議頭,標明該請求為http請求 2 String url = "https://www.baidu.com"; 3 //組裝請求 4 HttpClient httpClient = new DefaultHttpClient(); 5 HttpGet httpGet = new HttpGet(url); 6 //接收響應 7 HttpResponse response = httpClient.execute(httpGet); 8 HttpEntity entity = response.getEntity(); 9 byte[] byteArray = EntityUtils.toByteArray(entity); 10 String result = new String(byteArray, "utf8"); 11 System.out.println(result);
1.2.4 使用HTTP協議的優勢
隨著請求規模的擴展,基於TCP協議的RPC的實現,需要考慮多線程併發、鎖、I/O等複雜的底層細節,在大流量高併發的壓力下,任何一個小的錯誤都可能被無限放大,最終導致程式宕機。而對於基於HTTP協議的實現來說,很多成熟的開源web容易已經幫其處理好了這些事情,如Apache,Tomcat,Jboss等,開發人員可將更多的精力集中在業務實現上,而非處理底層細節。