文盤Rust -- tonic-Rust grpc初體驗

来源:https://www.cnblogs.com/Jcloud/archive/2023/09/18/17710909.html
-Advertisement-
Play Games

gRPC 是開發中常用的開源高性能遠程過程調用(RPC)框架,tonic 是基於 HTTP/2 的 gRPC 實現,專註於高性能、互操作性和靈活性。該庫的創建是為了對 async/await 提供一流的支持,並充當用 Rust 編寫的生產系統的核心構建塊。今天我們聊聊通過使用tonic 調用grpc... ...


gRPC 是開發中常用的開源高性能遠程過程調用(RPC)框架,tonic 是基於 HTTP/2 的 gRPC 實現,專註於高性能、互操作性和靈活性。該庫的創建是為了對 async/await 提供一流的支持,並充當用 Rust 編寫的生產系統的核心構建塊。今天我們聊聊通過使用tonic 調用grpc的的具體過程。

工程規劃

rpc程式一般包含server端和client端,為了方便我們把兩個程式打包到一個工程裡面 新建tonic_sample工程

cargo new tonic_sample


Cargo.toml 如下

[package]
name = "tonic_sample"
version = "0.1.0"
edition = "2021"

[[bin]] # Bin to run the gRPC server
name = "stream-server"
path = "src/stream_server.rs"

[[bin]] # Bin to run the gRPC client
name = "stream-client"
path = "src/stream_client.rs"


[dependencies]
tokio.workspace = true
tonic = "0.9"
tonic-reflection = "0.9.2"
prost = "0.11"
tokio-stream = "0.1"
async-stream = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.7"
h2 = { version = "0.3" }
anyhow = "1.0.75"
futures-util = "0.3.28"

[build-dependencies]
tonic-build = "0.9"


tonic 的示例代碼還是比較齊全的,本次我們參考 tonic 的 streaming example

首先編寫 proto 文件,用來描述報文。 proto/echo.proto

syntax = "proto3";

package stream;

// EchoRequest is the request for echo.
message EchoRequest { string message = 1; }

// EchoResponse is the response for echo.
message EchoResponse { string message = 1; }

// Echo is the echo service.
service Echo {
  // UnaryEcho is unary echo.
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  // ServerStreamingEcho is server side streaming.
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  // ClientStreamingEcho is client side streaming.
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  // BidirectionalStreamingEcho is bidi streaming.
  rpc BidirectionalStreamingEcho(stream EchoRequest)
      returns (stream EchoResponse) {}
}


文件並不複雜,只有兩個 message 一個請求一個返回,之所以選擇這個示例是因為該示例包含了rpc中的流式處理,包擴了server 流、client 流以及雙向流的操作。 編輯build.rs 文件

use std::{env, path::PathBuf};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/echo.proto")?;
    Ok(())
}


該文件用來通過 tonic-build 生成 grpc 的 rust 基礎代碼

完成上述工作後就可以構建 server 和 client 代碼了

stream_server.rs

pub mod pb {
    tonic::include_proto!("stream");
}

use anyhow::Result;
use futures_util::FutureExt;
use pb::{EchoRequest, EchoResponse};
use std::{
    error::Error,
    io::ErrorKind,
    net::{SocketAddr, ToSocketAddrs},
    pin::Pin,
    thread,
    time::Duration,
};
use tokio::{
    net::TcpListener,
    sync::{
        mpsc,
        oneshot::{self, Receiver, Sender},
        Mutex,
    },
    task::{self, JoinHandle},
};
use tokio_stream::{
    wrappers::{ReceiverStream, TcpListenerStream},
    Stream, StreamExt,
};
use tonic::{transport::Server, Request, Response, Status, Streaming};
type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;

fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
    let mut err: &(dyn Error + 'static) = err_status;

    loop {
        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
            return Some(io_err);
        }

        // h2::Error do not expose std::io::Error with `source()`
        // https://github.com/hyperium/h2/pull/462
        if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
            if let Some(io_err) = h2_err.get_io() {
                return Some(io_err);
            }
        }

        err = match err.source() {
            Some(err) => err,
            None => return None,
        };
    }
}

#[derive(Debug)]
pub struct EchoServer {}

