大数据 频道

京东物流数据同步平台“数据蜂巢”架构演进之路

  【IT168 技术】导语:本文根据贺思远老师在2018年5月12日【第九届中国数据库技术大会(DTCC)】现场演讲内容整理而成。

  京东物流系统架构师贺思远

  9年互联网/大数据领域研发、架构经验,2015年加入京东物流,主要负责大数据相关架构与开发工作。

  摘要:数据蜂巢平台是京东物流自主研发的分布式、高性能、高可用、支持异构,离线和实时的大数据同步与管理平台。关键技术:HA;离线与实时同步整合;binlog采集,存储与订阅;客户端并发消费;一致性校验与修复;任务隔离。 目前已经在京东物流系统中大规模应用,比如单源和多源复制,从全国各地仓储园区集群(上百个)实时复制到IDC,从mysql到ES,从mysql到cassandra等等。仓储园区硬件、网络环境复杂,数据蜂巢平台需要考虑硬件设施和网络故障的容错性。议题主要分享平台诞生的背景,使用的关键技术,架构的演进过程,演进过程中所踩过的坑。

  正文:

  京东物流一线数据大部分都存在MySQL数据库上,分布比较广,包括国内外园区库房和IDC,这样数据使用起来极不方便,各业务系统为了使用数据,开发出多种版本的同步工具,结果导致管理非常困难以及资源的浪费。这时就需要一个统一的平台把这些数据管理起来。

  架构设计

  关于数据同步,主要分为:批量同步、实时订阅和实时同步。

  批量同步: 采用sqoop的模式,把数据分片,然后进行多机并发复制,用以提升效率。

  实时订阅:使用消息队列,即将binlog事件解析生成对应的消息存储在队列中;

  实时同步:通过客户端去消费队列将数据写入目标存储,从而实现数据的实时同步。

  如何中将以上三个功能整合到一个平台?

  首先,适配批量同步,参照sqoop, sqoop是借助于hadoop集群提交一个mr作业, 类似的,蜂巢系统也提供一个集群,将一个批量同步作业分为多task去执行。实时订阅也可使用这种思路,不同的是每一个task即对应一台mysql实例的binlog采集;同理实时同步也可分成多个task,每一个task即为一个消息队列的消费客户端。

  架构图如下所示:

  采用了经典的master、slave结构,每一个slave上可以跑三种对应的任务。BatchWorker负责把MySQL数据批量同步到storage里面,这里的storage是一个抽象,不一定就是存储之类的系统,也有可能是一些业务的处理。StreamWorker负责binlog的采集和消息队列的维护,如果有订阅需求,使用系统提供的客户端Consumer就可以直接消费。

  Pieworker则是Consumer的分装,只不过这个客户端由集群去管理维护。

  BatchWorker结构比较简单,主要由Fetcher、Sinker和Storage(与整体架构图中的storage不同,此处为一个buffer)组成。Fetcher接口负责抽取数据,Storage负责缓存数据,Sinker接口负责数据的写入。

  StreamWorker模仿MySQL的主从复制机制, RelayLogTask负责binlog的抽取;HHLTask负责解析binlog,生成消息体,最后存在hhl中,hhl即为上文提到消息队列。

  客户端: StreamJob除了采集binlog,维护消息队列外,还提供了一个ClientServer模块,用于接收客户端的消费请求。当Consumer需要消费的时候,请求会发送到ClientServer,ClientServer通过索引快速的定位到hhl文件的某一个位置,然后把数据读取出来,然后经过客户端的指定过滤规则进行过滤,最终将消息体传送给客户端。

  消息的位点由三部分组成的:Serverld对应的就是MySQL的Serverld;binlogPosition是一个长整型数字,高32位为binlog文件下标,低32位为文件内部的位置;time为对应的binlog事件产生的时间。

  为解决性能问题,客户端支持多种并行模式。

  第一种是以事务为单位串行处理,;

  第二种是在第一种基础上进行了一个简单的优化,比如:一个事务内只对单表操作,那么这个事务是完全可以并发的,不同表间的事务顺序并不会影响最终结果的一致性;但事务内对多表操作就需要继续串行。所以第二种并发模式就是不断串行并行转变的过程。

  以上两种消费模式是以事务为单位,以下两种以行为单位

  第三种是表级并发,将事务拆分为多个行级操作,同表操作由同一线程完成,保障同一表操作的有序性;

  第四种是行级并发,主要是把MySQL的数据同步到NOSQL上时使用,关系型数据库同步时局限性较大,因为行级并发只保障同一主键的操作有序,而关系型数据库会存在多个唯一约束,这样即使保障了主键的操作有序也可能引起数据不一致。

  集群的特性:作为一个集群都需要要保证高可用、数据本地性和负载均衡。高可用这一块主要分为三部分:

  MySQL: 由DBA保证数据库的高可用,但当Mysql主从切换时,binlog的位点是不一致的,此时系统通过Serverld的检测发现该变更,然后通过时间在新的mysql实列上定位正确的binlog位点。

  master(Queen):基于Zookeeper完成Active角色的选举

  Bee(Slave):Bee宕机后由Master将其运行的任务迁移到其他的Slave上。

  数据本地性:每一个Bee在启动时都配置了机房,分组等信息,作业提交时可以指定自己期望的运行位置,与hadoop,spark类似。

  负载均衡: 每一个Bee会将自己的负载信息通过心跳发给Queen,queen进行作业调度时,会在满足数据本地性的前提下选择压力最小的机器去运行新任务。

  演进

  HHL文件丢失:上文提到过,如果Streamworker的运行主机宕机,Master会把它迁移到另外一台机器上,但是Streamworker采集解析的binlog存在本地,迁移后会引起数据丢失,解决这个问题通用的方案是多副本,但大数据量下的多复本会造成磁盘空间的浪费,尤其是在库房环境下;并且这些数据有一个特点,就是发生迁移时,虽然解析过的数据丢失,但是原始binlog都会在机器上保存(dba会保留n天的binlog数据),最终可以通过数据补全来保证数据不丢失。

  上图为第一版streamworker的扩展,最右面的可以认为是这一组的主线程,虚线位置发生任务迁移,切换到了新的主机上,此时虚线左边的数据全部丢失。如果有客户端需要消费丢失的数据,服务端则启动一组新线程,然后进行catchup,catchup会把丢失的部分补齐并提供给客户端消费。

  元数据: binlog是不记录字段名等元数据的,而客户端消费时需要。最简单的方式是收集到binlog之后,去源库上查询,但在binlog采集延迟期间如果有ddl操作,会导致元数据不准确。为解决该问题系统实现了一个快照模块。在StreamJob初次启动的时候,把对应的MySQL里面所有表都做一份快照,在此后的运行期间监控DDL操作,当解析到DDL操作时会将原快照取出生成一个复本,并在这个复本上应用这个ddl,生成新的快照 。这样系统可以保证任何时刻binlog对应的元数据都是正确的,方便用户使用。

  客户端:服务端并不记录客户端的消费位点,消费的位置由客户端自行存储。由于客户端采用的是并发消费模式,消息又是严格有序的,此时位点记录就必须保障每一个记录下的位点之前的所有消息都被正确处理了,此处引入了一个环形提交队列(具体实现与disruptor类似),当连续的多个消息被正确处理,并达到记录位点的间隔,此时提交队列会将一个位点写入对应的存储介质。比如1,2,3,4,5,8,10,14被处理完成,位点提交间隔为5,则5位置对应的位点被记录,当6,7,9被处理完成后10再被记录。

  以下主要为易用性的改进:

  SQL: 用户通过SQL描述需要同步哪些字段,同步条件等信息,服务端通过解析sql执行对应的同步逻辑。

  Union: 用于处理多表合一的场景,通过加入来源标识字段来解决唯一约束。

  Join: 在同步的过程中完成宽表的加工,内部通过缓存,布隆过滤器等优化方案来提升性能。

  一致性较验:

  系统提供了两种模型:

  第一种是使用pt_table_checksum的思路,比如要较验一张表的数据,先把这张表的数据分成多个片段,然后对这些片段进行crc计算,并将结果和计算的范围存储到同一个数据库的表里去,此时会触发一个binlog事件,当消费者消费到这个事件后重现该操作,通过比对计算结果值来确定数据是否一致,这种方式一是侵入性比较强,需要在原库上建对应的比对结果表,二是需要加锁,三是对延迟的要求很高,当延迟较大时,消费端拿不到对应的比对事件,将无法确定数据一致性。

  第二种数据校验的模型是基于BatchJob实现。

  Fetcher抽取源和目标双方的数据,排序后通过storage把数据传递给sinker,Sinker根据用户自定义的比较接口对数据进行比较,最后将差异通过Collector进行收集。这种比对方式的缺点是不能保证时间序列,在比对的时候数据是变更的,比对出来的结果可能并不是真正的差异,此时我们需要修复比对结果,首先在比对开始时把消费端的位点记录下来,比对完成后,如果有差异则从比对开始前记录的位点进行binlog重放,通过分析binlog中操作,对差异进行修复,输出最终结果。 该比对模型最大的问题是需要抽取比对双方的数据,对带宽占用较大,为减少网络传输,内部会对数据进行初步的筛选,将要比对的数据分成多个片段,在存储端对每个片段进行md5计算,fetcher只收集md5值 ,只有双方md5值不同时才将真正的数据抽取到计算端进行比较。

  修复:

  系统提供两种修复方式:

  一是基于binlog事件传递,当比对出差异数据后,只需要在源库上对差异数据进行一个伪操作(比如更改update_time字段),触发binlog事件的产生,消费端收到该事件即可修复错误数据(事件对应的消息体内包含了所有对应字段的值,并不只限于update_time字段),该方式缺点是受延迟的影响,同时还需要源库的写权限。

  二是直接修复,即直接在目标上将差异数据修正。在修复数据时需要加锁或暂停同步,避免并发问题。

  资源的隔离: 系统默认只提供了常用的同步功能,当默认实现无法满足客户需求时,用户需要自行编写代码来实现对应的接口来完成他们的逻辑。Bee开始使用的线程模型,即每一个Bee可以看做一个线程池,多个任务都在同一线程池内运行,而用户自定义代码又无法控制,导致不同任务相互影响,更有甚者任务结束后资源不释放。为保证资源隔离,将线程改为进程,将不同作业的任务交由不同的子进程去执行,子进程启动时会指定额定运行资源;所有子进程由Bee统一管理,Bee不再运行具体任务,作业结束后,Bee将未退出的子进程全部强制杀死。

1
相关文章