目錄結構 . ├── protos │ └── hello_world.proto ├── dists │ ├── __init__.py │ ├── hello_world_pb2.py │ └── hello_world_pb2_grpc.py ├── server.py ├── client. ...
目錄結構
.
├── protos
│   └── hello_world.proto
├── dists
│   ├── __init__.py
│   ├── hello_world_pb2.py
│   └── hello_world_pb2_grpc.py
├── server.py
├── client.py
├── token.py
├── grpc_ssl_key.pem
├── grpc_ssl_cert.pem
└── grpc_ssl_server.pem
構建protobuf
// protos/hello_world.proto
syntax = "proto3";

package test;

// Request message: carries the name to greet.
message User {
  string name = 1;
}

// Response message: carries the reply text.
message Result {
  string reply = 1;
}

// Service with a single unary RPC.
service Test {
  rpc Hello(User) returns (Result);
}

# Build from the command line: reads protos/hello_world.proto and writes the
# generated Python modules into dists/.
python -m grpc_tools.protoc \
    -I protos \
    --python_out=dists \
    --grpc_python_out=dists \
    protos/hello_world.proto
SSL/TLS加密
創建密鑰
# Create grpc_ssl_key.pem (private key) and grpc_ssl_cert.pem (certificate).
# The CN value below is the <domain>: decide it up front, because the client
# later passes the same value as grpc.ssl_target_name_override.
# File names use underscores so they match the directory layout and the paths
# that server.py / client.py open.
openssl req -subj "/CN=black-ip.yazx.com" -x509 -newkey rsa:4096 -days 7200 \
    -keyout grpc_ssl_key.pem \
    -out grpc_ssl_cert.pem

# Create grpc_ssl_server.pem, the key file that server.py loads.
openssl rsa -in grpc_ssl_key.pem -out grpc_ssl_server.pem
服務端載入密鑰
# server.py
from typing import Union

import grpc
from grpc.experimental import aio

# The generated modules live in the ``dists`` package (the protoc output dir).
from dists import hello_world_pb2
from dists import hello_world_pb2_grpc


class TestServicer(hello_world_pb2_grpc.TestServicer):
    """Concrete implementation of the ``Test`` gRPC service."""

    async def Hello(self, request, context) -> hello_world_pb2.Result:
        """Return a ``Result`` whose reply greets ``request.name``."""
        return hello_world_pb2.Result(reply=f"hello {request.name}")


def read_file(path: str, binary: bool) -> Union[str, bytes]:
    """Read and return the whole content of ``path``.

    Args:
        path: File to read.
        binary: When true the file is opened in ``'rb'`` mode and bytes are
            returned; otherwise it is opened in ``'r'`` mode and text is
            returned.
    """
    with open(path, 'rb' if binary else 'r') as f:
        return f.read()


def create_ssl_server_credentials() -> grpc.ServerCredentials:
    """Build the server-side SSL credentials from the PEM files on disk.

    Reads ``grpc_ssl_server.pem`` as the private key and ``grpc_ssl_cert.pem``
    as the certificate chain, both as bytes.
    """
    private_key = read_file('grpc_ssl_server.pem', True)
    certificate_chain = read_file('grpc_ssl_cert.pem', True)
    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=((private_key, certificate_chain),)
    )


async def run(host: str, port: int) -> None:
    """Start the secure gRPC server and block until it terminates.

    Args:
        host: Accepted for symmetry with the client; not used here, the
            server always binds to ``[::]``.
        port: TCP port to listen on.
    """
    server = aio.server()
    server_credentials = create_ssl_server_credentials()
    server.add_secure_port(f'[::]:{port}', server_credentials)
    hello_world_pb2_grpc.add_TestServicer_to_server(TestServicer(), server)
    await server.start()
    await server.wait_for_termination()
客戶端載入密鑰
# client.py
import grpc
from grpc.experimental import aio

# The generated modules live in the ``dists`` package (the protoc output dir).
from dists import hello_world_pb2
from dists import hello_world_pb2_grpc


def create_ssl_channel_credentials() -> grpc.ChannelCredentials:
    """Build client-side SSL credentials trusting ``grpc_ssl_cert.pem``.

    The certificate is read as bytes directly here, so this module does not
    depend on a helper defined in server.py.
    """
    with open('grpc_ssl_cert.pem', 'rb') as cert_file:
        root_certificates = cert_file.read()
    return grpc.ssl_channel_credentials(root_certificates=root_certificates)


async def run(host: str, port: int) -> str:
    """Call ``Test.Hello`` over a TLS channel and return the reply text.

    Args:
        host: Server host name or address.
        port: Server port.

    Returns:
        The ``reply`` field of the ``Result`` message sent by the server.
    """
    options = (
        # Automatically resolve the service list from the DNS domain name.
        ("grpc.lb_policy_name", "round_robin"),
        # Replace '<domain>' with the CN chosen when the certificate was made.
        ('grpc.ssl_target_name_override', '<domain>'),
    )
    # The SSL credentials are passed as-is: composite_channel_credentials is
    # only for combining them with call credentials, and there are none here.
    credentials = create_ssl_channel_credentials()
    # ``async with`` closes the channel once the call has completed.
    async with aio.secure_channel(
        f"{host}:{port}", credentials=credentials, options=options
    ) as channel:
        await channel.channel_ready()
        stub = hello_world_pb2_grpc.TestStub(channel)
        result = await stub.Hello(hello_world_pb2.User(name='world'))
    return result.reply
