第五章 服務熔斷(hystrix)+ retrofit底層通信(AsyncHttpclient)

来源:http://www.cnblogs.com/java-zhao/archive/2016/08/03/5734835.html
-Advertisement-
Play Games

一、集群容錯 技術選型:hystrix。(就是上圖中熔斷器) 熔斷的作用: 第一個作用: 假設有兩台伺服器server1(假設可以處理的請求閾值是1W請求)和server2,在server1上註冊了三個服務service1、service2、service3,在server2上註冊了一個服務serv ...


 

一、集群容錯

技術選型:hystrix。(就是上圖中熔斷器

熔斷的作用

第一個作用:

假設有兩台伺服器server1(假設可以處理的請求閾值是1W請求)和server2,在server1上註冊了三個服務service1、service2、service3,在server2上註冊了一個服務service4,假設service4服務響應緩慢,service1調用service4時,一直在等待響應,那麼在高併發下,很快的server1處很快就會達到請求閾值(server1很快就會耗盡處理線程)之後可能宕機,這時候,不只是service1不再可用,server1上的service2和service3也不可用了。

如果我們引入了hystrix,那麼service1調用service4的時候,當發現service4超時,立即斷掉不再執行,執行getFallback邏輯。這樣的話,server1就不會耗盡處理線程,server1上的其他服務也是可用的。當然,這是在合理的配置了超時時間的情況下,如果超時時間設置的太長的話,還是會出現未引入hystrix之前的情況。

第二個作用:

當被調服務經常失敗,比如說在10min(可配)中之內調用了20次,失敗了15次(可配),那麼我們認為這個服務是失敗的,先關閉該服務,等一會兒後再自動重新啟動該服務!(這是真正的熔斷!)

 

二、實現

1、framework

1.1、pom.xml

 1         <!-- converter-jackson -->
 2         <dependency>
 3             <groupId>com.squareup.retrofit</groupId>
 4             <artifactId>converter-jackson</artifactId>
 5             <version>1.9.0</version>
 6         </dependency>
 7         <!-- async-http-client -->
 8         <dependency>
 9             <groupId>com.ning</groupId>
10             <artifactId>async-http-client</artifactId>
11             <version>1.9.31</version>
12         </dependency>
13 
14         <!-- hystrix -->
15         <dependency>
16             <groupId>com.netflix.hystrix</groupId>
17             <artifactId>hystrix-core</artifactId>
18             <version>1.5.3</version>
19         </dependency>
20         <dependency>
21             <groupId>com.netflix.hystrix</groupId>
22             <artifactId>hystrix-metrics-event-stream</artifactId>
23             <version>1.5.3</version>
24         </dependency>
View Code

說明:

  • 添加retrofit的Jackson轉換器,預設為GSON
  • 添加AsyncHttpClient
  • 添加hystrix及其metrics包(後者用於展示hystrix的圖表信息,以後會在優化部分完成)

1.2、服務通信(retrofit)+集群容錯(hystrix)

1.2.1、RestAdapterConfig

 1 package com.microservice.retrofit;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Component;
 5 
 6 import com.microservice.loadBalancer.MyLoadBalancer;
 7 import com.microservice.loadBalancer.ServerAddress;
 8 
 9 import retrofit.RestAdapter;
10 import retrofit.converter.JacksonConverter;
11 
12 @Component
13 public class RestAdapterConfig {
14 
15     @Autowired
16     private MyLoadBalancer myLoadBalancer;
17 
18     /**
19      * 負載均衡並且創建傳入的API介面實例
20      */
21     public <T> T create(Class<T> tclass, String serviceName) {
22         String commandGroupKey = tclass.getSimpleName();// 獲得簡單類名作為groupKey
23 
24         ServerAddress server = myLoadBalancer.chooseServer(serviceName);// 負載均衡
25         RestAdapter restAdapter = new RestAdapter.Builder()
26                                   .setConverter(new JacksonConverter())
27                                   .setErrorHandler(new MyErrorHandler())
28                                   .setClient(new MyHttpClient(server, commandGroupKey))
29                                   .setEndpoint("/").build();
30         T tclassInstance = restAdapter.create(tclass);
31         return tclassInstance;
32     }
33 }
View Code

說明:這裡我們定義了自己的retrofit.Client和自己的retrofit.ErrorHandler

1.2.2、MyHttpClient(自定義retrofit的Client)

 1 package com.microservice.retrofit;
 2 
 3 import java.io.IOException;
 4 
 5 import com.microservice.hystrix.HttpHystrixCommand;
 6 import com.microservice.loadBalancer.ServerAddress;
 7 import com.netflix.hystrix.HystrixCommand.Setter;
 8 import com.netflix.hystrix.HystrixCommandGroupKey;
 9 
10 import retrofit.client.Client;
11 import retrofit.client.Request;
12 import retrofit.client.Response;
13 
14 public class MyHttpClient implements Client {
15     private ServerAddress server;
16     private String        commandGroupKey;
17 
18     public MyHttpClient(ServerAddress server, String commandGroupKey) {
19         this.server = server;
20         this.commandGroupKey = commandGroupKey;
21     }
22 
23     @Override
24     public Response execute(Request request) throws IOException {
25         Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
26         return new HttpHystrixCommand(setter, server, request).execute();// 同步執行
27     }
28 }
View Code

說明:在execute()中引入了hystrix

  • 定義了hystrix的commandGroupKey是服務名(eg.myserviceA,被調用服務名
  • 沒有定義commandKey(通常commandKey是服務的一個方法名,例如myserviceA的client的getProvinceByCityName),通常該方法名是被調用服務的client中的被調用方法名
  • 手動設置hystrix的屬性
    • setter.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000));
    • 實際上,直接配置在consul上就好了,根據上一節archaius的自動拉取配置,hystrix會自動從pollResult中取配置並設置到實例中去
  • 查看hystrix的屬性
    • command.getProperties().executionTimeoutInMilliseconds().get(),這裡的command就是下邊的HttpHystrixCommand實例

