大量数据推送kafka,进行数据上报

osc_i1f0lp6q 2021-01-21 07:12:17
Spring Boot kafka


需求背景

最近做的一个需求,需要将数据(数据分为产品基本数据和产品使用数据,分处同一个连接下的两个数据库)推送上报后,运营部门进行分析,简单翻译过来就是需要从数据库中把历史数据和每日数据捞出来,推到kafka上,其中一些的细节想在本文记录分享一下。

分析

1、数据分类

在背景中交代过数据分为产品基本数据和产品使用数据(一下通称为基本数据和使用数据),很多同学(包括我)的第一反应是必然是要创建两个独立的数据实体类,基本数据类和使用数据类。但是有经验的同学肯定会想到,既然都属于产品的数据类,两类数据在从数据库中捞出数据后,在后续的数据处理逻辑和代码中必然会存在重复,两种处理区分只是数据类型罢了。所以,更合适的做法是将两种数据中共有的属性抽象出来,放在一个父类中,然后两个实体类各自管理自身的特有属性字段。

代码也就很快就可以写出来了

1、父类

@Data
public class ProductBase {
private String messageId; // 消息ID
private String platform; // 平台or系统类型
private String messageType; // 消息类型
private String sendTime; // 发送到kafka时间
private String productId; // ID
private String productName; // 名称
private String productVersion; // 版本号
private String releaseId; // release id
private String createDate; // 入库时间
private String updateDate; // 更新时间
}

2、两种数据信息实体类

基本数据实体类

@Data
public class ProductInfo extends ProductBase {
private String identifier; // 唯一标识
private String releaseDate; // 发布时间
private String releaseRanking; // 对应版本排序
private ProductStatus status; // 状态
private String description; // 描述
private String publisher; // 发行组织或个人
private String publisherName; // 发行组织或个人名称
private String publishBy; // 发布者
}

这里说明一下,ProductStatus是我定义的一个枚举类,因为数据库中status字段存储的是int型枚举,为了信息的易读性,将它转换成了对应的状态描述,这个稍后会提到。

使用信息实体类

@Data
public class ProductUsedInfo extends ProductBase {
private PlatformEnum platformName; // 平台枚举值
private String userId; // 用户域账号
private String ipAddress; // 机器ip地址
}

2、数据源

如果数据源单一,springboot使用默认的数据源配置。现在两种数据存储在两个数据库中,在同一个工程中,要访问两个数据库,必然对两个数据源进行配置。

首先需要在启动类中去掉springboot的默认数据源配置,在@SpringbootApplication注解中加上(exclude = DataSourceAutoConfiguration.class)

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {
/**
* 服务主入口
*
* @param args 服务初始化参数
*/
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

第二步自然需要对两个数据源进行配置

数据源1配置(具体的配置项在配置文件中配置即可)

@Configuration
@MapperScan(basePackages = DataSource1Config.PACKAGE, sqlSessionFactoryRef = "sqlSessionFactory")
public class DataSource1Config {
static final String PACKAGE = "com.xxxx.mapper.marketplace";
static final String MAPPER_LOCATION = "classpath:mapper/xxx/*.xml";
@Value("${datasource1.url}")
private String url;
@Value("${datasource1.username}")
private String userName;
@Value("${datasource1.password}")
private String password;
@Value("${datasource1.driverClassName}")
private String driverClass;
/**
* 获取数据源信息
*
* @return 数据源信息
*/
@Bean(name = "DataSource1")
@Primary
public DataSource dataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverClassName(driverClass);
dataSource.setUrl(url);
dataSource.setUsername(userName);
dataSource.setPassword(password);
return dataSource;
}
/**
* 获取数据源事务管理器
*
* @return 事务管理器
*/
@Bean(name = "TransactionManager")
@Primary
public DataSourceTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource());
}
/**
* 获取sqlsessionFactory
*
* @param dataSource
* @return sqlsessionFactory
* @throws Exception
*/
@Bean(name = "sqlSessionFactory")
@Primary
public SqlSessionFactory sessionFactory(@Qualifier("dataSource") DataSource dataSource)
throws Exception {
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources(DataSource1Config.MAPPER_LOCATION));
return sessionFactory.getObject();
}
}

产品使用数据,同理

@Configuration
@MapperScan(basePackages = DataSourceConfig2.PACKAGE, sqlSessionFactoryRef = "sqlSessionFactory")
public class DataSourceConfig2 {
static final String PACKAGE = "com.xxx.mapper.xx";
static final String MAPPER_LOCATION = "classpath:mapper/xxx/*.xml";
@Value("${datasource2.url}")
private String url;
@Value("${datasource2.username}")
private String userName;
@Value("${datasource2.password}")
private String password;
@Value("${datasource2.driverClassName}")
private String driverClass;
/**
* 获取数据源信息
*
* @return 数据源信息
*/
@Bean(name = "dataSource2")
public DataSource dataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverClassName(driverClass);
dataSource.setUrl(url);
dataSource.setUsername(userName);
dataSource.setPassword(password);
return dataSource;
}
/**
* 获取数据源事务管理器
*
* @return 事务管理器
*/
@Bean(name = "datamineTransactionManager")
public DataSourceTransactionManager datamineTransactionManager() {
return new DataSourceTransactionManager(dataSource());
}
/**
* 获取sqlsessionFactory
*
* @param datamineDataSource
* @return sqlsessionFactory
* @throws Exception
*/
@Bean(name = "datamineSqlSessionFactory")
public SqlSessionFactory sessionFactory(@Qualifier("dataSource2") DataSource dataSource)
throws Exception {
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().
getResources(DataSourceConfig2.MAPPER_LOCATION));
return sessionFactory.getObject();
}
}