Token鑒權
grpc強制Token鑒權必須使用SSL/TLS加密
實現Token校驗
# token.py
# NOTE(review): a top-level module called ``token`` has the same name as the
# standard-library ``token`` module -- confirm it does not shadow it in your
# environment, or consider renaming this file.
import functools
import hmac
from typing import Any, Callable

import grpc
from grpc.experimental import aio


class BearerToken(object):
    """Decorator that guards an async servicer method with a bearer token.

    The wrapped method runs only when the call's invocation metadata contains
    an ``authorization`` entry equal to ``Bearer <token>``; otherwise the call
    is aborted with ``code`` and ``details``.
    """

    # Status code and message used when the token is missing or wrong.
    code: grpc.StatusCode = grpc.StatusCode.UNAUTHENTICATED
    details: str = 'bad bearer token'

    def __init__(self, token: str) -> None:
        """Store the expected token (without the ``Bearer `` prefix)."""
        self.token = token

    def _matches(self, value) -> bool:
        """Return whether a metadata value equals ``Bearer <token>``.

        Both sides are compared as UTF-8 bytes with ``hmac.compare_digest`` so
        the comparison time does not depend on where the strings differ.
        """
        expected = f'Bearer {self.token}'.encode('utf-8')
        if isinstance(value, str):
            value = value.encode('utf-8')
        return hmac.compare_digest(value, expected)

    def __call__(self, func) -> Callable:
        """Wrap ``func`` (an async ``(self, request, context)`` method)."""

        @functools.wraps(func)  # keep the handler's name and docstring
        async def wrapper(inner_self, request, context: aio.ServicerContext) -> Any:
            # Treat missing metadata as empty so the call is rejected cleanly.
            metadata = context.invocation_metadata() or ()
            for key, value in metadata:
                if key == 'authorization' and self._matches(value):
                    return await func(inner_self, request, context)
            await context.abort(
                code=self.code,
                details=self.details,
                trailing_metadata=metadata,
            )

        return wrapper
服務端校驗Token
# server.py
from typing import AnyStr

from grpc.experimental import aio

# The generated modules live in the ``dists`` package (the protoc output dir).
from dists import hello_world_pb2
from dists import hello_world_pb2_grpc
# NOTE(review): ``token`` is also the name of a standard-library module --
# confirm this import resolves to the project's token.py.
from token import BearerToken


class TestServicer(hello_world_pb2_grpc.TestServicer):
    """Concrete implementation of the ``Test`` gRPC service."""

    # Calls must carry ``authorization: Bearer xxx`` metadata to reach Hello.
    @BearerToken(token='xxx')
    async def Hello(self, request, context) -> hello_world_pb2.Result:
        """Return a ``Result`` whose reply greets ``request.name``."""
        return hello_world_pb2.Result(reply=f"hello {request.name}")
客戶端注入Token
# client.py
# This listing extends the client.py shown earlier: hello_world_pb2,
# hello_world_pb2_grpc and create_ssl_channel_credentials come from there.
import grpc
from grpc.experimental import aio

token = 'xxx'


def create_access_token_credentials(cls=None) -> grpc.CallCredentials:
    """Build call credentials that send the module-level ``token``.

    Args:
        cls: Unused. Kept (now optional) only so existing one-argument calls
            keep working; this is a plain module-level function.
    """
    return grpc.access_token_call_credentials(token)


async def run(host: str, port: int) -> str:
    """Call ``Test.Hello`` over TLS with token auth and return the reply text.

    Args:
        host: Server host name or address.
        port: Server port.

    Returns:
        The ``reply`` field of the ``Result`` message sent by the server.
    """
    options = (
        # Automatically resolve the service list from the DNS domain name.
        ("grpc.lb_policy_name", "round_robin"),
        # Replace '<domain>' with the CN chosen when the certificate was made.
        ('grpc.ssl_target_name_override', '<domain>'),
    )
    channel_credentials = create_ssl_channel_credentials()
    # Called as a plain function: there is no ``self`` inside ``run``.
    token_credentials = create_access_token_credentials()
    creds = grpc.composite_channel_credentials(
        channel_credentials, token_credentials
    )
    # ``async with`` closes the channel once the call has completed.
    async with aio.secure_channel(
        f"{host}:{port}", credentials=creds, options=options
    ) as channel:
        await channel.channel_ready()
        stub = hello_world_pb2_grpc.TestStub(channel)
        result = await stub.Hello(hello_world_pb2.User(name='world'))
    return result.reply