1.2.3、HttpHystrixCommand(hystrix核心類)

  1 package com.microservice.hystrix;
  2 
  3 import java.io.ByteArrayOutputStream;
  4 import java.io.IOException;
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 import java.util.concurrent.Future;
  8 
  9 import org.apache.commons.lang3.StringUtils;
 10 import org.apache.tomcat.util.http.fileupload.IOUtils;
 11 import org.slf4j.Logger;
 12 import org.slf4j.LoggerFactory;
 13 
 14 import com.microservice.loadBalancer.ServerAddress;
 15 import com.netflix.hystrix.HystrixCommand;
 16 import com.ning.http.client.AsyncHttpClient;
 17 import com.ning.http.client.FluentCaseInsensitiveStringsMap;
 18 import com.ning.http.client.RequestBuilder;
 19 
 20 import retrofit.client.Header;
 21 import retrofit.client.Request;
 22 import retrofit.client.Response;
 23 import retrofit.mime.TypedByteArray;
 24 import retrofit.mime.TypedOutput;
 25 
 26 public class HttpHystrixCommand extends HystrixCommand<Response> {
 27     private static final Logger LOGGER = LoggerFactory.getLogger(HttpHystrixCommand.class);
 28 
 29     private ServerAddress       server;
 30     private Request             request;
 31     private String              requestUrl;
 32     private AsyncHttpClient     asyncHttpClient;
 33 
 34     public HttpHystrixCommand(Setter setter, ServerAddress server, Request request) {
 35         super(setter);
 36         this.server = server;
 37         this.request = request;
 38 
 39         //        AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder().setRequestTimeout(5000);//5s
 40         //        this.asyncHttpClient = new AsyncHttpClient(builder.build());
 41         this.asyncHttpClient = new AsyncHttpClient();
 42     }
 43 
 44     @Override
 45     public Response run() throws Exception {
 46         com.ning.http.client.Request asyncReq = retroReq2asyncReq(request, server);
 47         Future<com.ning.http.client.Response> asyncResFuture = asyncHttpClient.executeRequest(asyncReq);
 48         com.ning.http.client.Response asyncRes = asyncResFuture.get();
 49         return asynRes2RetroRes(asyncRes);
 50     }
 51 
 52     /**
 53      * 1、設置方法請求類型,例如:GET/POST
 54      * 2、轉換請求頭header(包括mime。這個需要根據請求體的情況進行掌握)
 55      * 3、轉換請求體
 56      * 4、設置請求URL
 57      */
 58     public com.ning.http.client.Request retroReq2asyncReq(Request request, ServerAddress server) {
 59         RequestBuilder requestBuilder = new RequestBuilder(request.getMethod());//傳入方法請求類型,例如:GET/POST
 60         List<Header> headers = request.getHeaders();
 61         headers.forEach(x -> requestBuilder.addHeader(x.getName(), x.getValue()));
 62 
 63         if (request.getBody() != null) {
 64             String mimeType = StringUtils.EMPTY;
 65             if (StringUtils.isNotEmpty(mimeType)) {
 66                 requestBuilder.addHeader("Content-Type", mimeType);
 67             } else {
 68                 requestBuilder.addHeader("Content-Type", "application/json");
 69             }
 70 
 71             TypedOutput body = request.getBody();
 72             ByteArrayOutputStream outPutStream = new ByteArrayOutputStream();
 73             try {
 74                 body.writeTo(outPutStream);//將body內容寫入到ByteArrayOutputStream里
 75                 requestBuilder.setBody(outPutStream.toByteArray());
 76             } catch (IOException e) {
 77                 e.printStackTrace();
 78             } finally {
 79                 IOUtils.closeQuietly(outPutStream);
 80             }
 81         }
 82         String url = new StringBuilder("http://").append(server.getIp())
 83                                                  .append(":")
 84                                                  .append(server.getPort())
 85                                                  .append("/")
 86                                                  .append(request.getUrl()).toString();
 87         requestUrl = url;
 88         requestBuilder.setUrl(url);
 89         return requestBuilder.build();
 90     }
 91 
 92     public Response asynRes2RetroRes(com.ning.http.client.Response asyncRes) throws IOException {
 93         return new Response(asyncRes.getUri().toUrl(), 
 94                             asyncRes.getStatusCode(), 
 95                             asyncRes.getStatusText(),
 96                             getHeaders(asyncRes.getHeaders()),
 97                             new TypedByteArray(asyncRes.getContentType(), asyncRes.getResponseBodyAsBytes()));
 98     }
 99 
100     private List<Header> getHeaders(FluentCaseInsensitiveStringsMap asyncHeaders) {
101         List<Header> retrofitHeaders = new ArrayList<>();
102         asyncHeaders.keySet().forEach(key -> retrofitHeaders.add(new Header(key, asyncHeaders.getFirstValue(key))));
103         return retrofitHeaders;
104     }
105 
106     /**
107      * 超時後的一些操作,或者如果緩存中有信息,可以從緩存中拿一些,具體的要看業務,也可以打一些logger
108      */
109     @Override
110     public Response getFallback() {
111         LOGGER.error("請求超時了!requestUrl:'{}'", requestUrl);
112         /**
113          * 想要讓自定義的ErrorHandler起作用以及下邊的404和reason有意義,就一定要配置requestUrl和List<header>
114          * 其實這裡可以看做是定義自定義異常的狀態碼和狀態描述
115          * 其中狀態碼用於自定義異常中的判斷(見HystrixRuntimeException)
116          */
117         return new Response(requestUrl, 404, //定義狀態碼
118             "execute getFallback because execution timeout", //定義消息 
119             new ArrayList<Header>(), null);
120     }
121 }
View Code

