java版gRPC实战之三:服务端流

程序员欣宸 2021-09-15 08:59:28
java 实战 gRPC 服务端 之三


欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC实战》全系列链接

  1. 用proto生成代码
  2. 服务发布和调用
  3. 服务端流
  4. 客户端流
  5. 双向流
  6. 客户端动态获取服务端地址
  7. 基于eureka的注册发现

关于gRPC定义的四种类型

本文是《java版gRPC实战》系列的第三篇,前文咱们实战体验了简单的RPC请求和响应,那种简单的请求响应方式其实只是gRPC定义的四种类型之一,这里给出《gRPC 官方文档中文版》对这四种gRPC类型的描述:

  1. 简单 RPC:客户端使用存根(stub)发送请求到服务器并等待响应返回,就像平常的函数调用一样;
  2. 服务器端流式 RPC:客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息;(即本篇内容)
  3. 客户端流式 RPC:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应;
  4. 双向流式 RPC:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;

本篇概览

本篇是服务端流类型的gRPC服务实战,包括以下内容:

  1. 开发一个gRPC服务,类型是服务端流;
  2. 开发一个客户端,调用前面发布的gRPC服务;
  3. 验证;
  • 不多说了,开始上代码;

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

在这里插入图片描述

  • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在server-stream-server-side目录下,客户端代码在server-stream-client-side目录下,如下图:

在这里插入图片描述

开发一个gRPC服务,类型是服务端流

  • 首先要开发的是gRPC服务端,一共要做下图所示的七件事:

在这里插入图片描述

  • 打开grpc-lib模块,在src/main/proto目录下新增文件mall.proto,里面定一个了一个gRPC方法ListOrders及其入参和返回对象,内容如下,要注意的是返回值要用关键字stream修饰,表示该接口类型是服务端流:
syntax = "proto3";
option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.bolingcavalry.grpctutorials.lib";
// 类名
option java_outer_classname = "MallProto";
// gRPC服务,这是个在线商城的订单查询服务
service OrderQuery {
// 服务端流式:订单列表接口,入参是买家信息,返回订单列表(用stream修饰返回值)
rpc ListOrders (Buyer) returns (stream Order) {}
}
// 买家ID
message Buyer {
int32 buyerId = 1;
}
// 返回结果的数据结构
message Order {
// 订单ID
int32 orderId = 1;
// 商品ID
int32 productId = 2;
// 交易时间
int64 orderTime = 3;
// 买家备注
string buyerRemark = 4;
}
  • 双击下图红框位置的generateProto,即可根据proto生成java代码:

在这里插入图片描述

  • 新生成的java代码如下图红框:

在这里插入图片描述

  • 在父工程grpc-turtorials下面新建名为server-stream-server-side的模块,其build.gradle内容如下:
// 使用springboot插件
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
// 作为gRPC服务提供方,需要用到此库
implementation 'net.devh:grpc-server-spring-boot-starter'
// 依赖自动生成源码的工程
implementation project(':grpc-lib')
}
  • 新建配置文件application.yml:
spring:
application:
name: server-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:
server:
port: 9899
  • 启动类:
package com.bolingcavalry.grpctutorials;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ServerStreamServerSideApplication {
public static void main(String[] args) {
SpringApplication.run(ServerStreamServerSideApplication.class, args);
}
}
  • 接下来是最关键的gRPC服务,代码如下,可见responseObserver.onNext方法被多次调用,用以向客户端持续输出数据,最后通过responseObserver.onCompleted结束输出:
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.Buyer;
import com.bolingcavalry.grpctutorials.lib.Order;
import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.ArrayList;
import java.util.List;
@GrpcService
public class GrpcServerService extends OrderQueryGrpc.OrderQueryImplBase {
/**
* mock一批数据
* @return
*/
private static List<Order> mockOrders(){
List<Order> list = new ArrayList<>();
Order.Builder builder = Order.newBuilder();
for (int i = 0; i < 10; i++) {
list.add(builder
.setOrderId(i)
.setProductId(1000+i)
.setOrderTime(System.currentTimeMillis()/1000)
.setBuyerRemark(("remark-" + i))
.build());
}
return list;
}
@Override
public void listOrders(Buyer request, StreamObserver<Order> responseObserver) {
// 持续输出到client
for (Order order : mockOrders()) {
responseObserver.onNext(order);
}
// 结束输出
responseObserver.onCompleted();
}
}
  • 至此,服务端开发完成,咱们再开发一个springboot应用作为客户端,看看如何远程调用listOrders接口,得到responseObserver.onNext方法输出的数据;