#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
    async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> {
        let req_str = req.into_inner().message;

        let response = EchoResponse { message: req_str };
        Ok(Response::new(response))
    }

    type ServerStreamingEchoStream = ResponseStream;

    async fn server_streaming_echo(
        &self,
        req: Request<EchoRequest>,
    ) -> EchoResult<Self::ServerStreamingEchoStream> {
        println!("EchoServer::server_streaming_echo");
        println!("\tclient connected from: {:?}", req.remote_addr());

        // creating infinite stream with requested message
        let repeat = std::iter::repeat(EchoResponse {
            message: req.into_inner().message,
        });
        let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));

        let (tx, rx) = mpsc::channel(128);
        tokio::spawn(async move {
            while let Some(item) = stream.next().await {
                match tx.send(Result::<_, Status>::Ok(item)).await {
                    Ok(_) => {
                        // item (server response) was queued to be send to client
                    }
                    Err(_item) => {
                        // output_stream was build from rx and both are dropped
                        break;
                    }
                }
            }
            println!("\tclient disconnected");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(
            Box::pin(output_stream) as Self::ServerStreamingEchoStream
        ))
    }

    async fn client_streaming_echo(
        &self,
        _: Request<Streaming<EchoRequest>>,
    ) -> EchoResult<EchoResponse> {
        Err(Status::unimplemented("not implemented"))
    }

    type BidirectionalStreamingEchoStream = ResponseStream;

    async fn bidirectional_streaming_echo(
        &self,
        req: Request<Streaming<EchoRequest>>,
    ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
        println!("EchoServer::bidirectional_streaming_echo");

        let mut in_stream = req.into_inner();
        let (tx, rx) = mpsc::channel(128);

        tokio::spawn(async move {
            while let Some(result) = in_stream.next().await {
                match result {
                    Ok(v) => tx
                        .send(Ok(EchoResponse { message: v.message }))
                        .await
                        .expect("working rx"),
                    Err(err) => {
                        if let Some(io_err) = match_for_io_error(&err) {
                            if io_err.kind() == ErrorKind::BrokenPipe {
                                eprintln!("\tclient disconnected: broken pipe");
                                break;
                            }
                        }

                        match tx.send(Err(err)).await {
                            Ok(_) => (),
                            Err(_err) => break, // response was droped
                        }
                    }
                }
            }
            println!("\tstream ended");
        });

        // echo just write the same data that was received
        let out_stream = ReceiverStream::new(rx);

        Ok(Response::new(
            Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
        ))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 基礎server
    let server = EchoServer {};
    Server::builder()
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
        .await
        .unwrap();
    Ok(())
}



server 端的代碼還是比較清晰的,首先通過 tonic::include_proto! 巨集引入grpc定義,參數是 proto 文件中定義的 package 。我們重點說說 server_streaming_echo function 。這個function 的處理流程明白了,其他的流式處理大同小異。首先 通過std::iter::repeat function 定義一個迭代器;然後構建 tokio_stream 在本示例中 每 200毫秒產生一個 repeat;最後構建一個 channel ,tx 用來發送從stream中獲取的內容太,rx 封裝到response 中返回。 最後 main 函數 拉起服務。

client 代碼如下

pub mod pb {
    tonic::include_proto!("stream");
}

use std::time::Duration;
use tokio_stream::{Stream, StreamExt};
use tonic::transport::Channel;

use pb::{echo_client::EchoClient, EchoRequest};

fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
    tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
        message: format!("msg {:02}", i),
    })
}

async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) {
    for i in 0..num {
        let req = tonic::Request::new(EchoRequest {
            message: "msg".to_string() + &i.to_string(),
        });
        let resp = client.unary_echo(req).await.unwrap();
        println!("resp:{}", resp.into_inner().message);
    }
}

async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let stream = client
        .server_streaming_echo(EchoRequest {
            message: "foo".into(),
        })
        .await
        .unwrap()
        .into_inner();

    // stream is infinite - take just 5 elements and then disconnect
    let mut stream = stream.take(num);
    while let Some(item) = stream.next().await {
        println!("\treceived: {}", item.unwrap().message);
    }
    // stream is droped here and the disconnect info is send to server
}

async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let in_stream = echo_requests_iter().take(num);

    let response = client
        .bidirectional_streaming_echo(in_stream)
        .await
        .unwrap();

    let mut resp_stream = response.into_inner();

    while let Some(received) = resp_stream.next().await {
        let received = received.unwrap();
        println!("\treceived message: `{}`", received.message);
    }
}