說明:首先調用run(),run()失敗或超時候調用getFallback()

  • run()--這裡是一個定製口,我使用了AsyncHttpClient,還可以使用其他的網路調用工具,例如:okhttp
    • 首先將Retrofit的請求信息Request轉化為AsyncHttpClient的Request(在這裡調用了負載均衡,將請求負載到選出的一臺機器)
    • 之後調用AsyncHttpClient來進行真正的http調用,並返回AsyncHttpClient型的相應Response
    • 最後將AsyncHttpClient型的響應Response轉換為Retrofit型的Response
  • getFallback()
    • 直接拋異常是不行的(該介面不讓),只能採取以下的方式
    • 返回一個Response對象,該對象封裝了status是404+錯誤的原因reason+請求的url+相應的Header列表+響應體(這裡的status和reason會被用在ErrorHandler中去用於指定執行不同的邏輯,具體看下邊的MyErrorHandler)
    • 如果想讓MyErrorHandler起作用,Response對象必須有"請求的url+相應的Header列表",其中Header列表可以使一個空List實現類,但是不可為null
  • 在構建AsyncHttpClient實例時可以設置相關的http參數,例如:註釋部分的設置請求超時時間。
    • 值得註意的是我們在配置請求超時時間時,要結合hystrix的超時時間來設置,程式會以二者的最小值作為請求超時時間

1.2.4、MyErrorHandler(自定義retrofit的錯誤處理器)

 1 package com.microservice.retrofit;
 2 
 3 import com.microservice.exception.HystrixRuntimeException;
 4 
 5 import retrofit.ErrorHandler;
 6 import retrofit.RetrofitError;
 7 import retrofit.client.Response;
 8 
 9 public class MyErrorHandler implements ErrorHandler{
10     @Override
11     public Throwable handleError(RetrofitError cause) {
12         Response response = cause.getResponse();
13         /**
14          * 這裡是一個可以定製的地方,自己可以定義所有想要捕獲的異常
15          */
16         if(response!=null && response.getStatus()==404){
17             return new HystrixRuntimeException(cause);
18         }
19         return cause;
20     }
21 }
View Code

說明:當發生了retrofit.error時(不只是上邊的getFallback()返回的Response),我們可以在該ErrorHandler的handleError方法來進行相應Response的處理。這裡我們指定當404時返回一個自定義異常。

