.NET Core + gRPC 實現數據串流 (Streaming)

来源:https://www.cnblogs.com/hez2010/archive/2019/01/19/10293331.html
-Advertisement-
Play Games

引入 gRPC 是谷歌推出的一個高性能優秀的 RPC 框架,基於 HTTP/2 實現。並且該框架對 .NET Core 有著優秀的支持。最近在做一個項目正好用到了 gRPC,遇到了需要串流傳輸的問題。 引入 項目創建 首先還是需要安裝 .net core sdk,可以去 http://dot.net ...


引入

gRPC 是谷歌推出的一個高性能優秀的 RPC 框架,基於 HTTP/2 實現。並且該框架對 .NET Core 有著優秀的支持。
最近在做一個項目正好用到了 gRPC,遇到了需要串流傳輸的問題。

項目創建

首先還是需要安裝 .net core sdk,可以去 http://dot.net 下載。這裡我使用的是 2.2.103 版本的 sdk。
mkdir RpcStreaming
cd RpcStreaming
dotnet new console
dotnet add package Grpc // 添加 gRPC 包
dotnet add package Grpc.Tools // 添加 gRPC 工具包
dotnet add package Google.Protobuf // 添加 Protobuf 支持

然後為了支持 protobuf 語言,我們需要修改項目配置文件,在項目中引入 .proto 文件以便生成對應的代碼。