async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
    let in_stream = echo_requests_iter().throttle(dur);

    let response = client
        .bidirectional_streaming_echo(in_stream)
        .await
        .unwrap();

    let mut resp_stream = response.into_inner();

    while let Some(received) = resp_stream.next().await {
        let received = received.unwrap();
        println!("\treceived message: `{}`", received.message);
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap();
    println!("Unary echo:");
    unary_echo(&mut client, 10).await;
    tokio::time::sleep(Duration::from_secs(1)).await;

    println!("Streaming echo:");
    streaming_echo(&mut client, 5).await;
    tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions

    // Echo stream that sends 17 requests then graceful end that connection
    println!("\r\nBidirectional stream echo:");
    bidirectional_streaming_echo(&mut client, 17).await;

    // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
    // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
    // graceful client disconnection (above example) on the server side.
    println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
    bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;

    Ok(())
}



測試一下,分別運行 server 和 client

cargo run --bin stream-server
cargo run --bin stream-client


在開發中,我們通常不會再 client 和 server都開發好的情況下才開始測試。通常在開發server 端的時候採用 grpcurl 工具進行測試工作

grpcurl -import-path ./proto -proto echo.proto list
grpcurl -import-path ./proto -proto  echo.proto describe stream.Echo
grpcurl -plaintext -import-path ./proto -proto  echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho


此時,如果我們不指定 -import-path 參數,執行如下命令

grpcurl -plaintext 127.0.0.1:50051 list


會出現如下報錯信息

Failed to list services: server does not support the reflection API


讓服務端程式支持 reflection API

首先改造build.rs

use std::{env, path::PathBuf};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
    tonic_build::configure()
        .file_descriptor_set_path(out_dir.join("stream_descriptor.bin"))
        .compile(&["proto/echo.proto"], &["proto"])
        .unwrap();
    Ok(())
}


file_descriptor_set_path 生成一個文件,其中包含為協議緩衝模塊編碼的 prost_types::FileDescriptorSet 文件。這是實現 gRPC 伺服器反射所必需的。

接下來改造一下 stream-server.rs,涉及兩處更改。

新增 STREAM_DESCRIPTOR_SET 常量

pub mod pb {
    tonic::include_proto!("stream");
    pub const STREAM_DESCRIPTOR_SET: &[u8] =
        tonic::include_file_descriptor_set!("stream_descriptor");
}


修改main函數

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 基礎server
    // let server = EchoServer {};
    // Server::builder()
    //     .add_service(pb::echo_server::EchoServer::new(server))
    //     .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
    //     .await
    //     .unwrap();

    // tonic_reflection 
    let service = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET)
        .with_service_name("stream.Echo")
        .build()
        .unwrap();

    let addr = "0.0.0.0:50051".parse().unwrap();

    let server = EchoServer {};

    Server::builder()
        .add_service(service)
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve(addr)
        .await?;
    Ok(())
}


register_encoded_file_descriptor_set 將包含編碼的 prost_types::FileDescriptorSet 的 byte slice 註冊到 gRPC Reflection 服務生成器註冊。

再次測試

grpcurl -plaintext 127.0.0.1:50051 list
grpcurl -plaintext 127.0.0.1:50051 describe stream.Echo


返回正確結果。

以上完整代碼地址

作者:京東科技 賈世聞