1.2.5、HystrixRuntimeException(自定義異常)

 1 package com.microservice.exception;
 2 
 3 /**
 4  * 自定義異常
 5  */
 6 public class HystrixRuntimeException extends RuntimeException {
 7     private static final long serialVersionUID = 8252124808929848902L;
 8 
 9     public HystrixRuntimeException(Throwable cause) {
10         super(cause);//只有這樣,才能將異常信息拋給客戶端
11     }
12 }
View Code

說明:自定義異常只能通過super()來向客戶端拋出自己指定的異常信息(上邊的Response的reason,但是拋到客戶端時還是一個500錯誤,因為run()錯誤或超時就是一個服務端錯誤)。

 

整個流程:

當myserviceB調用myserviceA的一個方法時,首先會執行自定義的MyHttpClient的execute()方法,在該execute()方法中我們執行了自定義的HttpHystrixCommand的execute()方法,此時就會執行執行HttpHystrixCommand的run()方法,如果該方法運行正常併在超時時間內返回數據,則調用結束。

如果run()方法調用失敗或該方法超時,就會直接運行HttpHystrixCommand的getFallback()方法。該方法返回一個retrofit.Response對象,該對象的status是404,錯誤信息也是自定義的。之後該對象會被包裝到RetrofitError對象中,之後RetrofitError對象會由MyErrorHandler的handleError()進行處理:從RetrofitError對象中先取出Response,之後根據該Response的status執行相應的操作,我們這裡對404的情況定義了一個自定義異常HystrixRuntimeException。

 註意點:

  • retrofit的Response最好不要是null
  • retrofit的Jackson轉換器無法轉化單純的String(因為Jackson轉換器會將一個json串轉化為json對象),這一點缺點可以看做沒有,因為我們的介面都是restful的,那麼我們都是使用json格式來通信的。 

 

三、配置與測試

1、配置

在consul上配置service/myserviceA/dev/config的配置內容和service/myserviceB/dev/config的內容。其中,myserviceB配置了hystrix的超時時間:

1 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000
View Code

說明:關於hystrix的配置參數,查看http://www.cnblogs.com/java-zhao/p/5524584.html

2、測試

最後,啟動consul,啟動服務A和B,swagger測試就好了!!!(在測試過程中,可以動態的去改變consul中hystrix的超時時間值,來測試archaius的動態讀取)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 較之傳統通過App.config和Web.config這兩個XML文件承載的配置系統,.NET Core採用的這個全新的配置模型的最大一個優勢就是針對多種不同配置源的支持。我們可以將記憶體變數、命令行參數、環境變數和物理文件作為原始配置數據的來源,如果採用物理文件作為配置源,我們可以選擇不同的格式(比... ...
  • 今天大家在群里大家非常熱鬧的討論像畫筆一樣慢慢畫出Path的這種效果該如何實現. 北京-LGL 博客號@ligl007發起了這個話題.然後各路高手踴躍發表意見.最後雷叔 上海-雷蒙 博客號@雷蒙之星 以一種巧妙的思路實現了這個效果 使大家受益匪淺 本來這篇博客應該由雷叔來寫的,奈何雷叔忙著寫教材,就 ...
  • 比如:要實現 http://localhost:60291/home/geta/1212.html 或者 .abc 任意擴展名 完成兩步即可。 第一步修改路由: 第二步:修改web.config 增加 <system.webServer>節點 增加如下: <modules runAllManaged ...
  • `dotnet pack` 命令生成項目並創建 NuGet 包。這個操作的結果是兩個 `nupkg` 擴展名的包。一個包含代碼,另一個包含調試符號。 該項目被依賴的 NuGet 包裝被添加到 nuspec 文件,因此,他們能夠在安裝包時得到解決。 ...
  • 1. 按照官方頁面進行安裝 https://www.microsoft.com/net/core#macos 2. 在運行"brew link --force openssl" 時會出錯,這是一個known issue,可以先跳過此步驟,進行下一步,安裝.NET Core SDK 3. 然後參考此頁 ...
  • NSoup是一個開源框架,是JSoup(Java)的.NET移植版本 1、直接用起來 NSoup的強大之處在於可以用類似js的方法來獲取節點元素 通過元素類型獲取元素GetElementByTag("p") 2、做了一個winform的小demo 關鍵代碼: 下麵以提取一個html代碼中211大學名 ...
  • Redis是一個用的比較廣泛的Key/Value的記憶體資料庫,也是最快的key-value分散式緩存之一。 Redis官網:http://redis.io/ Redis快速入門教程:http://www.yiibai.com/redis/redis_quick_guide.html 解壓版下載地址: ...
  • 度娘的思路 Action() { //實現1-2+3-4+5……+99-100的值 //20160802 int i,tag,sum; sum=0; tag=1; for(i=1;i<=100;i++) { sum+=i*tag; tag=-tag; } lr_output_message ("%d ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...