开发一个客户端,调用前面发布的gRPC服务

  • 客户端模块的基本功能是提供一个web接口,其内部会调用服务端的listOrders接口,将得到的数据返回给前端,如下图:

在这里插入图片描述

  • 在父工程grpc-turtorials下面新建名为server-stream-client-side的模块,其build.gradle内容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation project(':grpc-lib')
}
  • 应用配置信息application.yml内容如下,可见是端口和gRPC服务端地址的配置:
server:
port: 8081
spring:
application:
name: server-stream-client-side
grpc:
client:
# gRPC配置的名字,GrpcClient注解会用到
server-stream-server-side:
# gRPC服务端地址
address: 'static://127.0.0.1:9899'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
  • 服务端的listOrders接口返回的Order对象里面有很多gRPC相关的内容,不适合作为web接口的返回值,因此定义一个DispOrder类作为web接口返回值:
package com.bolingcavalry.grpctutorials;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class DispOrder {
private int orderId;
private int productId;
private String orderTime;
private String buyerRemark;
}
  • 平淡无奇的启动类:
package com.bolingcavalry.grpctutorials;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ServerStreamClientSideApplication {
public static void main(String[] args) {
SpringApplication.run(ServerStreamClientSideApplication.class, args);
}
}
  • 重点来了,GrpcClientService.java,里面展示了如何远程调用gRPC服务的listOrders接口,可见对于服务端流类型的接口,客户端这边通过stub调用会得到Iterator类型的返回值,接下来要做的就是遍历Iterator:
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.Buyer;
import com.bolingcavalry.grpctutorials.lib.Order;
import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient("server-stream-server-side")
private OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;
public List<DispOrder> listOrders(final String name) {
// gRPC的请求参数
Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();
// gRPC的响应
Iterator<Order> orderIterator;
// 当前方法的返回值
List<DispOrder> orders = new ArrayList<>();
// 通过stub发起远程gRPC请求
try {
orderIterator = orderQueryBlockingStub.listOrders(buyer);
} catch (final StatusRuntimeException e) {
log.error("error grpc invoke", e);
return new ArrayList<>();
}
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
log.info("start put order to list");
while (orderIterator.hasNext()) {
Order order = orderIterator.next();
orders.add(new DispOrder(order.getOrderId(),
order.getProductId(),
// 使用DateTimeFormatter将时间戳转为字符串
dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),
order.getBuyerRemark()));
log.info("");
}
log.info("end put order to list");
return orders;
}
}
  • 最后做一个controller类,对外提供一个web接口,里面会调用GrpcClientService的方法:
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public List<DispOrder> printMessage(@RequestParam(defaultValue = "will") String name) {
return grpcClientService.listOrders(name);
}
}
  • 至此,编码完成,开始验证

验证

  1. 启动server-stream-server-side,启动成功后会监听9989端口:

在这里插入图片描述

  1. 启动server-stream-client-side,再在浏览器访问:http://localhost:8081/?name=Tom ,得到结果如下(firefox自动格式化json数据),可见成功地获取了gRPC的远程数据:

在这里插入图片描述

至此,服务端流类型的gRPC接口的开发和使用实战就完成了,接下来的章节还会继续学习另外两种类型;

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