3、数据的读取与处理

数据从数据库中读取的操作我就不说了,每个人都有自己读取方式和方法,不过需要注意的是如果数据量很大,在数据库中可以采用设置索引等方式来提高查询效率,在代码中可以采用limit offset的方式进行读取,以免出现宕机等问题。这里我主要想说说当从数据库中获取到了批量数据后数据的处理问题。

批量获取的产品数据,对方提了一个很奇葩的要求,要求把数据一条一条地推到kafka上(我觉得这种方式很蠢)。这种要求下如果批量从数据库中获取数据,比如说存储到一个list中,通过for循环去遍历,然后每条数据加上当前的时间戳,然后再推到kafka。这里面就存在两个问题

1、数据量很大的问题:一般来说,历史数据的体量都会很大,如果全量取出来肯定会有性能问题,这个问题,很多人都可以想到用分批获取的方法来解决。

2、数据的发送问题:将大量数据分批次读取后,每一次取得数据列表如何处理,单线程的for循环遍历打上时间戳后再发送效率肯定比较低,这个时候我们可以考虑使用线程池起多线程的方式来解决这个问题。

解决方法:针对数据量很大的问题,可以分几个步骤去考虑将数据分段获取。

首先,历史数据的获取可以根据接口设定的日期起始时间和截止时间来分批获取,如果数据量实在巨大,尽量将时间差设置得小一些。

其次,在设置时间段后,接口首先需要计算出此次查询的数据量count,配置一个可用的线程池,线程池中配置核心线程池的线程数n,那么每次从数据库中取得数据可以再次分段,可以分为count / n(或者count / n + 1),最后在循环中使用limit i*pageSize,pageSize获取即可。

最后,将分段的数据的遍历,打时间戳还有推送kafka的逻辑放在线程中执行。

具体的代码如下,仅供参考:

线程池的配置:

/**
* 线程池配置
*/
@Configuration
@EnableAsync
public class ExecutorConfig {
@Value("${threadpool.core.threadNum}")
private int coreThreadNum;
@Value("${threadpool.max.threadNum}")
private int maxThreadNum;
@Value("${threadpool.queue.capacity}")
private int queueCapacity;
/**
* 线程池配置
*
* @return 线程池示例
*/
@Bean(name = "serviceExecutor")
public Executor serviceExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(coreThreadNum);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
threadPoolTaskExecutor.setMaxPoolSize(maxThreadNum);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
threadPoolTaskExecutor.setThreadNamePrefix("executor-service-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}

controller:

@PostMapping("/product_info/history")
@ApiOperation(value = "上报历史产品的基本数据")
public void reportHistoryProInfoData(@RequestParam String startTime, @RequestParam String endTime)
throws Exception {
if (!DateValidUtil.isValidDate(startTime) || !DateValidUtil.isValidDate(endTime)) {
throw new Exception("The date format must be yyyy-MM-dd HH:mm:ss");
}
prodDataService.reportHistoryProductData(startTime, endTime);
}

service方法:

 @Value("${spring.kafka.template.default-topic}")
private String kafkaTopic;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ProductInfoDataMapper productInfoDataMapper;
@Autowired
private ProductInstallDataMapper productInstallDataMapper;
@Autowired
@Qualifier(value = "serviceExecutor")
private Executor executor;
@Value("${threadpool.core.threadNum}")
private int coreThreadNum;
public void reportHistoryPruductData(String startTime, String endTime) throws InterruptedException {
LOG.info("Report historical product information data");
int count = pluginInfoDataMapper.getHistoryProductDataCount(startTime, endTime);
int pageSize = getPageSize(count);
CountDownLatch countDownLatch = new CountDownLatch(coreThreadNum);
for (int page = 0; page < coreThreadNum; page++) {
int start = page * pageSize;
List<ProductInfo> list = productDataMapper.getHistoryPruData(start, pageSize, startTime, endTime);
ProductDataHandler.handlerProductDataList(list, kafkaTopic, kafkaTemplate, executor, countDownLatch);
}
countDownLatch.await();
}

其中handlerProductDataList方法如下:

public class ProductDataHandler<T extends ProductBase> implements Runnable {
private String kafkaTopic;
private List<T> list;
private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch countDownLatch;
public ProductDataHandler(List<T> list, String kafkaTopic, KafkaTemplate kafkaTemplate,CountDownLatch countDownLatch) {
this.list = list;
this.kafkaTopic = kafkaTopic;
this.kafkaTemplate = kafkaTemplate;
this.countDownLatch = countDownLatch;
}
public static void handlerProductDataList(List datalist, String kafkaTopic, KafkaTemplate template,Executor executor, CountDownLatch countDownLatch) throws InterruptedException {
ProductDataHandler task = new ProductDataHandler(datalist, kafkaTopic, template, countDownLatch);
executor.execute(task);
}
@Override
public void run() {
for (PluginBase base : list) {
Date date = new Date();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sendTime = format.format(date);
base.setSendTime(sendTime);
String info = JSON.toJSONString(base);
// 发送至kafka
kafkaTemplate.send(kafkaTopic, info);
}
countDownLatch.countDown();
}
}

结语

主要的一些细节就介绍到这了,由于任务比较紧,代码中肯定还存在很多可以优化的地方,如果各位同学有高见,欢迎在评论区指点一二。
 

版权声明
本文为[osc_i1f0lp6q]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4282691/blog/4917041

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云