一、集群容錯 技術選型:hystrix。(就是上圖中熔斷器) 熔斷的作用: 第一個作用: 假設有兩台伺服器server1(假設可以處理的請求閾值是1W請求)和server2,在server1上註冊了三個服務service1、service2、service3,在server2上註冊了一個服務serv ...
一、集群容錯
技術選型:hystrix。(就是上圖中熔斷器)
熔斷的作用:
第一個作用:
假設有兩台伺服器server1(假設可以處理的請求閾值是1W請求)和server2,在server1上註冊了三個服務service1、service2、service3,在server2上註冊了一個服務service4,假設service4服務響應緩慢,service1調用service4時,一直在等待響應,那麼在高併發下,很快的server1處很快就會達到請求閾值(server1很快就會耗盡處理線程)之後可能宕機,這時候,不只是service1不再可用,server1上的service2和service3也不可用了。
如果我們引入了hystrix,那麼service1調用service4的時候,當發現service4超時,立即斷掉不再執行,執行getFallback邏輯。這樣的話,server1就不會耗盡處理線程,server1上的其他服務也是可用的。當然,這是在合理的配置了超時時間的情況下,如果超時時間設置的太長的話,還是會出現未引入hystrix之前的情況。
第二個作用:
當被調服務經常失敗,比如說在10min(可配)中之內調用了20次,失敗了15次(可配),那麼我們認為這個服務是失敗的,先關閉該服務,等一會兒後再自動重新啟動該服務!(這是真正的熔斷!)
二、實現
1、framework
1.1、pom.xml
1 <!-- converter-jackson --> 2 <dependency> 3 <groupId>com.squareup.retrofit</groupId> 4 <artifactId>converter-jackson</artifactId> 5 <version>1.9.0</version> 6 </dependency> 7 <!-- async-http-client --> 8 <dependency> 9 <groupId>com.ning</groupId> 10 <artifactId>async-http-client</artifactId> 11 <version>1.9.31</version> 12 </dependency> 13 14 <!-- hystrix --> 15 <dependency> 16 <groupId>com.netflix.hystrix</groupId> 17 <artifactId>hystrix-core</artifactId> 18 <version>1.5.3</version> 19 </dependency> 20 <dependency> 21 <groupId>com.netflix.hystrix</groupId> 22 <artifactId>hystrix-metrics-event-stream</artifactId> 23 <version>1.5.3</version> 24 </dependency>View Code
說明:
- 添加retrofit的Jackson轉換器,預設為GSON
- 添加AsyncHttpClient
- 添加hystrix及其metrics包(後者用於展示hystrix的圖表信息,以後會在優化部分完成)
1.2、服務通信(retrofit)+集群容錯(hystrix)
1.2.1、RestAdapterConfig
1 package com.microservice.retrofit; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Component; 5 6 import com.microservice.loadBalancer.MyLoadBalancer; 7 import com.microservice.loadBalancer.ServerAddress; 8 9 import retrofit.RestAdapter; 10 import retrofit.converter.JacksonConverter; 11 12 @Component 13 public class RestAdapterConfig { 14 15 @Autowired 16 private MyLoadBalancer myLoadBalancer; 17 18 /** 19 * 負載均衡並且創建傳入的API介面實例 20 */ 21 public <T> T create(Class<T> tclass, String serviceName) { 22 String commandGroupKey = tclass.getSimpleName();// 獲得簡單類名作為groupKey 23 24 ServerAddress server = myLoadBalancer.chooseServer(serviceName);// 負載均衡 25 RestAdapter restAdapter = new RestAdapter.Builder() 26 .setConverter(new JacksonConverter()) 27 .setErrorHandler(new MyErrorHandler()) 28 .setClient(new MyHttpClient(server, commandGroupKey)) 29 .setEndpoint("/").build(); 30 T tclassInstance = restAdapter.create(tclass); 31 return tclassInstance; 32 } 33 }View Code
說明:這裡我們定義了自己的retrofit.Client和自己的retrofit.ErrorHandler
1.2.2、MyHttpClient(自定義retrofit的Client)
1 package com.microservice.retrofit; 2 3 import java.io.IOException; 4 5 import com.microservice.hystrix.HttpHystrixCommand; 6 import com.microservice.loadBalancer.ServerAddress; 7 import com.netflix.hystrix.HystrixCommand.Setter; 8 import com.netflix.hystrix.HystrixCommandGroupKey; 9 10 import retrofit.client.Client; 11 import retrofit.client.Request; 12 import retrofit.client.Response; 13 14 public class MyHttpClient implements Client { 15 private ServerAddress server; 16 private String commandGroupKey; 17 18 public MyHttpClient(ServerAddress server, String commandGroupKey) { 19 this.server = server; 20 this.commandGroupKey = commandGroupKey; 21 } 22 23 @Override 24 public Response execute(Request request) throws IOException { 25 Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandGroupKey)); 26 return new HttpHystrixCommand(setter, server, request).execute();// 同步執行 27 } 28 }View Code
說明:在execute()中引入了hystrix
- 定義了hystrix的commandGroupKey是服務名(eg.myserviceA,被調用服務名)
- 沒有定義commandKey(通常commandKey是服務的一個方法名,例如myserviceA的client的getProvinceByCityName),通常該方法名是被調用服務的client中的被調用方法名
- 手動設置hystrix的屬性
- setter.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000));
- 實際上,直接配置在consul上就好了,根據上一節archaius的自動拉取配置,hystrix會自動從pollResult中取配置並設置到實例中去。
- 查看hystrix的屬性
- command.getProperties().executionTimeoutInMilliseconds().get(),這裡的command就是下邊的HttpHystrixCommand實例
1.2.3、HttpHystrixCommand(hystrix核心類)
1 package com.microservice.hystrix; 2 3 import java.io.ByteArrayOutputStream; 4 import java.io.IOException; 5 import java.util.ArrayList; 6 import java.util.List; 7 import java.util.concurrent.Future; 8 9 import org.apache.commons.lang3.StringUtils; 10 import org.apache.tomcat.util.http.fileupload.IOUtils; 11 import org.slf4j.Logger; 12 import org.slf4j.LoggerFactory; 13 14 import com.microservice.loadBalancer.ServerAddress; 15 import com.netflix.hystrix.HystrixCommand; 16 import com.ning.http.client.AsyncHttpClient; 17 import com.ning.http.client.FluentCaseInsensitiveStringsMap; 18 import com.ning.http.client.RequestBuilder; 19 20 import retrofit.client.Header; 21 import retrofit.client.Request; 22 import retrofit.client.Response; 23 import retrofit.mime.TypedByteArray; 24 import retrofit.mime.TypedOutput; 25 26 public class HttpHystrixCommand extends HystrixCommand<Response> { 27 private static final Logger LOGGER = LoggerFactory.getLogger(HttpHystrixCommand.class); 28 29 private ServerAddress server; 30 private Request request; 31 private String requestUrl; 32 private AsyncHttpClient asyncHttpClient; 33 34 public HttpHystrixCommand(Setter setter, ServerAddress server, Request request) { 35 super(setter); 36 this.server = server; 37 this.request = request; 38 39 // AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder().setRequestTimeout(5000);//5s 40 // this.asyncHttpClient = new AsyncHttpClient(builder.build()); 41 this.asyncHttpClient = new AsyncHttpClient(); 42 } 43 44 @Override 45 public Response run() throws Exception { 46 com.ning.http.client.Request asyncReq = retroReq2asyncReq(request, server); 47 Future<com.ning.http.client.Response> asyncResFuture = asyncHttpClient.executeRequest(asyncReq); 48 com.ning.http.client.Response asyncRes = asyncResFuture.get(); 49 return asynRes2RetroRes(asyncRes); 50 } 51 52 /** 53 * 1、設置方法請求類型,例如:GET/POST 54 * 2、轉換請求頭header(包括mime。這個需要根據請求體的情況進行掌握) 55 * 3、轉換請求體 56 * 4、設置請求URL 57 */ 58 public com.ning.http.client.Request retroReq2asyncReq(Request request, ServerAddress server) { 59 RequestBuilder requestBuilder = new RequestBuilder(request.getMethod());//傳入方法請求類型,例如:GET/POST 60 List<Header> headers = request.getHeaders(); 61 headers.forEach(x -> requestBuilder.addHeader(x.getName(), x.getValue())); 62 63 if (request.getBody() != null) { 64 String mimeType = StringUtils.EMPTY; 65 if (StringUtils.isNotEmpty(mimeType)) { 66 requestBuilder.addHeader("Content-Type", mimeType); 67 } else { 68 requestBuilder.addHeader("Content-Type", "application/json"); 69 } 70 71 TypedOutput body = request.getBody(); 72 ByteArrayOutputStream outPutStream = new ByteArrayOutputStream(); 73 try { 74 body.writeTo(outPutStream);//將body內容寫入到ByteArrayOutputStream里 75 requestBuilder.setBody(outPutStream.toByteArray()); 76 } catch (IOException e) { 77 e.printStackTrace(); 78 } finally { 79 IOUtils.closeQuietly(outPutStream); 80 } 81 } 82 String url = new StringBuilder("http://").append(server.getIp()) 83 .append(":") 84 .append(server.getPort()) 85 .append("/") 86 .append(request.getUrl()).toString(); 87 requestUrl = url; 88 requestBuilder.setUrl(url); 89 return requestBuilder.build(); 90 } 91 92 public Response asynRes2RetroRes(com.ning.http.client.Response asyncRes) throws IOException { 93 return new Response(asyncRes.getUri().toUrl(), 94 asyncRes.getStatusCode(), 95 asyncRes.getStatusText(), 96 getHeaders(asyncRes.getHeaders()), 97 new TypedByteArray(asyncRes.getContentType(), asyncRes.getResponseBodyAsBytes())); 98 } 99 100 private List<Header> getHeaders(FluentCaseInsensitiveStringsMap asyncHeaders) { 101 List<Header> retrofitHeaders = new ArrayList<>(); 102 asyncHeaders.keySet().forEach(key -> retrofitHeaders.add(new Header(key, asyncHeaders.getFirstValue(key)))); 103 return retrofitHeaders; 104 } 105 106 /** 107 * 超時後的一些操作,或者如果緩存中有信息,可以從緩存中拿一些,具體的要看業務,也可以打一些logger 108 */ 109 @Override 110 public Response getFallback() { 111 LOGGER.error("請求超時了!requestUrl:'{}'", requestUrl); 112 /** 113 * 想要讓自定義的ErrorHandler起作用以及下邊的404和reason有意義,就一定要配置requestUrl和List<header> 114 * 其實這裡可以看做是定義自定義異常的狀態碼和狀態描述 115 * 其中狀態碼用於自定義異常中的判斷(見HystrixRuntimeException) 116 */ 117 return new Response(requestUrl, 404, //定義狀態碼 118 "execute getFallback because execution timeout", //定義消息 119 new ArrayList<Header>(), null); 120 } 121 }View Code
說明:首先調用run(),run()失敗或超時候調用getFallback()
- run()--這裡是一個定製口,我使用了AsyncHttpClient,還可以使用其他的網路調用工具,例如:okhttp
- 首先將Retrofit的請求信息Request轉化為AsyncHttpClient的Request(在這裡調用了負載均衡,將請求負載到選出的一臺機器)
- 之後調用AsyncHttpClient來進行真正的http調用,並返回AsyncHttpClient型的相應Response
- 最後將AsyncHttpClient型的響應Response轉換為Retrofit型的Response
- getFallback()
- 直接拋異常是不行的(該介面不讓),只能採取以下的方式
- 返回一個Response對象,該對象封裝了status是404+錯誤的原因reason+請求的url+相應的Header列表+響應體(這裡的status和reason會被用在ErrorHandler中去用於指定執行不同的邏輯,具體看下邊的MyErrorHandler)
- 如果想讓MyErrorHandler起作用,Response對象必須有"請求的url+相應的Header列表",其中Header列表可以使一個空List實現類,但是不可為null
- 在構建AsyncHttpClient實例時可以設置相關的http參數,例如:註釋部分的設置請求超時時間。
- 值得註意的是我們在配置請求超時時間時,要結合hystrix的超時時間來設置,程式會以二者的最小值作為請求超時時間
1.2.4、MyErrorHandler(自定義retrofit的錯誤處理器)
1 package com.microservice.retrofit; 2 3 import com.microservice.exception.HystrixRuntimeException; 4 5 import retrofit.ErrorHandler; 6 import retrofit.RetrofitError; 7 import retrofit.client.Response; 8 9 public class MyErrorHandler implements ErrorHandler{ 10 @Override 11 public Throwable handleError(RetrofitError cause) { 12 Response response = cause.getResponse(); 13 /** 14 * 這裡是一個可以定製的地方,自己可以定義所有想要捕獲的異常 15 */ 16 if(response!=null && response.getStatus()==404){ 17 return new HystrixRuntimeException(cause); 18 } 19 return cause; 20 } 21 }View Code
說明:當發生了retrofit.error時(不只是上邊的getFallback()返回的Response),我們可以在該ErrorHandler的handleError方法來進行相應Response的處理。這裡我們指定當404時返回一個自定義異常。
1.2.5、HystrixRuntimeException(自定義異常)
1 package com.microservice.exception; 2 3 /** 4 * 自定義異常 5 */ 6 public class HystrixRuntimeException extends RuntimeException { 7 private static final long serialVersionUID = 8252124808929848902L; 8 9 public HystrixRuntimeException(Throwable cause) { 10 super(cause);//只有這樣,才能將異常信息拋給客戶端 11 } 12 }View Code
說明:自定義異常只能通過super()來向客戶端拋出自己指定的異常信息(上邊的Response的reason,但是拋到客戶端時還是一個500錯誤,因為run()錯誤或超時就是一個服務端錯誤)。
整個流程:
當myserviceB調用myserviceA的一個方法時,首先會執行自定義的MyHttpClient的execute()方法,在該execute()方法中我們執行了自定義的HttpHystrixCommand的execute()方法,此時就會執行執行HttpHystrixCommand的run()方法,如果該方法運行正常併在超時時間內返回數據,則調用結束。
如果run()方法調用失敗或該方法超時,就會直接運行HttpHystrixCommand的getFallback()方法。該方法返回一個retrofit.Response對象,該對象的status是404,錯誤信息也是自定義的。之後該對象會被包裝到RetrofitError對象中,之後RetrofitError對象會由MyErrorHandler的handleError()進行處理:從RetrofitError對象中先取出Response,之後根據該Response的status執行相應的操作,我們這裡對404的情況定義了一個自定義異常HystrixRuntimeException。
註意點:
- retrofit的Response最好不要是null
- retrofit的Jackson轉換器無法轉化單純的String(因為Jackson轉換器會將一個json串轉化為json對象),這一點缺點可以看做沒有,因為我們的介面都是restful的,那麼我們都是使用json格式來通信的。
三、配置與測試
1、配置
在consul上配置service/myserviceA/dev/config的配置內容和service/myserviceB/dev/config的內容。其中,myserviceB配置了hystrix的超時時間:
1 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000View Code
說明:關於hystrix的配置參數,查看http://www.cnblogs.com/java-zhao/p/5524584.html
2、測試
最後,啟動consul,啟動服務A和B,swagger測試就好了!!!(在測試過程中,可以動態的去改變consul中hystrix的超時時間值,來測試archaius的動態讀取)