blog/source/_posts/es_mysql_data_sync_upgrade.md

4.8 KiB
Raw Permalink Blame History

title tags categories abbrlink date
记一次业务系统与ES的同步方案升级
cbda41f1 2023-08-28 15:05:47

背景

目前ES中接入了诸多业务线其技术方案一直延续第一版由ES处理获取数据和搜索的业务逻辑。随着时间的发展并发场景下发现了数据不一致,数据被重复覆盖的问题。而且从架构上search引入业务系统的API也不是合理的架构。

现状

写入链路如下

@startuml
  业务系统 -> MQ: 发送变更事件
  MQ --> Search: 订阅变更事件
  
  Search -> 业务系统: 获取业务数据
 Search -> Search: 幂等处理
Search -> ES: 索引数据
  @enduml

读取链路如下

@startuml
用户 -> 业务系统: 发起搜索
  业务系统 -> Search: 发起检索请求
  Search --> Search: 组装请求
  Search -> ES: 检索数据
  ES --> Search: 响应数据
  Search --> 业务系统: 返回响应
业务系统 -> 业务系统: 读取详细业务数据
业务系统 --> 用户: 展示搜索结果
  @enduml

因为MQ的发送后的消费并不能保证强一致可能会导致乱序执行。至于不用分片顺序因为MQ中会有批量导致无法区分分片信息。search同时还需要作幂等限流等导致Search服务更重。

目标

暂时的目标如下:

  1. 减轻Search的循环依赖的问题Search不再依赖业务系统。
  2. 减轻Search服务的业务逻辑
  3. 解决并发下的数据不一致问题

技术方案

本次改造只涉及写入流程,不会涉及读取流程

主要优化

  • 全流程新单链路独立,优化新单的可搜索延迟
  • Search去除业务系统依赖由业务系统自发自接数据处理后统一发送到Search。

时序图

@startuml


participant 业务系统 as 业务系统
queue  变更MQ       as 变更MQ
queue  分片顺序新单MQ       as 分片顺序新单MQ
queue  分片顺序MQ       as 分片顺序MQ
participant Search as Search
participant ES as ES

  业务系统 -> 变更MQ: 发送变更事件
  变更MQ --> 业务系统: 订阅变更事件
alt 并发控制
  业务系统 -> 业务系统: 获取业务数据
alt 是新单
业务系统 -> 分片顺序新单MQ: 发送业务数据
分片顺序新单MQ -> Search: 订阅业务数据
Search -> ES: 索引数据
else 不是新单
业务系统 -> 分片顺序MQ: 发送业务数据
分片顺序MQ -> Search: 订阅业务数据
Search -> ES: 索引数据
end
end
  @enduml

新单的延迟优化

在一般业务场景中,新单的延迟比单子的搜索不准确感知更明显。针对新单的我们需要做一些特殊优化,使延迟降低到无感的地步。

建议

新建单操作与更新操作隔离新建单使用单独的topic

并发下数据一致性

在无法保证单次操作时,仅发送一次变更事件的情况下,使用分布式锁保证数据一致性,是相对来说比较好的方案。

建议

使用分布式锁,对订单上锁,加锁范围 见 时序图的【并发控制】

缓解ES的写入压力

很多时候更新可能只是触发一次保存操作实际上数据没有变化的业务系统中可以做一层对比层数据有变化再发送到Search。由此缓解ES的写入压力。其次与单次操作无法保证仅发送一次变更事件一样多次变更是无效的。

建议

内部使用DB或者Redis建立一层针对单号的对比逻辑与上次发送的数据如果一致则不发送。

该对比逻辑需要有开关控制使用快捷的手段【acmdb配置等】一分钟内可操作开启关闭

数据传输优化

在系统中以拖车40HC*30箱为例【WY268171】履约系统的业务数据转换成JSON后有27万字符270KB。这种场景下对MQ的压力会相对较大对于大JSON是否需要空间换时间值得考虑。

从空间角度来说

以该JSON为例如使用gzip压缩大小可以压缩到12KB4%的压缩率即压缩后节约96%的空间。但是代价是什么呢?

从时间角度来说

该字符串压缩100次 632毫秒每次压缩6毫秒解压100次 361毫秒每次解压3.6毫秒

结论

由此可见对于大于1万个字符的字符串以空间换时间比较划算

压缩相关由Search提供工具包

  1. 压缩工具类
  2. 压缩枚举
  3. Message的数据处理

Message定义

字段 类型 备注
indexName String 索引类型由Search提供常量
indexId String 索引下唯一的ID
systemName String 系统名称,系统随便定义
timestamp long 时间戳
compress· String 压缩类型枚举【todo】
dataJson String 明文数据【未压缩下的数据】
dataByte byte[] 字节数组【压缩后的数据】
paramExt Map<String,String> 附属信息

结果

// todo