原文在[這裡](https://grpc.io/docs/languages/go/basics/)。 本教程為Go程式員提供了使用gRPC的基本介紹。 通過跟隨本示例,你將學會如何: - 在.proto文件中定義一個服務。 - 使用協議緩衝編譯器生成伺服器和客戶端代碼。 - 使用Go gRPC A ...
原文在這裡。
本教程為Go程式員提供了使用gRPC的基本介紹。
通過跟隨本示例,你將學會如何:
- 在.proto文件中定義一個服務。
- 使用協議緩衝編譯器生成伺服器和客戶端代碼。
- 使用Go gRPC API編寫一個簡單的服務端和客戶端。
本教程假設你已經閱讀了gRPC入門並熟悉協議緩衝(Protocol Buffers)。請註意,本教程中的示例使用了proto3版本的協議緩衝語言。你可以在proto3語言指南和Go生成的代碼指南中瞭解更多信息。
為什麼使用gRPC?
本示例是一個簡單的路線映射應用程式,允許客戶端獲取有關其路線上的特點信息,創建其路線的摘要,並與伺服器和其他客戶端交換路線信息,如交通更新。
通過gRPC,我們可以在.proto文件中定義我們的服務,併在gRPC支持的任何語言中生成客戶端和伺服器。這些代碼可以運行在從大型數據中心內的伺服器到你自己的平板電腦等各種環境中,gRPC會為你處理不同語言和環境之間的通信複雜性。我們還可以獲得與協議緩衝一起工作的所有優勢,包括高效的序列化、簡單的IDL和易於更新的介面。
設置
在開始之前,你應該已經安裝了生成客戶端和伺服器介面代碼所需的工具。如果還沒有安裝,請參考快速入門指南的先決條件部分進行安裝設置。
獲取示例代碼
示例代碼位於grpc-go倉庫中。
你可以下載該倉庫的zip文件並解壓,或者通過克隆倉庫來獲取示例代碼:
$ git clone -b v1.56.2 --depth 1 https://github.com/grpc/grpc-go
然後進入示例代碼的目錄:
$ cd grpc-go/examples/route_guide
定義服務
作為第一步,我們需要使用protocol buffers來定義gRPC服務以及方法請求和響應類型。完整的.proto
文件可以在routeguide/route_guide.proto中找到。
在.proto文件中,要定義一個服務,你需要在其中指定一個命名的服務:
service RouteGuide {
...
}
然後在服務定義內部定義rpc
方法,並指定它們的請求和響應類型。gRPC允許你定義四種類型的服務方法,其中在RouteGuide服務中都會使用到:
- 一個簡單的RPC,客戶端使用存根(stub)向伺服器發送請求,並等待響應返回,就像普通的函數調用一樣。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- 一個服務端流式RPC,在這種RPC中,客戶端發送請求給伺服器,並獲得一個流以讀取一系列的響應消息。客戶端從返回的流中讀取,直到沒有更多的消息為止。在我們的例子中,你可以通過在響應類型之前使用stream關鍵字來指定一個服務端流式方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 客戶端流式RPC,客戶端編寫一系列消息並通過提供的流發送到伺服器。一旦客戶端完成寫入消息,它會等待伺服器讀取所有消息並返迴響應。你可以通過在請求類型之前放置
stream
關鍵字來指定客戶端流式方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 雙向流式RPC,雙方使用讀寫流發送一系列消息。兩個流操作獨立,因此客戶端和伺服器可以按任意順序讀取和寫入:例如,伺服器可以在寫入其響應之前等待接收所有客戶端消息,或者可以交替讀取消息然後寫入消息,或者進行一些其他讀取和寫入的組合。每個流中消息的順序保持不變。你可以通過在請求類型和響應類型之前都放置stream關鍵字來指定這種類型的方法。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我們的.proto
文件還包含了用於所有服務方法中的請求和響應類型的協議緩衝區消息類型定義 - 例如,這裡是Point消息類型的定義:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
在上面的代碼中,我們定義了一個名為Point的消息類型,它包含兩個欄位:latitude和longitude,分別對應整數類型的欄位標識為1和2。這個消息類型可以用來表示地理位置的緯度和經度信息。
生成客戶端和伺服器代碼
接下來,我們需要從.proto
服務定義生成gRPC客戶端和伺服器介面。我們使用protoc
以及gRPC Go插件來完成這個任務。
在examples/route_guide
目錄中,運行以下命令:
$ protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
routeguide/route_guide.proto
運行這個命令會在routeguide目錄下生成以下文件:
route_guide.pb.go
:包含所有協議緩衝區代碼,用於填充、序列化和檢索請求和響應消息類型。route_guide_grpc.pb.go
:包含以下內容:- 一個介面類型(或存根),供客戶端調用,其中定義了
RouteGuide
服務中的方法。 - 一個介面類型,供伺服器實現,也包含
RouteGuide
服務中定義的方法。
- 一個介面類型(或存根),供客戶端調用,其中定義了
創建服務
首先,讓我們看一下如何創建一個RouteGuide伺服器。如果你只關心創建gRPC客戶端,可以跳過本節,直接查看創建客戶端部分(不過你可能還是會對此感興趣!)。
使我們的RouteGuide服務發揮作用有兩個部分:
- 實現從服務定義生成的服務介面:執行我們服務的實際"工作"。
- 運行gRPC伺服器以偵聽來自客戶端的請求,並將它們分派到正確的服務實現。
你可以在server/server.go
文件中找到我們的示例RouteGuide伺服器。讓我們來仔細看看它是如何工作的。
實現RouteGuide
正如你所見,我們的伺服器有一個routeGuideServer
結構類型,它實現了生成的RouteGuideServer
介面:
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
簡單RPC
routeGuideServer
實現了我們的所有服務方法。首先,讓我們看一下最簡單的方法GetFeature
。它只從客戶端獲取一個Point
,然後返回其資料庫中相應特征的信息。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// 如果未找到特征,則返回一個未命名特征
return &pb.Feature{Location: point}, nil
}
該方法接收一個RPC的上下文對象和客戶端的Point
協議緩衝區請求。它返回一個包含響應信息的Feature
協議緩衝區對象和一個錯誤。在方法中,我們將Feature
填充為適當的信息,然後將其與空錯誤一起返回,以告訴gRPC我們已經完成了對RPC的處理,並且Feature
可以返回給客戶端。
伺服器端流式RPC
現在,讓我們來看一個流式RPC的例子。ListFeatures
是一個伺服器端流式RPC,因此我們需要向客戶端發送多個Feature
。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
如你所見,與在方法參數中獲取簡單的請求和響應對象不同,這次我們獲取了一個請求對象(客戶端要查找的Rectangle
中的Feature
)和一個特殊的RouteGuide_ListFeaturesServer
對象,用於編寫我們的響應。
在該方法中,我們填充了需要返回的多個Feature
對象,並使用RouteGuide_ListFeaturesServer
的Send()
方法將它們寫入其中。最後,就像在我們的簡單RPC中一樣,我們返回一個空錯誤,以告訴gRPC我們已經完成了寫入響應。如果在此調用中發生任何錯誤,則我們返回一個非空錯誤;gRPC層將將其轉換為適當的RPC狀態發送到網路。
客戶端端流式RPC
接下來,讓我們看一些更複雜的東西:客戶端端流式方法RecordRoute
。在這裡,我們從客戶端獲取一個Point
流,並返回有關他們的行程的單個RouteSummary
信息。如你所見,這次該方法根本沒有請求參數。相反,它獲取了一個RouteGuide_RecordRouteServer
流,伺服器可以使用該流來讀取和寫入消息。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在方法體中,我們使用RouteGuide_RecordRouteServer
的Recv()
方法重覆讀取客戶端的請求到一個請求對象(在本例中是Point
),直到沒有更多的消息為止:伺服器需要在每次調用後檢查Recv()
返回的錯誤。如果返回的錯誤為nil
,則表示流仍然有效,可以繼續讀取;如果為io.EOF
,則表示消息流已結束,伺服器可以返回其RouteSummary
。如果返回的錯誤是其他值,則我們將其“原樣”返回,以便由gRPC層將其轉換為RPC狀態。
雙向流式RPC
最後,讓我們看一下我們的雙向流式傳輸RPC RouteChat()。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // 尋找要發送給客戶端的註釋
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
這次我們得到了一個 RouteGuide_RouteChatServer
流,就像我們的客戶端流式傳輸示例中一樣,可以用來讀寫消息。然而,這次我們通過方法的流式傳輸返回值,而客戶端仍然在向其消息流寫入消息。
在這裡,讀寫的語法與客戶端流式傳輸方法非常相似,不同之處在於伺服器使用流的 Send()
方法而不是 SendAndClose()
方法,因為它正在寫入多個響應。儘管每一方始終按照它們被寫入的順序獲得另一方的消息,但客戶端和伺服器都可以按任意順序讀寫 - 這些流完全獨立運行。
啟動伺服器
一旦我們實現了所有的方法,我們還需要啟動一個gRPC伺服器,這樣客戶端才能真正使用我們的服務。以下代碼片段顯示了我們如何為我們的RouteGuide
服務執行此操作:
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
要構建和啟動伺服器,我們需要:
- 使用
net.Listen(...)
指定要用於監聽客戶端請求的埠。 - 使用
grpc.NewServer(...)
創建一個gRPC伺服器實例。 - 使用
pb.RegisterRouteGuideServer(grpcServer, newServer())
將我們的服務實現註冊到gRPC伺服器。 - 調用
Serve()
在伺服器上使用我們的埠詳情進行阻塞等待,直到進程被終止或調用Stop()
。
創建客戶端
在本節中,我們將看一下如何為我們的RouteGuide服務創建一個Go客戶端。你可以在grpc-go/examples/route_guide/client/client.go中看到我們完整的示例客戶端代碼。
創建存根
要調用服務方法,我們首先需要創建一個gRPC通道以與伺服器通信。我們通過將伺服器地址和埠號傳遞給grpc.Dial()
來創建這個通道:
var opts []grpc.DialOption
...
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
...
}
defer conn.Close()
可以在grpc.Dial
中使用DialOptions
來設置認證憑據(例如TLS、GCE憑據或JWT憑據),當服務需要時。RouteGuide
服務不需要任何憑證。
一旦設置了gRPC通道,我們需要一個客戶端存根來執行RPC。我們通過pb
包生成的NewRouteGuideClient
方法獲得它。
client := pb.NewRouteGuideClient(conn)
調用服務方法
現在讓我們看一下如何調用我們的服務方法。請註意,在gRPC-Go中,RPC以阻塞/同步模式運行,這意味著RPC調用會等待伺服器響應,並且會返迴響應或錯誤。
簡單RPC
調用簡單的RPC GetFeature幾乎與調用本地方法一樣簡單。
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
正如你所看到的,我們在之前獲得的存根上調用方法。在方法參數中,我們創建並填充一個請求協議緩衝區對象(在我們的例子中為Point)。我們還傳遞了一個context.Context對象,它允許我們在必要時更改我們的RPC的行為,例如超時/取消正在進行的RPC。如果調用沒有返回錯誤,則可以從第一個返回值中讀取伺服器的響應信息。
log.Println(feature)
伺服器端流式傳輸RPC
這是我們調用伺服器端流式方法ListFeatures的地方,它返回地理Feature的流。如果你已經閱讀了創建伺服器的部分,這可能看起來很熟悉 - 流式RPC在雙方都實現了類似的方式。
rect := &pb.Rectangle{ ... } // 初始化pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
與簡單的RPC一樣,我們向方法傳遞一個上下文和一個請求。然而,不同於獲取響應對象,這次我們得到了一個RouteGuide_ListFeaturesClient
的實例。客戶端可以使用RouteGuide_ListFeaturesClient
流來讀取伺服器的響應。
我們使用RouteGuide_ListFeaturesClient
的Recv()
方法來重覆地將伺服器的響應讀入到響應協議緩衝區對象(在這種情況下為Feature)中,直到沒有更多的消息為止:客戶端在每次調用後都需要檢查從Recv()
返回的錯誤err。如果為nil,則流仍然有效,可以繼續讀取; 如果是io.EOF
,則消息
流已結束; 否則必須有一個RPC錯誤,該錯誤通過err傳遞。
客戶端流式傳輸RPC
客戶端流式傳輸方法RecordRoute與伺服器端方法類似,除了我們只傳遞上下文給方法,並返回一個RouteGuide_RecordRouteClient流,我們可以使用它來同時寫入和讀取消息。
// 創建隨機數量的隨機點
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // 遍歷至少兩個點
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
RouteGuide_RecordRouteClient
具有一個Send()
方法,我們可以使用它來向伺服器發送請求。一旦我們使用Send()
將客戶端的請求寫入流中,我們需要在流上調用CloseAndRecv()
來讓gRPC知道我們已經完成了寫入,並且正在等待接收一個響應。我們從從CloseAndRecv()
返回的err中獲得我們的RPC狀態。如果狀態是nil,則CloseAndRecv()
的第一個返回值將是一個有效的伺服器響應。
雙向流式傳輸RPC
最後,讓我們來看一下我們的雙向流式傳輸RPC RouteChat()。與RecordRoute的情況類似,我們只傳遞一個上下文對象給方法,並返回一個流,我們可以使用它來同時寫入和讀取消息。但是,這次我們通過方法的流式傳輸返回值,而伺服器在向其消息流寫入消息時。
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
在這裡,讀寫的語法與客戶端流式傳輸方法非常相似,不同之處在於我們在完成調用後使用流的CloseSend()
方法。儘管每一方始終按照它們被寫入的順序獲得另一方的消息,但客戶端和伺服器都可以按任意順序讀寫 - 這些流完全獨立運行。
嘗試一下!
從examples/route_guide目錄中執行以下命令:
運行伺服器:
$ go run server/server.go
從另一個終端運行客戶端:
$ go run client/client.go
你將看到類似於以下內容的輸出:
Getting feature for point (409146138, -746188906)
name:"Berkshire Valley Management Area Trail, Jefferson, NJ, USA" location:<latitude:409146138 longitude:-746188906 >
Getting feature for point (0, 0)
location:<>
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)
聲明:本作品採用署名-非商業性使用-相同方式共用 4.0 國際 (CC BY-NC-SA 4.0)進行許可,使用時請註明出處。
Author: mengbin
blog: mengbin
Github: mengbin92
cnblogs: 戀水無意