简介 gRPC 是 Google 开源的一个远程过程调用(Remote Procedure Call) 框架,在 gRPC 中,客户端应用程序可以直接调用不同机器上的服务器应用程序上的方法,就像它是本地对象一样,更容易创建分布式应用程序和服务。默认情况下,gRPC 使用 协议缓冲区 通信, 并支持以下语言:
协议缓冲区 (protocol buffers)
协议缓冲区提供了一种语言中立、平台中立、可扩展的机制,用于以向前兼容和向后兼容的方式序列化结构化数据。它类似于 JSON,只是它更小更快,并且生成本地语言绑定。 协议缓冲区是定义语言(在 .proto文件中创建)、proto 编译器生成的与数据接口的代码、特定于语言的运行时库以及写入文件(或通过网络连接)。
使用协议缓冲区的第一步是定义要在_proto 文件_中序列化的数据的结构:这是一个带有.proto扩展名的普通文本文件。协议缓冲区数据被构造为_消息_,其中每条消息都是一个小的信息逻辑记录,包含一系列称为_字段_的名称-值对。这是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 syntax = "proto3" ; package helloworld;message HelloRequest { string name = 1 ; int64 id = 2 ; }
更多语法请看 https://developers.google.com/protocol-buffers/docs/proto3
生成 gRPC 代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 syntax = "proto3" ; option go_package = ".;helloworld" ;package helloworld;service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1 ; int64 id = 2 ; } message HelloReply { string message = 1 ; }
一旦定义好服务,我们可以使用 protocol buffer 编译器 protoc 来生成创建应用所需的特定客户端和服务端的代码。 首先运行 brew install protobuf 安装 protoc https://github.com/protocolbuffers/protobuf/releases
1 2 > protoc --version libprotoc 3.19.4
其次安装 go 的插件
1 2 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
默认下载到$GOPATH/bin下。 指定源目录(应用程序的源代码所在的位置——如果不提供值,则使用当前目录)、目标目录(希望生成的代码所在的位置;通常与$SRC_DIR相同) ,以及.protp文件.
1 protoc --go_out=helloworld --go-grpc_out=helloworld helloworld/helloworld.proto
生成两个文件:
helloworld.pb.go 文件 ,包含用于填充、序列化和检索请求和响应消息类型的所有协议缓冲区代码。
helloworld_grpc.pb.go 文件, 其中包含以下内容:
客户端使用Greeter服务中定义的方法调用的接口类型(或存根)。
服务器要实现的接口类型,也可以使用Greeter服务中定义的方法。
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package mainimport ( "context" "fmt" "google.golang.org/grpc" pb "grpcTest/helloworld" "log" "net" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error ) { log.Printf("Received: %v %d" , in.GetName(), in.GetId()) return &pb.HelloReply{Message: fmt.Sprintf("Hello %s %d\n" , in.GetName(), in.GetId())}, nil } func main () { lis, err := net.Listen("tcp" , ":8999" ) if err != nil { log.Fatalf("failed to listen: %v" , err) return } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) log.Printf("server listening at %v" , lis.Addr()) err = s.Serve(lis) if err != nil { fmt.Println(err) return } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package mainimport ( "context" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "grpcTest/helloworld" "log" "time" ) func main () { conn, err := grpc.Dial("0.0.0.0:8999" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v" , err) } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{ Name: "yhy" , Id: 1 , }) if err != nil { log.Fatalf("could not greet: %v" , err) } log.Printf("Greeting: %s" , r.GetMessage()) }
service 方法的4 种请求和响应模式类型
一个_简单的 RPC_,其中客户端使用存根向服务器发送请求并等待响应返回,就像正常的函数调用一样。
前面的SayHello方法 就是一个简单RPC
1 rpc GetFeature(Point) returns (Feature) {}
服务器端流式 RPC ,客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入stream关键字,可以指定一个服务器端的流方法。
1 2 3 4 5 rpc ListFeatures(Rectangle) returns (stream Feature) {}
一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
1 2 3 rpc RecordRoute(stream Point) returns (RouteSummary) {}
一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
1 2 3 rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
身份验证 gRPC 底层是基于 HTTP/2 协议的,HTTP 本身不带任何加密传输功能,基于 SSL 的 HTTPS 协议才是加密传输。gRPC 使用了 HTTP/2 协议但是并未使用 HTTPS,即少了加密传输的部分。 对于加密传输的部分 gRPC 将它抽出来作为一个组件,可以由用户自由选择。gRPC 内默认提供了两种 内置的认证方式:
基于 CA 证书的 SSL/TLS 认证方式;
基于 Token 的认证方式。
gRPC 中的连接类型一共有以下 3 种:
insecure connection:不使用 TLS 加密;
server-side TLS:仅服务端 TLS 加密;
mutual TLS:客户端、服务端都使用 TLS 加密。
基于 TLS/SSL 认证 1.ca根证书生成 新建ca.conf,填入以下内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [ req ] default_bits = 4096 distinguished_name = req_distinguished_name [ req_distinguished_name ] countryName = Country Name (2 letter code) countryName_default = CN stateOrProvinceName = State or Province Name (full name) stateOrProvinceName_default = JiangSu localityName = Locality Name (eg, city) localityName_default = NanJing organizationName = Organization Name (eg, company) organizationName_default = Step commonName = CommonName (e.g. server FQDN or YOUR name) commonName_max = 64 commonName_default = XXX(自定义)
生成ca.key:
1 openssl genrsa -out ca.key 4096
生成ca.csr:(直接回车,采用default默认配置值)
1 openssl req -new -sha256 -out ca.csr -key ca.key -config ca.conf
生成ca.crt:
1 openssl x509 -req -days 3650 -in ca.csr -signkey ca.key -out ca.crt
2.server证书生成 新建server.conf,填入以下内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 [ req ] default_bits = 2048 distinguished_name = req_distinguished_name [ req_distinguished_name ] countryName = Country Name (2 letter code) countryName_default = CN stateOrProvinceName = State or Province Name (full name) stateOrProvinceName_default = JiangSu localityName = Locality Name (eg, city) localityName_default = NanJing organizationName = Organization Name (eg, company) organizationName_default = Step commonName = CommonName (e.g. server FQDN or YOUR name) commonName_max = 64 commonName_default = XXX(自定义,客户端需要此字段做匹配) [ req_ext ] subjectAltName = @alt_names [alt_names] DNS.1 = XXX(自定义) IP = 127.0 .0 .1
生成server.key:
1 openssl genrsa -out server.key 2048
生成server.csr:(直接回车,采用default默认配置值)
1 openssl req -new -sha256 -out server.csr -key server.key -config server.conf
生成server.crt:
1 openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -CAcreateserial -in server.csr -out server.pem -extensions req_ext -extfile server.conf
最终只需要 server.pem和 server.key两个文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package mainimport ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" pb "grpcTest/helloworld" "log" "net" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error ) { log.Printf("Received: %v %d" , in.GetName(), in.GetId()) return &pb.HelloReply{Message: fmt.Sprintf("Hello %s %d\n" , in.GetName(), in.GetId())}, nil } func LoggingInterceptor (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface {}, error ) { fmt.Printf("gRPC method: %s, %v" , info.FullMethod, req) resp, err := handler(ctx, req) fmt.Printf("gRPC method: %s, %v" , info.FullMethod, resp) return resp, err } func main () { lis, err := net.Listen("tcp" , ":8999" ) if err != nil { log.Fatalf("failed to listen: %v" , err) return } creds, err := credentials.NewServerTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem" , "/Users/yhy/go/workplace/grpcTest/ca/server.key" ) if err != nil { grpclog.Fatalf("Failed to generate credentials %v" , err) } s := grpc.NewServer(grpc.Creds(creds)) pb.RegisterGreeterServer(s, &server{}) log.Printf("server listening at %v" , lis.Addr()) err = s.Serve(lis) if err != nil { fmt.Println(err) return } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package mainimport ( "context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" pb "grpcTest/helloworld" "log" "time" ) func main () { creds, err := credentials.NewClientTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem" , "*.yhy.com" ) if err != nil { grpclog.Fatalf("Failed to generate credentials %v" , err) } conn, err := grpc.Dial("0.0.0.0:8999" , grpc.WithTransportCredentials(creds)) if err != nil { log.Fatalf("did not connect: %v" , err) } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{ Name: "yhy" , Id: 1 , }) if err != nil { log.Fatalf("could not greet: %v" , err) } log.Printf("Greeting: %s" , r.GetMessage()) }
这样就可以对客户端和服务端之间交互的所有数据进行加密。
Token认证 客户端发请求时,添加Token到上下文context.Context中,服务器接收到请求,先从上下文中获取Token验证,验证通过才进行下一步处理。 gRPC 中默认定义了 PerRPCCredentials,是提供用于自定义认证的接口,它的作用是将所需的安全认证信息添加到每个RPC方法的上下文中。其包含 2 个方法:
GetRequestMetadata:获取当前请求认证所需的元数据
RequireTransportSecurity:是否需要基于 TLS 认证进行安全传输1 2 3 4 type PerRPCCredentials interface { GetRequestMetadata(ctx context.Context, uri ...string ) (map [string ]string , error ) RequireTransportSecurity() bool }
使用 token 认证必须实现这两个方法。 这里以 JWT作为 Token 认证为例,新建jwt.go文件1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 package tokenimport ( "context" "fmt" "github.com/dgrijalva/jwt-go" "time" "google.golang.org/grpc/metadata" ) const JwtSecret = "yhy" func GenerateToken (userName string ) (string , error ) { claims := jwt.MapClaims{ "iss" : "test" , "aud" : "test" , "nbf" : time.Now().Unix(), "exp" : time.Now().Add(24 * time.Hour).Unix(), "sub" : "user" , "username" : userName, } tokenClaims := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) token, err := tokenClaims.SignedString([]byte (JwtSecret)) return token, err } type AuthToken struct { Token string } func (c AuthToken) GetRequestMetadata(ctx context.Context, uri ...string ) (map [string ]string , error ) { return map [string ]string { "authorization" : c.Token, }, nil } func (c AuthToken) RequireTransportSecurity() bool { return false } type Claims struct { jwt.StandardClaims Username string `json:"username"` } func getTokenFromContext (ctx context.Context) (string , error ) { md, ok := metadata.FromIncomingContext(ctx) if !ok { return "" , fmt.Errorf("Err NoMetadata In Context" ) } token, ok := md["authorization" ] if !ok || len (token) == 0 { return "" , fmt.Errorf("Err NoAuthorization In Metadata" ) } return token[0 ], nil } func CheckAuth (ctx context.Context) (username string , err error ) { tokenStr, err := getTokenFromContext(ctx) if err != nil { panic ("get token from context error" ) } var clientClaims Claims token, err := jwt.ParseWithClaims(tokenStr, &clientClaims, func (token *jwt.Token) (interface {}, error ) { if token.Header["alg" ] != "HS256" { panic ("ErrInvalidAlgorithm" ) } return []byte (JwtSecret), nil }) if err != nil { return "" , err } if !token.Valid { return "" , err } return clientClaims.Username, nil }
helloworld.proto增加以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} rpc Login (LoginRequest) returns (LoginResp) {} } message LoginRequest{ string username = 1 ; string password = 2 ; } message LoginResp{ string status = 1 ; string token = 2 ; }
执行 protoc --go_out=helloworld --go-grpc_out=helloworld helloworld/helloworld.proto重新生成对应的文件server.go 实现 Login方法
1 2 3 4 5 6 7 8 9 10 func (s *server) Login(ctx context.Context, request *pb.LoginRequest) (resp *pb.LoginResp, err error ) { if request.Username == "yhy" && request.Password == "123456" { jwtToken, err := token.GenerateToken(request.Username) if err != nil { return nil , err } return &pb.LoginResp{Status: "200" , Token: jwtToken}, nil } return &pb.LoginResp{Status: "401" , Token: "" }, nil }
然后服务端代码中,每个服务的方法都需要添加CheckAuth(ctx)来验证Token,这样十分麻烦。这里使用gRPC拦截器,能够很好地解决这个问题。gRPC拦截器功能类似中间件,拦截器收到请求后,先进行一些操作,然后才进入服务的代码处理。 服务端完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package mainimport ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" pb "grpcTest/helloworld" "grpcTest/token" "log" "net" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error ) { log.Printf("Received: %v %d" , in.GetName(), in.GetId()) return &pb.HelloReply{Message: fmt.Sprintf("Hello %s %d\n" , in.GetName(), in.GetId())}, nil } func (s *server) Login(ctx context.Context, request *pb.LoginRequest) (resp *pb.LoginResp, err error ) { if request.Username == "yhy" && request.Password == "123456" { jwtToken, err := token.GenerateToken(request.Username) if err != nil { return nil , err } return &pb.LoginResp{Status: "200" , Token: jwtToken}, nil } return &pb.LoginResp{Status: "401" , Token: "" }, nil } func Interceptor (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { fmt.Printf("gRPC method1: %s, %v\n" , info.FullMethod, req) if info.FullMethod != "/helloworld.Greeter/Login" { username, err := token.CheckAuth(ctx) if err != nil { log.Printf("err : %v\n" , err) return resp, err } log.Printf("用户 %s 登录\n" , username) } resp, err = handler(ctx, req) fmt.Printf("gRPC method2: %s, %v\n" , info.FullMethod, resp) return resp, err } func main () { lis, err := net.Listen("tcp" , ":8999" ) if err != nil { log.Fatalf("failed to listen: %v" , err) return } creds, err := credentials.NewServerTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem" , "/Users/yhy/go/workplace/grpcTest/ca/server.key" ) if err != nil { grpclog.Fatalf("Failed to generate credentials %v" , err) } s := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(Interceptor)) pb.RegisterGreeterServer(s, &server{}) log.Printf("server listening at %v" , lis.Addr()) err = s.Serve(lis) if err != nil { fmt.Println(err) return } }
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package mainimport ( "context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" pb "grpcTest/helloworld" "grpcTest/token" "log" "time" ) func main () { creds, err := credentials.NewClientTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem" , "*.kuafu.com" ) if err != nil { grpclog.Fatalf("Failed to generate credentials %v" , err) } conn, err := grpc.Dial("0.0.0.0:8999" , grpc.WithTransportCredentials(creds)) if err != nil { log.Fatalf("did not connect: %v" , err) } defer conn.Close() c := pb.NewGreeterClient(conn) login, err := c.Login(context.Background(), &pb.LoginRequest{Username: "yhy" , Password: "123456" }) if err != nil { log.Fatalf("did not connect: %v" , err) } requestToken := new (token.AuthToken) requestToken.Token = login.Token conn, err = grpc.Dial(":8999" , grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(requestToken)) if err != nil { log.Fatalf("faild to connect: %v" , err) } defer conn.Close() c = pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{ Name: "yhy" , Id: 1 , }) if err != nil { log.Fatalf("could not greet: %v" , err) } log.Printf("Greeting: %s" , r.GetMessage()) }
双向流 普通RPC 就像正常调用方法一样,主要就是流式的使用,这里以双向流为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 syntax = "proto3" ; option go_package = "./historyPb" ;service History {rpc User(stream UserHistory) returns (stream UserHistory) {}} message UserHistory {string msg = 1 ;}
服务端实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (h *History) User(stream historyPb.History_UserServer) error { for { res, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } err = stream.Send(&historyPb.UserHistory{Msg: res.Msg}) if err != nil { return err } } }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func main () { ... 之前的一样 historyStream := historyPb.NewHistoryClient(conn) streaming(historyStream) } func streaming (client historyPb.HistoryClient) error { stream, err := client.User(context.Background()) if err != nil { logging.Logger.Errorf("stream err %v" , err) } for n := 0 ; n < 10 ; n++ { err := stream.Send(&historyPb.UserHistory{Msg: strconv.Itoa(n)}) if err != nil { return err } fmt.Println("Streaming Send: " , n) res, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } fmt.Println("Streaming Recv: " , res.Msg) } stream.CloseSend() return nil }
流式拦截器 流拦截器过程和一元拦截器有所不同,同样可以分为 3 个阶段:
预处理(pre-processing)
调用 RPC 方法(invoking RPC method)
后处理(post-processing)
预处理阶段的拦截只是在流式请求第一次 发起的时候进行拦截,后面的流式请求不会再进入到处理逻辑。 后面两种情况对应着 Streamer api 提供的两个扩展方法来进行,分别是 SendMsg 和 RecvMsg 方法。 正常情况下实现一个流式拦截器与普通拦截器一样,实现这个已经定义好的拦截器方法即可:
1 2 3 func StreamInterceptor (srv interface {}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {}
如果是想在发消息之前和之进行处理, 则实现 SendMsg 和 RecvMsg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type wrappedStream struct { grpc.ServerStream } func newWrappedStream (s grpc.ServerStream) grpc.ServerStream { return &wrappedStream{s} } func (w *wrappedStream) RecvMsg(m interface {}) error { fmt.Printf("Receive a message (Type: %T) at %s" , m, time.Now().Format(time.RFC3339)) return w.ServerStream.RecvMsg(m) } func (w *wrappedStream) SendMsg(m interface {}) error { fmt.Printf("Send a message (Type: %T) at %v" , m, time.Now().Format(time.RFC3339)) return w.ServerStream.SendMsg(m) }
使用流式拦截器服务端修改, 下面就是同时使用流式和普通拦截器
1 2 s := grpc.NewServer(grpc.Creds(creds),grpc.UnaryInterceptor(service.Commonnterceptor), grpc.StreamInterceptor(service.StreamInterceptor))
流式拦截器认证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func StreamInterceptor (srv interface {}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { logging.Logger.Infof("gRPC stream method1: %s " , info.FullMethod) username, err := middleware.CheckAuth(ss.Context()) if err != nil { logging.Logger.Warnf("err : %v\n" , err) return err } err = handler(srv, newWrappedStream(ss)) if err != nil { logging.Logger.Warnf("err11 : %v\n" , err) return err } logging.Logger.Infof("gRPC stream method2: %s " , info.FullMethod) logging.Logger.Infof("用户 %s " , username) return err }
发布订阅模式
服务端的信息更改时,通知客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 syntax="proto3" ; package pb;option go_package="./pb" ;message Msg {string value=1 ;} service PubsubService {rpc Publish (Msg) returns (Msg) ;rpc Subscribe (Msg) returns (stream Msg) ;}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package serviceimport ( "context" "github.com/ZhuriLab/KuaFuServer/grpc/pb/pb" "github.com/docker/docker/pkg/pubsub" "strings" "time" ) type PubsubService struct { pub *pubsub.Publisher pb.UnimplementedPubsubServiceServer } func NewPubsubService () *PubsubService { return &PubsubService{ pub: pubsub.NewPublisher(100 *time.Millisecond, 10 ), } } func (p *PubsubService) Publish(ctx context.Context, arg *pb.Msg) (*pb.Msg, error ) { p.pub.Publish(arg.GetValue()) return &pb.Msg{}, nil } func (p *PubsubService) Subscribe(arg *pb.Msg, stream pb.PubsubService_SubscribeServer) error { ch := p.pub.SubscribeTopic(func (v interface {}) bool { if key, ok := v.(string ); ok { if strings.HasPrefix(key, arg.GetValue()) { return true } } return false }) for v := range ch { if err := stream.Send(&pb.Msg{Value: v.(string )}); err != nil { return err } } return nil }
服务端
1 pb.RegisterPubsubServiceServer(s, service.NewPubsubService())
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func main () { .... go Subscribe(conn) client := pb.NewPubsubServiceClient(conn) _, err := client.Publish(context.Background(), &pb.Msg{Value: "golang: hello Go" }) if err != nil { log.Fatal(err) } } func Subscribe (conn *grpc.ClientConn) { client := pb.NewPubsubServiceClient(conn) stream, err := client.Subscribe(context.Background(), &pb.Msg{Value: "golang: " }) if err != nil { log.Fatal("123 " , err) } for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } fmt.Println("=======1111=========" ) log.Fatal("22 " , err) } fmt.Println("================" ) fmt.Println(reply.GetValue()) pkg.UserMsgContent += reply.GetValue() } }