" 【.NET Core項目實戰 統一認證平臺】開篇及目錄索引 " 一、什麼是RPC RPC是“遠程調用( Remote Procedure Call )”的一個名稱的縮寫,並不是任何規範化的協議,也不是大眾都認知的協議標準,我們更多時候使用時都是創建的自定義化(例如Socket,Netty)的消息 ...
一、什麼是RPC
RPC是“遠程調用(Remote Procedure Call)”的一個名稱的縮寫,並不是任何規範化的協議,也不是大眾都認知的協議標準,我們更多時候使用時都是創建的自定義化(例如Socket,Netty)的消息方式進行調用,相比http協議,我們省掉了不少http中無用的消息內容。因此很多系統內部調用仍然採用自定義化的RPC調用模式進行通信,畢竟速度和性能是內網的關鍵指標之一,而標準化和語義無關性在外網中舉足輕重。所以,為何API網關無法工作在RPC上,因為它沒有一個像HTTP/HTTPS那樣的通用標準。
二、CzarRpc簡介
CzarRpc是作者基於Dotnetty實現的RPC通訊框架,參考了Surging
和Tars.Net
優秀設計,目前正在內部使用中,下麵就CzarRpc調用方式做一個簡單介紹,測試結構如下:
1、服務介面
新建一個Czar.Rpc.Common
類庫,首先需要引用Czar.Rpc
Nuget包。
Install-Package Czar.Rpc
然後定義測試介面IHelloRpc.cs
,也是目前支持的調用方式。
using Czar.Rpc.Attributes;
using Czar.Rpc.Exceptions;
using Czar.Rpc.Metadata;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Czar.Rpc.Common
{
/// <summary>
/// 測試Rpc實體
/// </summary>
[BusinessExceptionInterceptor]
[CzarRpc("Demo.Rpc.Hello")]
public interface IHelloRpc: IRpcBaseService
{
string Hello(int no, string name);
void HelloHolder(int no, out string name);
Task<string> HelloTask(int no, string name);
ValueTask<string> HelloValueTask(int no, string name);
[CzarOneway]
void HelloOneway(int no, string name);
Task TestBusinessExceptionInterceptor();
DemoModel HelloModel(int D1, string D2, DateTime D3);
Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3);
DemoModel HelloSendModel(DemoModel model);
DemoModel HelloSendModelParm(string name,DemoModel model);
List<DemoModel> HelloSendModelList(List<DemoModel> model);
}
public class DemoModel
{
/// <summary>
/// 測試1
/// </summary>
public int T1 { get; set; }
/// <summary>
/// 測試2
/// </summary>
public string T2 { get; set; }
/// <summary>
/// 測試3
/// </summary>
public DateTime T3 { get; set; }
public ChildModel Child { get; set; }
}
public class ChildModel
{
public string C1 { get; set; }
}
}
2.服務端
新建一個控制台程式Czar.Rpc.Server
,然後實現服務介面,因為都是測試數據,所以就隨意實現了方法。
HelloRpcServer.cs
using Czar.Rpc.Exceptions;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using System.Net;
using Czar.Rpc.Common;
namespace Demo.Rpc.Server
{
public class HelloRpcServer: IHelloRpc
{
public EndPoint CzarEndPoint { get; set; }
public string Hello(int no, string name)
{
string result = $"{no}: Hi, {name}";
Console.WriteLine(result);
return result + " callback";
}
public void HelloHolder(int no, out string name)
{
name = no.ToString() + " out";
}
public void HelloOneway(int no, string name)
{
/*
耗時操作
*/
Console.WriteLine($"From oneway - {no}: Hi, {name}");
}
public Task<string> HelloTask(int no, string name)
{
return Task.FromResult(Hello(no, name));
}
public ValueTask<string> HelloValueTask(int no, string name)
{
return new ValueTask<string>(Hello(no, name));
}
public Task TestBusinessExceptionInterceptor()
{
throw new BusinessException()
{
CzarCode = "1",
CzarMessage = "test"
};
}
public DemoModel HelloModel(int D1, string D2, DateTime D3)
{
return new DemoModel()
{
T1 = D1 + 1,
T2 = D2 + "2",
T3 = D3.AddDays(1)
};
}
public async Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3)
{
return await Task.FromResult(
new DemoModel()
{
T1 = D1 + 1,
T2 = D2 + "77777",
T3 = D3.AddDays(1)
}
);
}
public DemoModel HelloSendModel(DemoModel model)
{
model.T1 = model.T1 + 10;
model.T2 = model.T2 + "11";
model.T3 = model.T3.AddDays(12);
return model;
}
public DemoModel HelloSendModelParm(string name, DemoModel model)
{
model.T1 = model.T1 + 10;
model.T2 = model.T2 + "11";
model.T3 = model.T3.AddDays(12);
if (model.Child != null)
{
model.Child.C1 = name+"說:"+ model.Child.C1;
}
return model;
}
public List<DemoModel> HelloSendModelList(List<DemoModel> model)
{
return model.Select(t => new DemoModel() { T1=t.T1+10,T2=t.T2+"13",T3=t.T3.AddYears(1),Child=t.Child }).ToList();
}
}
}
然後啟動服務端監聽。
class Program
{
static void Main(string[] args)
{
var host = new HostBuilder()
.ConfigureHostConfiguration(i => i.AddJsonFile("CzarConfig.json"))
.ConfigureLogging((hostContext, configLogging) =>
{
configLogging.AddConsole();
})
.UseCodec<JsonCodec>()
.UseLibuvTcpHost()
.UseProxy()
.UseConsoleLifetime()
.Build();
host.RunAsync().Wait();
}
}
啟用外部使用CzarConfig.json的配置文件,註意需要設置成始終複製。
{
"CzarHost": {
"Port": 7711, //監聽埠
"QuietPeriodSeconds": 2, //退出靜默時間 DotNetty特性
"ShutdownTimeoutSeconds": 2, //關閉超時時間 DotNetty特性
"IsSsl": "false", //是否啟用 SSL, 客戶端需要保持一致
"PfxPath": "cert/datasync.pfx", //證書
"PfxPassword": "123456" //證書密鑰
}
}
到此伺服器端搭載完成。
3、客戶端
新建客戶端控制台程式Czar.Rpc.Client
,然後配置Rpc調用信息。
{
"CzarHost": {
"ProxyEndPoint": true, //是否啟用動態服務地址,就是指定服務端IP
"IsSsl": "false", //是否啟用SSL
"PfxPath": "cert/datasync.pfx", //證書
"PfxPassword": "123456", //證書密鑰
"ClientConfig": { //客戶端配置
"Demo.Rpc.Hello": { //對應服務[CzarRpc("Demo.Rpc.Hello")] 值
"Host": "127.0.0.1", //服務端IP 如果ProxyEndPoint=false 時使用
"Port": 7711, //服務端埠 如果ProxyEndPoint=false 時使用
"Timeout": 10, //調用超時時間
"WriterIdleTimeSeconds";30 //空閑超時時間,預設為30秒,非內網環境建議設置成5分鐘內。
}
}
}
}
現在開始啟用客戶端信息。
class Program
{
public static IServiceProvider service;
public static IConfiguration config;
static async Task Main(string[] args)
{
try
{
var builder = new ConfigurationBuilder();
config = builder.AddJsonFile("CzarConfig.json").Build();
service = new ServiceCollection()
.AddSingleton(config)
.AddLogging(j => j.AddConsole())
.AddLibuvTcpClient(config)
.AddProxy()
.BuildDynamicProxyServiceProvider();
var rpc = service.GetRequiredService<IHelloRpc>();
//使用的內部指定的伺服器地址
rpc.CzarEndPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 7711);
var result = string.Empty;
string t = "基本調用";
result = rpc.Hello(18, t);
Console.WriteLine(result);
result = "無返回結果";
rpc.HelloHolder(1, out result);
Console.WriteLine(result);
result = await rpc.HelloTask(2, "非同步任務");
Console.WriteLine(result);
result = "單向";
rpc.HelloOneway(3, "單向調用");
Console.WriteLine(result);
result = await rpc.HelloValueTask(4, "ValueTask任務");
Console.WriteLine(result);
var modelResult = rpc.HelloModel(5, "返回實體", DateTime.Now);
Console.WriteLine($"{modelResult.T1} {modelResult.T2} {modelResult.T3.ToLongDateString()}");
var modelResult1 = await rpc.HelloModelAsync(6, "返回Task實體", DateTime.Now);
Console.WriteLine($"{modelResult1.T1} {modelResult1.T2} {modelResult1.T3.ToLongDateString()}");
var mm = new DemoModel()
{
T1 = 7,
T2 = "傳實體返回實體",
T3 = DateTime.Now,
Child = new ChildModel()
{
C1 = "子類1"
}
};
var model2 = rpc.HelloSendModel(mm);
Console.WriteLine($"{model2.T1} {model2.T2} {model2.T3.ToLongDateString()} {model2.Child.C1}");
var list = new List<DemoModel>();
var mm1 = new DemoModel()
{
T1 = 8,
T2 = "傳List返回List",
T3 = DateTime.Now,
Child = new ChildModel()
{
C1 = "子類2"
}
};
var mm3 = new DemoModel()
{
T1 = 9,
T2 = "傳List返回List",
T3 = DateTime.Now,
Child = new ChildModel()
{
C1 = "子類3"
}
};
list.Add(mm1);
list.Add(mm3);
var list3 = rpc.HelloSendModelList(list);
Console.WriteLine($"{list3[0].T1} {list3[0].T2} {list3[0].T3.ToLongDateString()} {list3[0].Child?.C1}");
var mm4 = new DemoModel()
{
T1 = 9,
T2 = "HelloSendModelParm",
T3 = DateTime.Now,
Child = new ChildModel()
{
C1 = "子類4"
}
};
var dd = rpc.HelloSendModelParm("HelloSendModelParm", mm4);
Console.WriteLine($"{dd.T1} {dd.T2} {dd.T3.ToLongDateString()} {dd.Child.C1}");
//異常調用
await rpc.TestBusinessExceptionInterceptor();
}
catch (BusinessException e)
{
Console.WriteLine($"CzarCode:{e.CzarCode} CzarMessage:{e.CzarMessage}");
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Console.ReadLine();
}
}
現在整個RPC調用搭建完畢,然後分別啟動伺服器端和客戶端,就可以看到屏幕輸出內容如下。
客戶端輸出:
伺服器端輸出:
至此整個CzarRpc的基本使用已經介紹完畢,感興趣的朋友可以自行測試。
三、Ocelot增加RPC支持
有了CzarRpc
的通訊框架後,現在在Ocelot
上實現Rpc
功能簡直易如反掌,現在開始添加我們的Rpc
中間件,也讓我們擴展的網關靈活起來。
還記得我介紹網關篇時添加中間件的步驟嗎?如果不記得的可以先回去回顧下。
首先如何讓網關知道這個後端調用是http
還是Rpc
呢?這時應該會想到Ocelot
路由配置里的DownstreamScheme
,可以在這裡判斷我們定義的是http
還是rpc
即可。同時我們希望之前定義的所有中間件都生效,最後一步請求時如果配置下端路由rpc
,使用rpc
調用,否則使用http
調用,這樣可以重覆利用之前所有的中間件功能,減少重覆開發。
在之前的開發的自定義限流和自定義授權中間件開發中,我們知道開發完的中間件放到哪裡使用,這裡就不介紹原理了,直接添加到BuildCzarOcelotPipeline
里如下代碼。
public static OcelotRequestDelegate BuildCzarOcelotPipeline(this IOcelotPipelineBuilder builder,
OcelotPipelineConfiguration pipelineConfiguration)
{
// 註冊一個全局異常
builder.UseExceptionHandlerMiddleware();
// 如果請求是websocket使用單獨的管道
builder.MapWhen(context => context.HttpContext.WebSockets.IsWebSocketRequest,
app =>
{
app.UseDownstreamRouteFinderMiddleware();
app.UseDownstreamRequestInitialiser();
app.UseLoadBalancingMiddleware();
app.UseDownstreamUrlCreatorMiddleware();
app.UseWebSocketsProxyMiddleware();
});
// 添加自定義的錯誤管道
builder.UseIfNotNull(pipelineConfiguration.PreErrorResponderMiddleware);
//使用自定義的輸出管道
builder.UseCzarResponderMiddleware();
// 下游路由匹配管道
builder.UseDownstreamRouteFinderMiddleware();
//增加自定義擴展管道
if (pipelineConfiguration.MapWhenOcelotPipeline != null)
{
foreach (var pipeline in pipelineConfiguration.MapWhenOcelotPipeline)
{
builder.MapWhen(pipeline);
}
}
// 使用Http頭部轉換管道
builder.UseHttpHeadersTransformationMiddleware();
// 初始化下游請求管道
builder.UseDownstreamRequestInitialiser();
// 使用自定義限流管道
builder.UseRateLimiting();
//使用請求ID生成管道
builder.UseRequestIdMiddleware();
//使用自定義授權前管道
builder.UseIfNotNull(pipelineConfiguration.PreAuthenticationMiddleware);
//根據請求判斷是否啟用授權來使用管道
if (pipelineConfiguration.AuthenticationMiddleware == null)
{
builder.UseAuthenticationMiddleware();
}
else
{
builder.Use(pipelineConfiguration.AuthenticationMiddleware);
}
//添加自定義限流中間件 2018-11-18 金焰的世界
builder.UseCzarClientRateLimitMiddleware();
//添加自定義授權中間件 2018-11-15 金焰的世界
builder.UseAhphAuthenticationMiddleware();
//啟用自定義的認證之前中間件
builder.UseIfNotNull(pipelineConfiguration.PreAuthorisationMiddleware);
//是否使用自定義的認證中間件
if (pipelineConfiguration.AuthorisationMiddleware == null)
{
builder.UseAuthorisationMiddleware();
}
else
{
builder.Use(pipelineConfiguration.AuthorisationMiddleware);
}
// 使用自定義的參數構建中間件
builder.UseIfNotNull(pipelineConfiguration.PreQueryStringBuilderMiddleware);
// 使用負載均衡中間件
builder.UseLoadBalancingMiddleware();
// 使用下游地址創建中間件
builder.UseDownstreamUrlCreatorMiddleware();
// 使用緩存中間件
builder.UseOutputCacheMiddleware();
//判斷下游的是否啟用rpc通信,切換到RPC處理
builder.MapWhen(context => context.DownstreamReRoute.DownstreamScheme.Equals("rpc", StringComparison.OrdinalIgnoreCase), app =>
{
app.UseCzarRpcMiddleware();
});
//使用下游請求中間件
builder.UseCzaHttpRequesterMiddleware();
return builder.Build();
}
這裡是在最後請求前判斷使用的下游請求方式,如果DownstreamScheme
使用的rpc
,就使用rpc
中間件處理。
Rpc處理的完整邏輯是,如何從http請求中獲取想要解析的參數,這裡需要設置匹配的優先順序,目前設計的優先順序為。
1、首先提取路由參數,如果匹配上就是用路由參數名稱為key,值為value,按順序組成第一批參數。
2、提取query參數,如有有值按順序組成第二批參數。
3、如果非Get請求,提取body內容,如果非空,組成第三批參數
4、從配置庫里提取rpc路由調用的服務名稱和函數名稱,以及是否單向調用。
5、按照獲取的數據進行rpc調用並等待返回。
看了上面的設計是不是思路很清晰了呢?
1、rpc路由表設計
CREATE TABLE AhphReRouteRpcConfig
(
RpcId int IDENTITY(1,1) NOT NULL,
ReRouteId int, //路由表主鍵
ServantName varchar(100) NOT NULL, //調用的服務名稱
FuncName varchar(100) NOT NULL, //調用的方法名稱
IsOneway bit NOT NULL //是否單向調用
)
2、提取遠程調用方法
根據上游路由獲取遠程調用的配置項目
public interface IRpcRepository
{
/// <summary>
/// 根據模板地址獲取RPC請求方法
/// </summary>
/// <param name="UpUrl">上游模板</param>
/// <returns></returns>
Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl);
}
public class SqlServerRpcRepository : IRpcRepository
{
private readonly CzarOcelotConfiguration _option;
public SqlServerRpcRepository(CzarOcelotConfiguration option)
{
_option = option;
}
/// <summary>
/// 獲取RPC調用方法
/// </summary>
/// <param name="UpUrl"></param>
/// <returns></returns>
public async Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl)
{
using (var connection = new SqlConnection(_option.DbConnectionStrings))
{
string sql = @"select T4.* from AhphGlobalConfiguration t1 inner join AhphConfigReRoutes T2 on
T1.AhphId=t2.AhphId inner join AhphReRoute T3 on T2.ReRouteId=T3.ReRouteId
INNER JOIN AhphReRouteRpcConfig T4 ON T3.ReRouteId=T4.ReRouteId
where IsDefault=1 and T1.InfoStatus=1 AND T3.InfoStatus=1 AND UpstreamPathTemplate=@URL";
var result = await connection.QueryFirstOrDefaultAsync<RemoteInvokeMessage>(sql, new { URL = UpUrl });
return result;
}
}
}
3、重寫返回結果
由於rpc調用後是返回的Json封裝的信息,需要解析成對應的HttpContent。
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
namespace Czar.Gateway.Rpc
{
public class RpcHttpContent : HttpContent
{
private string result;
public RpcHttpContent(string result)
{
this.result = result;
}
public RpcHttpContent(object result)
{
this.result = Newtonsoft.Json.JsonConvert.SerializeObject(result);
}
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
var writer = new StreamWriter(stream);
await writer.WriteAsync(result);
await writer.FlushAsync();
}
protected override bool TryComputeLength(out long length)
{
length = result.Length;
return true;
}
}
}
4、rpc中間件邏輯處理
有了前面的準備信息,現在基本可以完成邏輯代碼的開發了,詳細的中間件代碼如下。
using Czar.Gateway.Errors;
using Czar.Rpc.Clients;
using Ocelot.Logging;
using Ocelot.Middleware;
using Ocelot.Responses;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
namespace Czar.Gateway.Rpc.Middleware
{
public class CzarRpcMiddleware : OcelotMiddleware
{
private readonly OcelotRequestDelegate _next;
private readonly IRpcClientFactory _clientFactory;
private readonly ICzarRpcProcessor _czarRpcProcessor;
public CzarRpcMiddleware(OcelotRequestDelegate next, IRpcClientFactory clientFactory,
IOcelotLoggerFactory loggerFactory, ICzarRpcProcessor czarRpcProcessor) : base(loggerFactory.CreateLogger<CzarRpcMiddleware>())
{
_next = next;
_clientFactory = clientFactory;
_czarRpcProcessor = czarRpcProcessor;
}
public async Task Invoke(DownstreamContext context)
{
var httpStatusCode = HttpStatusCode.OK;
var _param = new List<object>();
//1、提取路由參數
var tmpInfo = context.TemplatePlaceholderNameAndValues;
if (tmpInfo != null && tmpInfo.Count > 0)
{
foreach (var tmp in tmpInfo)
{
_param.Add(tmp.Value);
}
}
//2、提取query參數
foreach (var _q in context.HttpContext.Request.Query)
{
_param.Add(_q.Value.ToString());
}
//3、從body里提取內容
if (context.HttpContext.Request.Method.ToUpper() != "GET")
{
context.DownstreamRequest.Scheme = "http";
var requert = context.DownstreamRequest.ToHttpRequestMessage();
if (requert.Content!=null)
{
var json = "{}";
json = await requert.Content.ReadAsStringAsync();
_param.Add(json);
}
}
//從緩存里提取
var req = await _czarRpcProcessor.GetRemoteMethodAsync(context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue);
if (req != null)
{
req.Parameters = _param.ToArray();
var result = await _clientFactory.SendAsync(req, GetEndPoint(context.DownstreamRequest.Host, context.DownstreamRequest.Port));
OkResponse<RpcHttpContent> httpResponse;
if (result.CzarCode == Czar.Rpc.Utilitys.RpcStatusCode.Success)
{
httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result.CzarResult?.ToString()));
}
else
{
httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result));
}
context.HttpContext.Response.ContentType = "application/json";
context.DownstreamResponse = new DownstreamResponse(httpResponse.Data, httpStatusCode, httpResponse.Data.Headers, "OK");
}
else
{//輸出錯誤
var error = new InternalServerError($"請求路由 {context.HttpContext.Request.Path}未配置後端轉發");
Logger.LogWarning($"{error}");
SetPipelineError(context, error);
}
}
private EndPoint GetEndPoint(string ipaddress, int port)
{
if (IPAddress.TryParse(ipaddress, out IPAddress ip))
{
return new IPEndPoint(ip, port);
}
else
{
return new DnsEndPoint(ipaddress, port);
}
}
}
}
5、啟動Rpc客戶端配置
目前Rpc的客戶端配置我們還沒啟動,只需要在AddCzarOcelot
中添加相關註入即可。
var service = builder.First(x => x.ServiceType == typeof(IConfiguration));
var configuration = (IConfiguration)service.ImplementationInstance;
//Rpc應用
builder.AddSingleton<ICzarRpcProcessor, CzarRpcProcessor>();
builder.AddSingleton<IRpcRepository, SqlServerRpcRepository>();
builder.AddLibuvTcpClient(configuration);
6、配置客戶端
最後別忘了配置Rpc客戶端信息是否啟用證書信息,為了配置信息的內容。
{
"CzarHost": {
"ProxyEndPoint": true,
"IsSsl": "false",
"PfxPath": "cert/datasync.pfx",
"PfxPassword": "bl123456",
"ClientConfig": {
"Demo.Rpc.Hello": {
"Host": "127.0.0.1",
"Port": 7711,
"Timeout": 20
}
}
}
}
現在讓網關集成Rpc功能全部配置完畢。
四、網關Rpc功能測試
本次測試我在原有的網關基礎上,增加不同類型的Rpc調用,就按照不同維度測試Rpc調用功能,本次測試案例是建立在Czar.Rpc 服務端基礎上,正好可以測試。
1、測試路由參數
請求路徑/hello/{no}/{name}
,調用的服務端方法Hello
,傳入的兩個參數分別是no ,name
。
可以在伺服器端添加斷點調試,發現確實接收到請求信息,並正常返回,下麵是PostMan
測試結果。
2、使用Query方式傳遞參數
請求路徑/rpc/query
,調用的服務端方法還是Hello
,參數分別是no ,name
。
3、使用Post方式傳遞Json
請求路徑/rpc/body
,調用的伺服器方法是HelloSendModel
。
4、混合參數使用
請求的路徑/rpc/bodyparm/{name}
,調用的伺服器端方法是HelloSendModelParm
。
所有的返回結果可自行調試測試,發現都能達到預期結果。
同時此網關還是支持預設的http請求的,這裡就不一一測試了。
五、總結
本篇我介紹了什麼是Rpc,以及Czar.Rpc的基本使用,然後使用Czar.Rpc框架集成到我們基於Ocelot擴展網關中,並實現了不能方式的Rpc調用,可以在幾乎不改變現有流程的情況下很快速的集成進去,這也是Ocelot開發框架的魅力所在。
如果在使用過程中有什麼問題或建議,可以在.NET Core項目實戰交流群(637326624)
中聯繫作者。
最後本文涉及的所有的源代碼可在https://github.com/jinyancao/czar.gateway中下載預覽。