Building a real-time site search function with Kafka and Elasticsearch

George jizhuozhi 2021-02-23 12:59:02


We are currently building a multi-tenant, multi-product website. To help users find the products they need, we want an on-site search function, and it has to be updated in real time. This article discusses the core infrastructure behind this feature and the technology stack that supports the search capability.

Problem definition and decision making

To build a fast, real-time search engine, we had to make a few design decisions. We use MySQL as the primary data store, so the options are:

  1. Query MySQL directly for each keyword the user enters in the search box, with a pattern like LIKE '%#{word1}%#{word2}%...'.
  2. Use a dedicated, efficient search database such as Elasticsearch.

Since we are a multi-tenant application, the entities being searched may require many join operations (if we stay with a relational database such as MySQL), and because different product types have different data structures we would also have to scan multiple tables for every keyword the user enters. We therefore decided against searching keywords directly in MySQL. 🤯

So we needed an efficient and reliable way to move data from MySQL into Elasticsearch in real time. The next decision was between the following options:

  1. Use a worker that periodically polls the MySQL database and sends all changed data to Elasticsearch.
  2. Use an Elasticsearch client in the application and write data to MySQL and Elasticsearch at the same time.
  3. Use an event-based streaming engine that turns data changes in MySQL into events, sends them to a streaming platform, and forwards them to Elasticsearch after processing. 🥳

Option 1 is not real-time, so it can be ruled out immediately; even with a short polling interval it would cause full table scans and put query pressure on the database. Besides not being real-time, option 1 cannot handle deletions: if a row is deleted, we would need extra tables to record what existed before, otherwise users could still find dirty data that has already been removed. For the other two options, different scenarios may lead to different decisions. In our scenario, option 2 has foreseeable problems: if Elasticsearch is slow to establish a network connection or acknowledge an update, it slows down our application; if an unknown exception occurs while writing to Elasticsearch, how do we retry the operation to guarantee data integrity; and, realistically, not every developer on the team understands every part of the system, so if someone adds new product-related business logic without calling the Elasticsearch client, that change never reaches Elasticsearch and consistency between MySQL and Elasticsearch cannot be guaranteed.

Next we had to decide how to turn data changes in MySQL into events and deliver them to the streaming platform. We could publish an event to a message pipeline from the application whenever it changes the database, but that does not solve the problems described above for the Elasticsearch client; it merely moves the risk from Elasticsearch to the message pipeline. In the end we decided to collect the MySQL binlog and use it as the event source for the event-based streaming engine. The binlog format itself is well documented elsewhere, so I will not repeat it here.

Service overview

To provide a unified search interface, we first need to define the data structure used for search. For most search systems, the results shown to users consist of a title and content; we call this part the searchable content (Searchable Content). In a multi-tenant system we also need to indicate which tenant a result belongs to, or filter results down to the current tenant, and we need additional information to let users filter by product category; we call this general, non-searched part the metadata (Metadata). Finally, when displaying results we may want different presentation for different product types, so the search results also need to return the original content (Raw Content) required for that custom rendering. We can now define the general data structure stored in Elasticsearch:

{
  "searchable": {
    "title": "string",
    "content": "string"
  },
  "metadata": {
    "tenant_id": "long",
    "type": "long",
    "created_at": "date",
    "created_by": "string",
    "updated_at": "date",
    "updated_by": "string"
  },
  "raw": {}
}
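For illustration only, here is a minimal sketch of how this document could be modeled as Kotlin data classes on the consumer side; the class and property names are my own assumptions and are not part of the implementation shown later.

import java.time.Instant

// Hypothetical Kotlin model of the search document described above.
data class SearchableContent(
    val title: String,
    val content: String
)

data class Metadata(
    val tenantId: Long,
    val type: Long,
    val createdAt: Instant,
    val createdBy: String,
    val updatedAt: Instant,
    val updatedBy: String
)

data class SearchDocument(
    val searchable: SearchableContent,
    val metadata: Metadata,
    // "raw" is deliberately untyped: each product type stores its own structure here.
    val raw: Map<String, Any?> = emptyMap()
)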

Infrastructure

Apache Kafka: Apache Kafka is an open-source distributed event streaming platform. We use Kafka as the persistent store for database events (inserts, updates, and deletes).

mysql-binlog-connector-java: We use mysql-binlog-connector-java to read database events from the MySQL binlog and send them to Apache Kafka. A separate, standalone service handles this process.

On the receiving side we also run a separate service that consumes the events from Kafka, processes the data, and sends it to Elasticsearch.

Q: Why not use a ready-made connector such as the Elasticsearch connector to process the data and send it to Elasticsearch?
A: Our system does not allow large text to be stored in MySQL, so we use an additional object storage service for product documents. This means we cannot forward the data to Elasticsearch directly.
Q: Why not process the data before sending it to Kafka?
A: That would persist a large amount of data into Kafka and take up Kafka's disk space, while the same data is ultimately stored in Elasticsearch anyway.
Q: Why use a separate service to collect the binlog instead of an agent such as Filebeat?
A: You can of course install an agent directly on the MySQL server to collect the binlog and send it to Kafka. But in some cases the MySQL server is a managed cloud service or provided by another infrastructure team, and we cannot install an agent on it. A more generic, non-invasive client/server approach to consuming the MySQL binlog works everywhere.

Configure the technology stack

We use Docker and docker-compose to configure and deploy the services. For simplicity, MySQL uses root as both the user name and password, and Kafka and Elasticsearch run as single-node clusters with no authentication configured. This setup is for development only; do not use it as-is in production. Note also that the binlog collector requires binary logging to be enabled on the MySQL server in ROW format (for example by starting mysqld with --server-id, --log-bin, and --binlog-format=ROW); MySQL 5.7 does not enable the binlog by default.

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

Once the services are up, we need to create the index in Elasticsearch. Here we simply call the Elasticsearch RESTful API with curl; you could also run this step from a small init service based on the busybox image.

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'
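As a sanity check of this mapping, here is a rough sketch, not taken from the original project, of what a search might look like with the Java high-level REST client in Kotlin: a full-text match on the searchable fields combined with a tenant filter. Because searchable and metadata are mapped as nested objects, both clauses are wrapped in nested queries; the index name "search" and the field names follow the mapping above.

import org.apache.lucene.search.join.ScoreMode
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder

// A hypothetical search call against the "search" index defined above.
fun search(client: RestHighLevelClient, tenantId: Long, keywords: String) {
    val query = QueryBuilders.boolQuery()
        // Full-text search over the nested searchable.* fields
        .must(
            QueryBuilders.nestedQuery(
                "searchable",
                QueryBuilders.multiMatchQuery(keywords, "searchable.title", "searchable.content"),
                ScoreMode.Avg
            )
        )
        // Restrict results to the current tenant via the nested metadata
        .filter(
            QueryBuilders.nestedQuery(
                "metadata",
                QueryBuilders.termQuery("metadata.tenant_id", tenantId),
                ScoreMode.None
            )
        )
    val request = SearchRequest("search").source(SearchSourceBuilder().query(query))
    val response = client.search(request, RequestOptions.DEFAULT)
    response.hits.forEach { hit -> println(hit.sourceAsString) }
}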

Core code implementation (Spring Boot + Kotlin)

Binlog collector:

override fun run() {
    client.serverId = properties.serverId
    val eventDeserializer = EventDeserializer()
    eventDeserializer.setCompatibilityMode(
        EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
    )
    client.setEventDeserializer(eventDeserializer)
    client.registerEventListener {
        val header = it.getHeader<EventHeader>()
        val data = it.getData<EventData>()
        if (header.eventType == EventType.TABLE_MAP) {
            tableRepository.updateTable(Table.of(data as TableMapEventData))
        } else if (EventType.isRowMutation(header.eventType)) {
            val events = when {
                EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                else -> emptyList()
            }
            logger.info("Mutation events: {}", events)
            for (event in events) {
                kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
            }
        }
    }
    client.connect()
}
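The client and properties referenced in run() are not shown here. Purely as an assumption, the binlog client could be constructed roughly like this; the connection properties are placeholders, not values from the actual project.

import com.github.shyiko.mysql.binlog.BinaryLogClient

// Hypothetical construction of the binlog client; host, port, user and password
// are placeholder properties, not part of the original code.
val client = BinaryLogClient(
    properties.host,      // e.g. "localhost"
    properties.port,      // e.g. 3306
    properties.username,  // e.g. "root"
    properties.password   // e.g. "root"
)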

In the run() method above, we first initialize the binlog client and then start listening for binlog events. There are many types of binlog events, most of which we do not care about; we only need TABLE_MAP and WRITE/UPDATE/DELETE. When a TABLE_MAP event arrives we update the in-memory table structure, and for the subsequent WRITE/UPDATE/DELETE events we use that cached table structure to map row values to column names. The whole process looks like this:

Table: ["id", "title", "content", ...]
Row:   [1, "Foo", "Bar", ...]
=>
{
  "id": 1,
  "title": "Foo",
  "content": "Bar"
}

The collected events are then sent to Kafka and consumed by the event processor.
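The event model itself is not shown in this post, so purely as an assumption, the message published to the binlog topic can be pictured roughly as below; the field names are inferred from how the handler further down uses the event, and the column-zipping helper is only illustrative.

// Hypothetical event model for messages on the "binlog" topic (an assumption,
// inferred from binlogEvent.database / table / eventType / payload used below).
data class BinlogEvent(
    val database: String,
    val table: String,
    val eventType: String,   // e.g. WRITE / UPDATE / DELETE
    val payload: Map<String, Any?>
)

// Illustrative sketch of turning a raw binlog row into a payload, assuming the
// cached table structure holds the column names in order.
fun toPayload(columns: List<String>, row: List<Any?>): Map<String, Any?> =
    columns.zip(row).toMap()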

Event handler

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

Here we first use the Kafka annotations provided by Spring to consume the events, and then delegate each event to binlogEventHandler for processing. BinlogEventHandler is in fact a custom functional interface; our event handler implements this interface and is injected into KafkaBinlogTopicListener as a Spring Bean.
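The interface itself is not shown in this article; a minimal sketch of what it might look like, as an assumption based on the handler below, is:

// Hypothetical shape of the functional interface implemented below.
fun interface BinlogEventHandler {
    fun handle(binlogEvent: BinlogEvent)
}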

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {

    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            // DELETE events remove the document from Elasticsearch
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // WRITE or UPDATE: simply (re)index the document
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        // Field names follow the index mapping defined earlier
                        "metadata" to mapOf(
                            "tenant_id" to payload["tenantId"],
                            "type" to payload["type"],
                            "created_at" to payload["createdAt"],
                            "created_by" to payload["createdBy"],
                            "updated_at" to payload["updatedAt"],
                            "updated_by" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

Here we only need to determine whether the event is a delete. For a delete we remove the data from Elasticsearch; for anything else we simply index the document again. This code just uses Kotlin's mapOf to map the data; if you need more complex processing, write the handler the same way you would in Java.

Summary

This is the first blog post I have written since starting work. My knowledge is still limited, so there may be places where the technical understanding is not deep enough or the wording is imprecise; corrections are very welcome.
There are in fact many open-source engines for processing the binlog, including Alibaba Canal. The manual approach used in this article is also meant to give readers who use data sources other than MySQL a similar solution. Take what you need, adapt it to your situation, and design a real-time search engine for your own website!
The complete code mentioned in this article has been uploaded to Gitee; a Star is welcome.

Copyright notice
This article was written by [George jizhuozhi]. Please include a link to the original when reposting. Thanks.
https://javamana.com/2021/02/20210223125822099u.html