來源:京東雲開發者社區 轉載請註明來源


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 什麼是裝飾器,它們如何被使用,以及我們如何利用它們來構建代碼。我們將看到裝飾器是如何成為一個強大的工具,可以用來為我們的應用程式添加功能,並且可以在Python編程語言中找到。 裝飾器順序 在Python中,裝飾器是一個特殊的函數,可以修改另一個函數的行為。裝飾器是一種設計模式,它在不改變現有對象結 ...
  • dataclass 到 Python 中的 JSON JavaScript Object Notation或JSON表示使用編程語言中的文本組成的腳本(可執行)文件來存儲和傳輸數據。 Python通過JSON內置模塊支持JSON。因此,我們在Python腳本中導入JSON包,以利用這一能力。 JSO ...
  • 前言 很多時候,由於種種不可描述的原因,我們需要針對單個介面實現介面限流,防止訪問次數過於頻繁。這裡就用 redis+aop 實現一個限流介面註解 @RedisLimit 代碼 點擊查看RedisLimit註解代碼 import java.lang.annotation.*; /** * 功能:分佈 ...
  • 對重寫代碼說不。 以下為譯文: 1、重寫代碼消耗了12個月! 我們從頭開始重寫代碼浪費的時間。 你能想象在軟體行業,12個月的時間沒有任何新產品推出,沒有任何新版本更新嗎? 真的,我不由自主地問自己這個問題: 在這個快速發展的世界里,12月的時間能讓我們做多少事情? “2015年1月20日,星期二, ...
  • Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。 grpc-java GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。 grp ...
  • 1.什麼是權重比例 權重比例計算即將各數值乘以相應的權數,然後加總求和得到總體值,再除以總的單位數。 如何計算 有一個對象集合為[A,B,C,D,E,F,G,H,I,J],其對象的全紅 總權重為10 每一個對象的權重為1/10=0.1 2.什麼是權重覆蓋區域 權重覆蓋區域是對象在整體權重範圍中的鎖分 ...
  • JDK21 計劃23年9月19日正式發佈,雖然一直以來都是“版本隨便出,換 8 算我輸”,但這麼多年這麼多版本的折騰,如果說之前的 LTS版本JDK17你還覺得不香,那 JDK21還是有必要關註一下,因為會有一批重要更新發佈到生產環境中,特別是千呼萬喚的虛擬線程,雖然說這東西我感覺不需要的用不到,需 ...
  • 前言 MongoDB是一個基於分散式文件存儲的開源資料庫系統,使用C++語言編寫。它是一個介於關係資料庫和非關係資料庫之間的產品,具有類似關係資料庫的功能,但又有一些非關係資料庫的特點。MongoDB的數據模型比較鬆散,採用類似json的bson格式,可以靈活地存儲各種類型的數據 MongoDB的優 ...
一周排行
    -Advertisement-
    Play Games
  • WPF本身不支持直接的3D繪圖,但是它提供了一些用於實現3D效果的高級技術。 如果你想要在WPF中進行3D繪圖,你可以使用兩種主要的方法: WPF 3D:這是一種在WPF應用程式中創建3D圖形的方式。WPF 3D提供了一些基本的3D形狀(如立方體、球體和錐體)以及一些用於控制3D場景和對象的工具(如 ...
  • 一、XML概述 XML(可擴展標記語言)是一種用於描述數據的標記語言,旨在提供一種通用的方式來傳輸和存儲數據,特別是Web應用程式中經常使用的數據。XML並不預定義標記。因此,XML更加靈活,並且可以適用於廣泛的應用領域。 XML文檔由元素(element)、屬性(attribute)和內容(con ...
  • 從今年(2023)三月份開始,Github開始強制用戶開啟兩步驗證2FA(雙因數)登錄驗證,毫無疑問,是出於安全層面的考慮,畢竟Github賬號一旦被盜,所有代碼倉庫都會毀於一旦,關於雙因數登錄的必要性請參見:別讓你的伺服器(vps)淪為肉雞(ssh暴力破解),密鑰驗證、雙向因數登錄值得擁有。 雙因 ...
  • 第一題 下列代碼輸入什麼? public class Test { public static Test t1 = new Test(); { System.out.println("blockA"); } static { System.out.println("blockB"); } publi ...
  • 本文主要涉及的問題:用ElementTree和XPath讀寫XML文件;解決ElementTree新增元素後再寫入格式不統一的問題;QTableWidget單元格設置控制項 ...
  • QStandardItemModel 類作為標準模型,主打“類型通用”,前一篇水文中,老周還沒提到樹形結構的列表,本篇咱們就好好探討一下這貨。 還是老辦法,咱們先做示例,然後再聊知識點。下麵這個例子,使用 QTreeView 組件來顯示數據,使用的列表模型比較簡單,只有一列。 #include <Q ...
  • 一、直充內充(充值方式) 直充: 包裝套餐直接充值到上游API系統。【PID/Smart】 (如:支付寶、微信 話費/流量/語音/簡訊 等 充值系統)。 內充(套餐打包常見物聯卡系統功能): 套餐包裝 適用於不同類型套餐 如 流量、簡訊、語音 等。 (目前已完善流量邏輯) 二、套餐與計費產品 計費產 ...
  • 在前面幾天中,我們學習了Dart基礎語法、可迭代集合,它們是Flutter應用研發的基本功。今天,我們繼續學習Flutter應用另一個必須掌握知識點:非同步編程(即Future和async/await)。它類似於Java中的FutureTask、JavaScript中的Promise。它是後續Flut... ...
  • 針對改動範圍大、影響面廣的需求,我通常會問上線了最壞情況是什麼?應急預案是什麼?你帶開關了嗎?。當然開關也是有成本的,接下來本篇跟大家一起交流下高頻發佈支撐下的功能開關技術理論與實踐結合的點點滴滴。 ...
  • 1.d3.shuffle D3.shuffle() 方法用於將數組中的元素隨機排序。它使用 Fisher–Yates 洗牌演算法,該演算法是無偏的,具有最佳的漸近性能(線性時間和常數記憶體)。 D3.shuffle() 方法的語法如下: d3.shuffle(array, [start, end]) 其中 ...