在 RpcStreaming.csproj 中,加入<Protobuf Include="**/*.proto" />,除此之外還需要啟用最新語言支持(C# 7.3),方便我們將 Main 函數直接寫為 async 函數,直接設置為最新版本的語言即可,如下所示:
<Project Sdk="Microsoft.NET.Sdk">
  ...
  <PropertyGroup>
    ...
    <LangVersion>latest</LangVersion>
    ...
  </PropertyGroup>
  
  <ItemGroup>
    ...
    <Protobuf Include="**/*.proto" />
    ...
  </ItemGroup>
  ...
</Project>

這裡我們使用了 wildcard 語法匹配了項目內的全部 proto 文件用於生成對應的代碼。

到這裡,項目的創建就完成了。

編寫 Proto 文件

我們在項目目錄下建立一個 .proto 文件,用於描述 rpc 調用和消息類型。比如:RpcStreaming.proto
內容如下:
 1 synatx = "proto3";
 2 service RpcStreamingService {
 3   rpc GetStreamContent (StreamRequest) returns (stream StreamContent) {}
 4 }
 5 message StreamRequest {
 6   string fileName = 1;
 7 }
 8 message StreamContent {
 9   bytes content = 1;
10 }

做 RPC 請求時,我們向 RPC 伺服器發送一個 StreamRequest 的 message,其中包含了文件路徑;為了讓伺服器以流式傳輸數據,我們在 returns 內加一個 “stream”。

保存後,我們執行一次 dotnet build,這樣就會在 ./obj/Debug/netcoreapp2.2下自動生成 RPC 調用和消息類型的代碼。

編寫 Server 端代碼

為了編寫 RPC 調用服務端代碼,我們需要重寫自動生成的 C# 虛函數。
首先我們進入 ./obj/Debug/netcoreapp2.2 看看自動生成了什麼代碼。
RpcStreaming.cs 中包含消息類型的定義,RpcStreamingGrpc.cs 中包含了對應 rpc 調用的函數原型。
我們查找一下我們剛剛在 proto 文件中聲明的 GetStreamContent。
可以在裡面找到一個上方文檔註釋為 “Base class for server-side implementations RpcStreamingServiceBase” 的抽象類 RpcStreamingServiceBase,裡面包含了我們要找的東西。 可以找到我們的 GetStreamContent 的預設實現:
public virtual global::System.Threading.Tasks.Task GetStreamContent(global::StreamRequest request, grpc::IServerStreamWriter<global::StreamContent> responseStream, grpc::ServerCallContext context)
{
    throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}

這樣就簡單了,我們新建一個類 RpcServiceImpl,繼承 RpcStreamingService.RpcStreamingServiceBase,然後實現對應的方法即可。

為了串流,我們需要將數據流不斷寫入 response,這裡給一個簡單的示例。
 1 using System;
 2 using System.IO;
 3 using System.Threading.Tasks;
 4 using Google.Protobuf;
 5 using Grpc.Core;
 6 namespace RpcStreaming
 7 {
 8     public class RpcStreamingServiceImpl : RpcStreamingService.RpcStreamingServiceBase
 9     {
10         public override Task GetStreamContent(StreamRequest request, IServerStreamWriter<StreamContent> response, ServerCallContext context)
11         {
12             return Task.Run(async () =>
13             {
14                 using (var fs = File.Open(request.FileName, FileMode.Open)) // 從 request 中讀取文件名並打開文件流
15           {
16                     var remainingLength = fs.Length; // 剩餘長度
17               var buff = new byte[1048576]; // 緩衝區,這裡我們設置為 1 Mb
18               while (remainingLength > 0) // 若未讀完則繼續讀取
19               {
20                         var len = await fs.ReadAsync(buff); // 非同步從文件中讀取數據到緩衝區中
21                   remainingLength -= len; // 剩餘長度減去剛纔實際讀取的長度
22 
23                   // 向流中寫入我們剛剛讀取的數據
24                   await response.WriteAsync(new StreamContent
25                         {
26                             Content = ByteString.CopyFrom(buff, 0, len)
27                         });
28                     }
29                 }
30             });
31         }
32     }
33 }

 

啟動 RPC Server

首先需要:
1 using Google.Protobuf;
2 using Grpc.Core;

然後我們在 Main 函數中構建並啟動 RPC Server,監聽 localhost:23333

1 new Server
2 {
3     Services = { RpcStreamingService.BindService(new RpcStreamingServiceImpl()) }, // 綁定我們的實現
4     Ports = { new ServerPort("localhost", 23333, ServerCredentials.Insecure) }
5 }.Start();
6 Console.ReadKey();

這樣服務端就構建完成了。

編寫客戶端調用 RPC API

方便起見,我們先將 Main 函數改寫為 async 函數。
1 // 原來的 Main 函數
2 static void Main(string[] args) { ... }
3 // 改寫後的 Main 函數
4 static async Task Main(string[] args) { ... }

另外,還需要:

1 using System;
2 using System.IO;
3 using System.Threading.Tasks;
4 using Google.Protobuf;
5 using Grpc.Core;

然後我們在 Main 函數中添加調用代碼:

 1 var channel = new Channel("localhost:23333", ChannelCredentials.Insecure); // 建立到 localhost:23333 的 channel
 2 var client = new RpcStreamingService.RpcStreamingServiceClient(channel); // 建立 client
 3 // 調用 RPC API
 4 var result = client.GetStreamContent(new StreamRequest { FileName = "你想獲取的文件路徑" });
 5 var iter = result.ResponseStream; // 拿到響應流
 6 using (var fs = new FileStream("寫獲取的數據的文件路徑", FileMode.Create)) // 新建一個文件流用於存放我們獲取到數據
 7 {
 8     while (await iter.MoveNext()) // 迭代
 9     {
10         iter.Current.Content.WriteTo(fs); // 將數據寫入到文件流中
11     }
12 }

測試

dotnet run

會發現,我們想要獲取的文件的數據被不斷地寫到我們指定的文件中,每次 1 Mb。在我的電腦上測試,內網的環境下傳輸速度大概 80~90 Mb/s,幾乎跑滿了我的千兆網卡,速度非常理想。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • beego 框架 cache 目錄是 Go 實現的一個緩存管理器,是beego框架自帶工具之一,當然,如果你只想使用 cache 而不是整個 beego 框架,可以選擇性安裝: go get github.com/astaxie/beego/cache 在我寫這篇博文時, beego 版本是 v1. ...
  • IDE進行Gradle操作,那麼還需要設置IDE的參數。例如在IDEA中,需要打開File->Other Settings->Default Settings->Gradle,在Gradle Vm Options中設置-Dfile.encoding=utf-8。這樣IDEA中的Gradle也可以正確 ...
  • GitHub RSA密碼 RSA密碼是1978年美國麻省理工學院三位密碼學者R.L.Rivest、A.Shamir和L.Adleman提出的一種基於大合數因數分解困難性的公開密鑰密碼。由於RSA密碼既可用於加密,又可用於數字簽名,通俗易懂,因此RSA密碼已成為目前應用最廣泛的公開密鑰密碼。 RSA加 ...
  • 首先需要安裝最新的python:安裝步驟見:https://www.cnblogs.com/weven/p/7252917.html 其次下載python源碼: 鏈接:https://pan.baidu.com/s/1UZmMEjt5nc7clMtatgLwzA 提取碼:qatx 然後就開始以下步驟 ...
  • 2019-01-19 多重背包:每種東西有多個,因此可以把它拆分成多個01背包 優化:二進位拆分(拆成1+2+4+8+16+...,分別表示2的n次冪) 比如18=1+2+4+8+3,可以證明18以內的任何數都可以用這幾個數的和或差表示(每個數只能用一次)(0時則背包為空), 所以就把2個,4個.. ...
  • 最近想要做一個小東西,用到了下麵幾個中間件或者環境: Java Tomcat Maven MongoDB ZooKeeper Node 並且恰好碰到騰訊雲打折,雲主機原價100多一個月,花了30塊錢買了三個月。買下後立即動手準備開始環境配置。 說到環境,少則2小時,多則兩三天可能都要整矇蔽,環境好了 ...
  • 寫之前的說明 0. 其實吧。 這個東西已經寫好了,地址在:https://github.com/hjx601496320/JdbcPlus 這 系列 文章算是我寫的過程的總結吧。(恩系列,說明我可能會寫好久,╮(╯▽╰)╭) 1. 現在有很多的現成的orm框架,為什麼還要自己寫一個? 框架這種東西個 ...
  • 開始研究動態代理之前 先簡要談下動態代理的概念 在不改變原有類結構的前提下增強類的功能以及對類原有方法操作,註意是方法不是屬性(屬性一般被設計為private修飾,不可以直接被調用) 動態代理的基本實例不做闡述,網上一大把 不理解的同學可以直接去搜索。 今天說的是自己在項目中遇到的一個實際的動態代理 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...