HTTP long polling for data synchronization

zhurd 2021-01-23 23:18:04
http long polling data synchronization


Goal

  • Principle and source code analysis of Soul's HTTP long polling data synchronization mode

The previous article gave a brief analysis of the Soul gateway's zookeeper data synchronization mode and walked through its basic flow. This article looks at the Soul gateway's HTTP long polling data synchronization mode.

Synchronization principle

How HTTP synchronization works in the Soul gateway:

Soul borrows the design ideas of Apollo and Nacos, keeps their essentials, and implements its own HTTP long polling data synchronization. Note that this is not traditional Ajax long polling. The HTTP long polling mechanism is shown in the figure below:

[Figure: HTTP long polling mechanism (image-20210122210328040)]

soul-web (the gateway) requests the configuration service of soul-admin with a read timeout of 90s, which means the gateway waits at most 90s for the configuration service to answer. This lets soul-admin respond as soon as data changes, which gives near-real-time push.

When the HTTP request from soul-web reaches soul-admin, soul-admin does not respond immediately. Instead it uses the Servlet 3.0 asynchronous mechanism: it puts the long polling request into a BlockingQueue and schedules a task that runs after 60s. The purpose of that task is to remove the long polling request from the queue after 60s and answer the gateway even if no configuration data changed in the meantime, so that the gateway knows nothing changed. The gateway's own 90s timeout for the request is longer than this 60s server-side hold.
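As a rough illustration of the hold-then-respond idea above, here is a minimal sketch in plain Java. It is not Soul's code: `HoldingServerSketch`, `poll`, `onChange` and the `"NO_CHANGE"` marker are hypothetical names, a `CompletableFuture` stands in for the Servlet 3.0 asynchronous request, and the hold time is a parameter so the 60s value can be scaled down when trying it out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the admin side: parks requests until a change or a timeout. */
final class HoldingServerSketch {
    // Daemon thread so the sketch never keeps the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "hold-timeout");
                t.setDaemon(true);
                return t;
            });

    // Parked long polling requests (plays the role of the queue of clients).
    private final List<CompletableFuture<String>> pending = new CopyOnWriteArrayList<>();

    /** Park a request; it is answered with "NO_CHANGE" if nothing changes within holdMillis. */
    CompletableFuture<String> poll(long holdMillis) {
        CompletableFuture<String> request = new CompletableFuture<>();
        pending.add(request);
        ScheduledFuture<?> timeout = scheduler.schedule(() -> {
            pending.remove(request);
            request.complete("NO_CHANGE");
        }, holdMillis, TimeUnit.MILLISECONDS);
        // If a change answers the request first, the timeout task is no longer needed.
        request.whenComplete((value, error) -> timeout.cancel(false));
        return request;
    }

    /** A configuration change answers every parked request immediately. */
    void onChange(String group) {
        for (CompletableFuture<String> request : pending) {
            pending.remove(request);
            request.complete(group);
        }
    }
}
```

A caller would invoke `poll(60_000)` per incoming request and `onChange("PLUGIN")` when an administrator edits data; the gateway-side read timeout only has to be longer than the hold time.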

Enabling HTTP long polling synchronization on the Soul gateway:

  • Add the following dependency to soul-bootstrap:

    <!--soul data sync start use http-->
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
        <version>2.2.1</version>
    </dependency>
  • Add the related configuration to application.yml:

    soul:
      sync:
        http:
          url: http://localhost:9095
          # url: set this to your soul-admin address; in a cluster environment, separate the addresses with commas (,)
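The gateway code shown later iterates over a `serverList`, one entry per soul-admin address. How Soul builds that list from `url` is not shown in this article, so the following is only a sketch of one plausible way to split a comma-separated value; `ServerListSketch` and `parse` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns the comma-separated url value into a list of servers. */
final class ServerListSketch {
    static List<String> parse(String url) {
        List<String> servers = new ArrayList<>();
        for (String part : url.split(",")) {
            String server = part.trim();
            // Skip empty entries such as the one produced by a trailing comma.
            if (!server.isEmpty()) {
                servers.add(server);
            }
        }
        return servers;
    }
}
```

For example, `ServerListSketch.parse("http://localhost:9095,http://localhost:9096")` yields two entries, and the gateway would then start one long polling task per entry.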

Configure soul-admin as follows, or set --soul.sync.http.enabled=true in the soul-admin startup parameters, then restart the service:

    soul:
      sync:
        http:
          enabled: true

Source code analysis

soul-admin data synchronization

For data change notification in soul-admin, the three data synchronization modes of the Soul gateway (websocket, zookeeper, HTTP long polling) work on the same principle. The only difference is that each synchronization configuration uses a different event processor. This was covered in the previous article on zookeeper data synchronization, so it is not repeated here.

  • HTTP long polling listener source code analysis

We enabled soul.sync.http.enabled=true earlier, so there must be a place in the project that reads this configuration. Searching for soul.sync.http leads to the data synchronization configuration class DataSyncConfiguration. Here is its HTTP long polling configuration code:

    /**
     * http long polling.
     */
    @Configuration
    @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
    @EnableConfigurationProperties(HttpSyncProperties.class)
    static class HttpLongPollingListener {
        @Bean
        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
            return new HttpLongPollingDataChangedListener(httpSyncProperties);
        }
    }

The HttpLongPollingDataChangedListener class is the concrete implementation of the DataChangedListener interface that handles HTTP long polling data push. Its core code is as follows:

    /**
     * If the configuration data changes, the group information for the change is immediately responded.
     * Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached.
     *
     * @param request  the request
     * @param response the response
     */
    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
        // Check whether the client needs to update its cache; on an MD5 mismatch, respond immediately.
        // Note: the response carries only the changed configuration groups. After receiving it, the gateway
        // knows which groups changed and has to request the configuration data of those groups again.
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);
        // response immediately.
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }
        // listen for configuration changed.
        // Use Servlet 3.0 asynchronous processing for the HTTP request
        final AsyncContext asyncContext = request.startAsync();
        // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
        asyncContext.setTimeout(0L);
        // block client's thread.
        // Hold the client's request for up to 60 seconds
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }

    /****************** LongPollingClient core code *****************************/
    public void run() {
        // Schedule a task: if no configuration changes within 60s, it runs after 60s and responds to the HTTP request
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            // Remove the current long polling request from the queue
            // (clients is a blocking queue that holds the requests from soul-web)
            clients.remove(LongPollingClient.this);
            // Work out the changed configuration groups for the current request
            List<ConfigGroupEnum> changedGroups
                    = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
            // Send the response
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);
        clients.add(this);
    }

    /**
     * Send response datagram.
     *
     * @param response      the response
     * @param changedGroups the changed groups
     */
    private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {
        try {
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setContentType(MediaType.APPLICATION_JSON_VALUE);
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));
        } catch (IOException ex) {
            log.error("Sending response failed.", ex);
        }
    }

At this point, soul-admin has finished sending the data.
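The body of compareChangedGroup is not shown in the excerpt above. As a minimal sketch of the idea in its comments (an MD5 mismatch marks a group as changed), the following compares per-group digests. `ChangedGroupSketch`, `md5` and `changedGroups` are hypothetical names, group keys are plain strings here, and the sketch looks only at the MD5 and ignores the last-modified time that the gateway also sends.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of MD5-based change detection per configuration group. */
final class ChangedGroupSketch {
    /** Hex-encoded MD5 of a configuration payload. */
    static String md5(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(content.getBytes(StandardCharsets.UTF_8))) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    /** Groups whose client-side MD5 is missing or differs from the server-side MD5. */
    static List<String> changedGroups(Map<String, String> clientMd5, Map<String, String> serverMd5) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : serverMd5.entrySet()) {
            if (!entry.getValue().equals(clientMd5.get(entry.getKey()))) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

An empty result corresponds to the branch where the request is parked; a non-empty result corresponds to the immediate response.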

soul-bootstrap gateway data synchronization

To enable HTTP long polling synchronization, soul-bootstrap has to include soul-spring-boot-starter-sync-data-http. Looking up the corresponding custom spring-boot-starter in the project leads to the HttpSyncDataConfiguration configuration class.

    /**
     * Http sync data configuration for spring boot.
     *
     * @author xiaoyu(Myth)
     */
    @Configuration
    @ConditionalOnClass(HttpSyncDataService.class)
    @ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
    @Slf4j
    public class HttpSyncDataConfiguration {
        /**
         * Http sync data service.
         *
         * @param httpConfig       the http config
         * @param pluginSubscriber the plugin subscriber
         * @param metaSubscribers  the meta subscribers
         * @param authSubscribers  the auth subscribers
         * @return the sync data service
         */
        @Bean
        public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                   final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use http long pull sync soul data");
            return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
                    metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
        }

        /**
         * Http config http config.
         *
         * @return the http config
         */
        @Bean
        @ConfigurationProperties(prefix = "soul.sync.http")
        public HttpConfig httpConfig() {
            return new HttpConfig();
        }
    }

HttpSyncDataService is the core class that implements HTTP long polling. Here is its core code:

    private void start() {
        // RUNNING = new AtomicBoolean(false), so the default is false
        // compareAndSet: atomically sets the value to the given update value if the current value equals the expected value
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            // Request /configs/fetch to get the latest configuration
            this.fetchGroupConfig(ConfigGroupEnum.values());
            int threadSize = serverList.size();
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    SoulThreadFactory.create("http-long-polling", true));
            // start long polling, each server creates a thread to listen for changes.
            // If there are multiple soul-admin servers, long polling is started for each of them
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }

    /****************** HttpLongPollingTask core code *****************************/
    // Call the soul-admin /configs/listener listening interface with the cache state of every group
    private void doLongPolling(final String server) {
        // MD5 and last update time of each group's cached configuration
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            ConfigData<?> cacheConfig = factory.cacheConfigData(group);
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
            params.put(group.name(), Lists.newArrayList(value));
        }
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        HttpEntity httpEntity = new HttpEntity(params, headers);
        String listenerUrl = server + "/configs/listener";
        log.debug("request listener configs: [{}]", listenerUrl);
        JsonArray groupJson = null;
        try {
            // Send the request
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (RestClientException e) {
            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
            throw new SoulException(message, e);
        }
        if (groupJson != null) {
            // Parse the changed groups from the response
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                // Call the soul-admin /configs/fetch interface to get the latest configuration of the changed groups
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }

    // Call the soul-admin /configs/fetch interface for the given groups to get the latest configuration
    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        StringBuilder params = new StringBuilder();
        for (ConfigGroupEnum groupKey : groups) {
            params.append("groupKeys").append("=").append(groupKey.name()).append("&");
        }
        // Assemble the request URL
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        String json = null;
        try {
            // Get the latest configuration
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
            log.warn(message);
            throw new SoulException(message, e);
        }
        // Update the local cache
        boolean updated = this.updateCacheWithJson(json);
        if (updated) {
            return;
        }
        // Not updated: this server may not have the change yet, so wait before listening again
        ThreadUtils.sleep(TimeUnit.SECONDS, 30);
    }

    /**
     * Update local cache
     *
     * @param json the response from config server.
     * @return true: the local cache was updated. false: not updated.
     */
    private boolean updateCacheWithJson(final String json) {
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // if the config cache will be updated?
        return factory.executor(data);
    }

    /**
     * Perform the update local cache operation (method of the factory used above)
     *
     * @param data the data
     * @return the boolean
     */
    public boolean executor(final JsonObject data) {
        final boolean[] success = {false};
        // Each refresher overwrites success[0], so the result reflects whichever refresh wrote last
        ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
        return success[0];
    }
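To make the two request shapes in the code above easier to see in isolation, here is a small sketch that only builds the strings involved: the per-group listener value ("md5,lastModifyTime") and the /configs/fetch URL with repeated groupKeys parameters. `RequestShapeSketch` and its methods are hypothetical names, and the group names in the usage note are plain illustrative strings.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of the request strings built by the gateway. */
final class RequestShapeSketch {
    /** Value sent to /configs/listener for one group: the cached MD5 and last modify time. */
    static String listenerValue(String md5, long lastModifyTime) {
        return String.join(",", md5, String.valueOf(lastModifyTime));
    }

    /** URL used to fetch the latest configuration of the changed groups. */
    static String fetchUrl(String server, List<String> groups) {
        String query = groups.stream()
                .map(group -> "groupKeys=" + group)
                .collect(Collectors.joining("&"));
        return server + "/configs/fetch?" + query;
    }
}
```

With server `http://localhost:9095` and groups `PLUGIN` and `RULE`, `fetchUrl` produces `http://localhost:9095/configs/fetch?groupKeys=PLUGIN&groupKeys=RULE`. Joining with a delimiter avoids the trailing `&` that the original code strips with `StringUtils.removeEnd`.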

Why does soul-admin first report only that a group's configuration has changed, leaving the gateway to request the /configs/fetch interface of soul-admin for that group to get the actual configuration, instead of writing out the changed data directly?

Because the HTTP long polling mechanism can only guarantee near-real-time delivery. If the gateway layer does not process a response in time, or the administrator updates the configuration frequently, the push for an individual change could easily be missed. To be on the safe side, soul-admin only reports that a group has changed, and the gateway then fetches that group's latest configuration.
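The argument can be checked with a toy model: if the gateway always fetches the current snapshot after a "group changed" notice, then several updates that collapse into a single notice still leave the gateway with the latest data. This is only a sketch under that assumption; `NotifyThenFetchSketch` and its methods are hypothetical and model a single group with a string value.

```java
/** Hypothetical single-group model of "notify the group, then fetch the snapshot". */
final class NotifyThenFetchSketch {
    private String serverConfig = "v0";  // latest configuration held by soul-admin
    private String gatewayCache = "v0";  // configuration cached by the gateway

    /** The administrator changes the configuration. */
    void adminUpdates(String value) {
        serverConfig = value;
    }

    /** What a fetch returns: always the current snapshot, not a single change. */
    String fetch() {
        return serverConfig;
    }

    /** Gateway reaction to a "group changed" notice: fetch and replace the cache. */
    void onGroupChanged() {
        gatewayCache = fetch();
    }

    String gatewayCache() {
        return gatewayCache;
    }
}
```

Calling `adminUpdates` three times followed by one `onGroupChanged` leaves the cache at the third value, whereas pushing each change as a delta would require every push to arrive.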

This completes the source code analysis of HTTP long polling data synchronization.

Copyright notice
This article was written by [zhurd]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/01/20210123231721021k.html