版权声明
本文为[程序员欣宸]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/bolingcavalry/p/15270672.html

  1. La dernière réponse à l'entrevue de développement Android, l'hiver froid de l'industrie
  2. A young Lexus, the new NX refuses to be mediocre and mature
  3. Interprétation approfondie de l'équipe sin7y: application de plookup dans la conception de zkevm
  4. Java basic knowledge point Combing, redis Common Data Structures and Using scenario Analysis,
  5. Five minutes to understand MySQL index push down
  6. Data structure and algorithm (XI) -- algorithm recursion
  7. Programmation asynchrone Java scirp, développement frontal de base
  8. Java basic knowledge point video, three sides ant Gold Clothing successfully obtained offer,
  9. Oracle Linux bascule le noyau uek vers le noyau rhck pour résoudre les problèmes de compatibilité acfs
  10. After the grand finale of spring in jade mansion, after reading many comments, I began to sympathize with white deer
  11. 字节跳动Java高级工程师,统一命名服务、集群管理、分布式应用?
  12. 字节跳动Java高级工程师,深入分布式缓存从原理到实践技术分享,
  13. 字节跳动第三轮技术面,阿里P8架构师Java大厂面试题总结,
  14. 字节跳动社招Java面试,超通俗解析CountDownLatch用法和源码,
  15. 字节跳动最新开源,最经典的HashMap图文详解,
  16. 字節跳動第三輪技術面,阿裏P8架構師Java大廠面試題總結,
  17. Byte Jumping the Third Third Technical surface, Ali P8 Architect Java Factory Interview Question summary,
  18. L'ingénieur Java senior de Byte Hopping approfondit la mise en cache distribuée, du principe au partage de la technologie pratique.
  19. Byte Jump Java Senior Engineer, Unified Naming service, Cluster Management, Distributed application?
  20. Plusieurs méthodes de transfert de fichiers entre Windows et Linux
  21. 快速从 Windows 切换到 Linux 环境
  22. 五分钟向MySql数据库插入一千万条数据
  23. Java日期时间API系列42-----一种高效的中文日期格式化和解析方法
  24. 用Java实现红黑树
  25. 使用Redis Stream来做消息队列和在Asp.Net Core中的实现
  26. 海量列式非关系数据库HBase 架构,shell与API
  27. Architecture, Shell et API de base de données non relationnelle à grande échelle
  28. Mise en œuvre de l'arbre Rouge et noir en Java
  29. Java Date Time API Series 42 - - a efficient Chinese Date Format and Analysis Method
  30. 5 minutes pour insérer 10 millions de données dans la base de données MySQL
  31. Passage rapide de Windows à l'environnement Linux
  32. Notes on Java backend development of PostgreSQL (I)
  33. 海量列式非關系數據庫HBase 架構,shell與API
  34. Byte Jump the latest open source, the most Classic hashtap Graph details,
  35. L'interview Java de Byte Hopping Society, l'analyse super populaire de l'utilisation et du code source de countdownlatch,
  36. "Anti Mafia storm" Wang Zhifei's love history is really wonderful: he divorced Zhang Xinyi and married a 14-year-old wife
  37. In spring in the jade mansion, Jia Fengyuan was not moved by his brother's death. Why was su Yingxue changed? The reason is realistic
  38. Adam Oracle Oracle fully constructs Adam token incentive for ecological development
  39. 实战SpringCloud通用请求字段拦截处理,超过500人面试阿里,
  40. 宅家36天咸鱼翻身入职腾讯,Zookeeper一致性级别分析,
  41. The first starcoin & move hacksong source code analysis - P (a)
  42. Zhaijia 36 days Salt Fish turn into Tencent, Zookeeper Consistency level analysis,
  43. Traitement de l'interception des champs de demande communs de Spring Cloud, plus de 500 personnes interviewent Ali,
  44. About JavaScript modules
  45. Object oriented programming (2)
  46. Java日期时间API系列42-----一种高效的中文日期格式化和解析方法
  47. Java日期時間API系列42-----一種高效的中文日期格式化和解析方法
  48. 宅家36天鹹魚翻身入職騰訊,Zookeeper一致性級別分析,
  49. Java Date Time API Series 42 - - a efficient Chinese Date Format and Analysis Method
  50. 已成功拿下字节、腾讯、脉脉offer,7年老Java一次操蛋的面试经历,
  51. 小米Java社招面试,每次面试必问的二叉树的设计与编码,
  52. 小米Java校招面试,阿里、百度、美团、携程、蚂蚁面经分享,
  53. 小米Java校招面試,阿裏、百度、美團、攜程、螞蟻面經分享,
  54. Xiaomi Java School Recruitment interview, Ali, baidu, meituan, ctrip, ant Facebook Sharing,
  55. La conception et le codage de l'arbre binaire requis pour chaque entrevue d'embauche de la société Java millet;
  56. A remporté avec succès Byte, Tencent, Pulse offer, 7 ans Java une expérience d'entrevue de baise,
  57. 干货来袭,Java岗面试12家大厂成功跳槽,
  58. 常用Java框架面试题目,现在做Java开发有前途吗?
  59. 常用Java框架面試題目,現在做Java開發有前途嗎?
  60. Les questions d'entrevue couramment utilisées pour le cadre Java sont - elles prometteuses pour le développement Java?