Pushing a large amount of data to Kafka for reporting

osc_ i1f0lp6q 2021-01-21 07:13:23


Background

I recently picked up a requirement: product data (split into product basic data and product usage data, stored in two databases under the same connection) needs to be pushed out for reporting so that the operations department can analyze it. Put simply, we need to extract historical and daily data from the database and push it to Kafka. In this article I'd like to share some of the details.

Analysis

1. Data classification

As described in the background, the data is divided into product basic data and product usage data (referred to below as basic data and usage data). The first reaction of many people (including me) is to create two independent entity classes: a basic data class and a usage data class. Experienced developers, however, will notice that since both belong to product data, the processing logic after the two kinds of data are fetched from the database is bound to be largely duplicated; the only real difference is the data type. It is therefore more appropriate to abstract the common attributes of the two kinds of data into a parent class and let the two entity classes keep only their own unique fields.

On to the code.

1. Parent class

@Data
public class ProductBase {
    private String messageId;      // message ID
    private String platform;       // platform or system type
    private String messageType;    // message type
    private String sendTime;       // time the record is sent to kafka
    private String productId;      // product ID
    private String productName;    // product name
    private String productVersion; // version number
    private String releaseId;      // release id
    private String createDate;     // time stored in the database
    private String updateDate;     // update time
}

2. The two data entity classes

Basic data entity class

@Data
public class ProductInfo extends ProductBase {
    private String identifier;     // unique identifier
    private String releaseDate;    // release time
    private String releaseRanking; // ranking by version
    private ProductStatus status;  // status
    private String description;    // description
    private String publisher;      // publishing organization or individual
    private String publisherName;  // name of the publishing organization or individual
    private String publishBy;      // publisher
}

A quick explanation: ProductStatus is an enumeration class I defined myself. The status field in the database stores an int enumeration code, and for readability it is converted to the corresponding status description; more on that later.
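The article does not show ProductStatus itself; a minimal sketch of what such an enum might look like (the codes and descriptions here are assumptions, not from the original) is:

/**
 * Hypothetical sketch of the ProductStatus enum: the database stores the int code,
 * and the description is what gets serialized for readability.
 */
public enum ProductStatus {
    OFFLINE(0, "offline"),
    ONLINE(1, "online");

    private final int code;
    private final String description;

    ProductStatus(int code, String description) {
        this.code = code;
        this.description = description;
    }

    /** Map the int code stored in the database to the enum constant. */
    public static ProductStatus fromCode(int code) {
        for (ProductStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown status code: " + code);
    }

    public String getDescription() {
        return description;
    }
}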

Usage data entity class

@Data
public class ProductUsedInfo extends ProductBase {
    private PlatformEnum platformName; // platform enumeration value
    private String userId;             // user domain account
    private String ipAddress;          // machine IP address
}

2. Data sources

If there were only a single data source, Spring Boot's default data source configuration would be enough. Here, however, the two kinds of data live in two databases, so to access both from the same project we must configure two data sources.

First, remove Spring Boot's default data source auto-configuration from the startup class by adding (exclude = DataSourceAutoConfiguration.class) to the @SpringBootApplication annotation:

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {
    /**
     * Service main entrance
     *
     * @param args service initialization parameters
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

The second step is to configure the two data sources.

Configuration for data source 1 (the specific connection properties go in the configuration file):

@Configuration
@MapperScan(basePackages = DataSource1Config.PACKAGE, sqlSessionFactoryRef = "sqlSessionFactory")
public class DataSource1Config {
    static final String PACKAGE = "com.xxxx.mapper.marketplace";
    static final String MAPPER_LOCATION = "classpath:mapper/xxx/*.xml";

    @Value("${datasource1.url}")
    private String url;
    @Value("${datasource1.username}")
    private String userName;
    @Value("${datasource1.password}")
    private String password;
    @Value("${datasource1.driverClassName}")
    private String driverClass;

    /**
     * Build the data source
     *
     * @return data source
     */
    @Bean(name = "DataSource1")
    @Primary
    public DataSource dataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName(driverClass);
        dataSource.setUrl(url);
        dataSource.setUsername(userName);
        dataSource.setPassword(password);
        return dataSource;
    }

    /**
     * Build the transaction manager for this data source
     *
     * @return transaction manager
     */
    @Bean(name = "TransactionManager")
    @Primary
    public DataSourceTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }

    /**
     * Build the SqlSessionFactory
     *
     * @param dataSource the data source bean defined above
     * @return SqlSessionFactory
     * @throws Exception if the mapper locations cannot be resolved
     */
    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactory sessionFactory(@Qualifier("DataSource1") DataSource dataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources(DataSource1Config.MAPPER_LOCATION));
        return sessionFactory.getObject();
    }
}

The configuration for the product usage data source is analogous:

@Configuration
@MapperScan(basePackages = DataSourceConfig2.PACKAGE, sqlSessionFactoryRef = "datamineSqlSessionFactory")
public class DataSourceConfig2 {
    static final String PACKAGE = "com.xxx.mapper.xx";
    static final String MAPPER_LOCATION = "classpath:mapper/xxx/*.xml";

    @Value("${datasource2.url}")
    private String url;
    @Value("${datasource2.username}")
    private String userName;
    @Value("${datasource2.password}")
    private String password;
    @Value("${datasource2.driverClassName}")
    private String driverClass;

    /**
     * Build the data source
     *
     * @return data source
     */
    @Bean(name = "dataSource2")
    public DataSource dataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName(driverClass);
        dataSource.setUrl(url);
        dataSource.setUsername(userName);
        dataSource.setPassword(password);
        return dataSource;
    }

    /**
     * Build the transaction manager for this data source
     *
     * @return transaction manager
     */
    @Bean(name = "datamineTransactionManager")
    public DataSourceTransactionManager datamineTransactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }

    /**
     * Build the SqlSessionFactory
     *
     * @param dataSource the data source bean defined above
     * @return SqlSessionFactory
     * @throws Exception if the mapper locations cannot be resolved
     */
    @Bean(name = "datamineSqlSessionFactory")
    public SqlSessionFactory sessionFactory(@Qualifier("dataSource2") DataSource dataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources(DataSourceConfig2.MAPPER_LOCATION));
        return sessionFactory.getObject();
    }
}
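For completeness, the property keys read by the two configuration classes would be declared in the configuration file roughly as below. The keys come from the @Value annotations above; the URLs, credentials and the choice of MySQL driver are placeholders, not from the original.

# Data source 1: product basic data (placeholder values)
datasource1.url=jdbc:mysql://db-host-1:3306/product_info?useSSL=false&characterEncoding=utf8
datasource1.username=report_user
datasource1.password=******
datasource1.driverClassName=com.mysql.cj.jdbc.Driver

# Data source 2: product usage data (placeholder values)
datasource2.url=jdbc:mysql://db-host-2:3306/product_usage?useSSL=false&characterEncoding=utf8
datasource2.username=report_user
datasource2.password=******
datasource2.driverClassName=com.mysql.cj.jdbc.Driver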

3. Data reading and processing

I won't go into how the data is read from the database; everyone has their own way of doing that. It is worth noting, though, that when there is a lot of data you can add indexes in the database to speed up the query and read with limit offset in code, to avoid problems such as bringing the service down. What I want to discuss here is how to process the data after a batch of it has been fetched from the database.
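The project reads through XML mappers, which are not shown in the article. Purely as an illustration of the limit offset idea, an annotation-based MyBatis equivalent of the count and paging queries used later in the service might look like the sketch below (the table and column names are assumptions, and camelCase mapping of the columns is assumed to be configured):

import java.util.List;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface ProductInfoDataMapper {

    /** Count the rows that fall inside the requested time window. */
    @Select("SELECT COUNT(*) FROM product_info "
            + "WHERE create_date BETWEEN #{startTime} AND #{endTime}")
    int getHistoryProductDataCount(@Param("startTime") String startTime,
                                   @Param("endTime") String endTime);

    /** Fetch one page of the window with LIMIT offset, pageSize. */
    @Select("SELECT * FROM product_info "
            + "WHERE create_date BETWEEN #{startTime} AND #{endTime} "
            + "ORDER BY create_date LIMIT #{start}, #{pageSize}")
    List<ProductInfo> getHistoryProductData(@Param("start") int start,
                                            @Param("pageSize") int pageSize,
                                            @Param("startTime") String startTime,
                                            @Param("endTime") String endTime);
}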

For these batches of product data, the other team made an odd request: the records must be pushed to Kafka one at a time (which I find rather silly). Under this requirement, if we fetch a batch from the database, store it in a list, traverse it with a for loop, stamp each record with the current timestamp and then push it to Kafka, two problems come up.

1. Data volume: historical data is usually very large, and pulling it out in full causes performance problems. Many people will think of fetching it in batches, which solves this part.

2. Processing speed: after a large batch has been read, how should each list be handled? Stamping every record and pushing it in a single-threaded for loop is clearly slow; here a thread pool can be used to process the batch with multiple threads.

Solution: for the large data volume, the data is segmented in several steps.

First, historical data can be fetched in batches according to the start time and end time passed to the interface; if the volume is really huge, keep the time window as small as possible.

Next, once the time window is set, the interface first counts the rows for this query (count) and uses a configured thread pool whose core pool size is n. Each batch fetched from the database can then be split again: each page holds count / n rows (or count / n + 1 when count does not divide evenly), and each page is fetched inside the loop with limit i*pageSize, pageSize.

Finally, the logic that traverses a segment, stamps each record with a timestamp and pushes it to Kafka is executed in its own thread.

The specific code follows, for reference only:

Thread pool configuration :

/**
 * Thread pool configuration
 */
@Configuration
@EnableAsync
public class ExecutorConfig {
    @Value("${threadpool.core.threadNum}")
    private int coreThreadNum;
    @Value("${threadpool.max.threadNum}")
    private int maxThreadNum;
    @Value("${threadpool.queue.capacity}")
    private int queueCapacity;

    /**
     * Build the thread pool
     *
     * @return thread pool instance
     */
    @Bean(name = "serviceExecutor")
    public Executor serviceExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(coreThreadNum);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        threadPoolTaskExecutor.setMaxPoolSize(maxThreadNum);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        threadPoolTaskExecutor.setThreadNamePrefix("executor-service-");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}
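The thread pool sizes (and the Kafka topic used later in the service) also come from the configuration file. The keys are the ones referenced by @Value; the values below are just placeholders:

# Thread pool sizing (placeholder values)
threadpool.core.threadNum=8
threadpool.max.threadNum=16
threadpool.queue.capacity=200

# Topic the service pushes to (placeholder value)
spring.kafka.template.default-topic=product-report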

Controller:

@PostMapping("/product_info/history")
@ApiOperation(value = "Report historical product basic data")
public void reportHistoryProInfoData(@RequestParam String startTime, @RequestParam String endTime)
        throws Exception {
    if (!DateValidUtil.isValidDate(startTime) || !DateValidUtil.isValidDate(endTime)) {
        throw new Exception("The date format must be yyyy-MM-dd HH:mm:ss");
    }
    prodDataService.reportHistoryProductData(startTime, endTime);
}
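DateValidUtil is not shown in the article; assuming it simply checks that the string parses as yyyy-MM-dd HH:mm:ss, a minimal sketch of the helper could be:

import java.text.ParseException;
import java.text.SimpleDateFormat;

/** Hypothetical sketch of the date-validation helper used by the controller. */
public class DateValidUtil {

    public static boolean isValidDate(String value) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Reject impossible dates such as 2021-13-40 instead of rolling them over.
        format.setLenient(false);
        try {
            format.parse(value);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }
}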

Service method:

@Value("${spring.kafka.template.default-topic}")
private String kafkaTopic;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ProductInfoDataMapper productInfoDataMapper;
@Autowired
private ProductInstallDataMapper productInstallDataMapper;
@Autowired
@Qualifier(value = "serviceExecutor")
private Executor executor;
@Value("${threadpool.core.threadNum}")
private int coreThreadNum;

public void reportHistoryProductData(String startTime, String endTime) throws InterruptedException {
    LOG.info("Report historical product information data");
    int count = productInfoDataMapper.getHistoryProductDataCount(startTime, endTime);
    int pageSize = getPageSize(count);
    CountDownLatch countDownLatch = new CountDownLatch(coreThreadNum);
    for (int page = 0; page < coreThreadNum; page++) {
        int start = page * pageSize;
        List<ProductInfo> list = productInfoDataMapper.getHistoryProductData(start, pageSize, startTime, endTime);
        ProductDataHandler.handlerProductDataList(list, kafkaTopic, kafkaTemplate, executor, countDownLatch);
    }
    countDownLatch.await();
}

The ProductDataHandler class, including the handlerProductDataList method, is as follows:

public class ProductDataHandler<T extends ProductBase> implements Runnable {
    private String kafkaTopic;
    private List<T> list;
    private KafkaTemplate<String, String> kafkaTemplate;
    private CountDownLatch countDownLatch;

    public ProductDataHandler(List<T> list, String kafkaTopic, KafkaTemplate<String, String> kafkaTemplate,
                              CountDownLatch countDownLatch) {
        this.list = list;
        this.kafkaTopic = kafkaTopic;
        this.kafkaTemplate = kafkaTemplate;
        this.countDownLatch = countDownLatch;
    }

    public static <T extends ProductBase> void handlerProductDataList(List<T> datalist, String kafkaTopic,
            KafkaTemplate<String, String> template, Executor executor, CountDownLatch countDownLatch) {
        ProductDataHandler<T> task = new ProductDataHandler<>(datalist, kafkaTopic, template, countDownLatch);
        executor.execute(task);
    }

    @Override
    public void run() {
        for (T base : list) {
            // Stamp the record with the time it is sent to kafka
            Date date = new Date();
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String sendTime = format.format(date);
            base.setSendTime(sendTime);
            String info = JSON.toJSONString(base);
            // Send to kafka
            kafkaTemplate.send(kafkaTopic, info);
        }
        countDownLatch.countDown();
    }
}
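One caveat: kafkaTemplate.send is asynchronous and the loop above ignores its result. With spring-kafka 2.x, where send() returns a ListenableFuture, a failure callback could be attached, for example (LOG here is a hypothetical logger):

// Sketch only: report send failures instead of silently dropping records.
kafkaTemplate.send(kafkaTopic, info).addCallback(
        result -> { /* success: nothing extra to do for this reporting job */ },
        ex -> LOG.error("Failed to push record to Kafka", ex));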

Conclusion

Those are the main details. Since the deadline was tight, there is certainly a lot in the code that could be optimized; if you have any ideas, feel free to share them in the comments.
 

Copyright notice
This article was created by [osc_ i1f0lp6q]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/01/20210121071154871o.html
