【IT168 专稿】本文根据刘博宇老师在2018年5月11日【第九届中国数据库技术大会】现场演讲内容整理而成。
讲师简介:
刘博宇,滴滴出行高级软件开发工程师,就职于滴滴基础平台大数据架构部。负责Druid集群维护与研发工作。
摘要:
Druid是一款支持数据实时写入、低延时、高性能的OLAP引擎,具有优秀的数据聚合能力与实时查询能力。在大数据分析、实时计算、监控等领域都有特定的应用场景,是大数据基础架构建设中重要的一环。Druid在滴滴承接了包括实时报表、监控、数据分析、大盘展示等应用场景的大量业务,作为大数据基础设施服务于公司多条业务线。本次演讲我们将介绍Druid的核心特性与原理,以及在滴滴内部大规模使用中积累的经验。
分享大纲:
1、Druid特性简介
2、Druid在滴滴的应用
3、Druid平台化建设
4、展望
正文:
一、Druid特性简介
Druid是针对时间序列数据提供的低延时数据写入以及快速交互式查询的分布式OLAP数据库。其两大关键点是:首先,Druid主要针对时间序列数据提供低延时数据写入和快速聚合查询;其次,Druid是一款分布式OLAP引擎。
针对第一个特点来看,Druid与典型的TSDB,比如InfluxDB、Graphite、OpenTSDB的部分特性类似。这些时序数据库具备一些共同特点,一是写入即可查,通过内存增量索引让数据写入便可查询;二是下采样或RDD,通过下采样或类似于RDD的操作减少数据量,而Druid在数据写入时就会对数据预聚合,进而减少原始数据量,节省存储空间并提升查询效率;三是可能会支持Schema less,在InfluxDB中,用户可任意增加tag,InfluxDB可对新增tag进行聚合查询,但Druid在这点上与InfluxDB略有差异,Druid需要预先定义Schema 。Druid的Schema数据打包在最后形成的数据文件中,数据文件按照时间分片,也就是说过去和未来数据的Schema可以不同,而不同schema的数据可以共存。所以,虽然Druid不是schema less的,但是Schema调整也是比较灵活。
另外,Druid作为一个OLAP数据库。OLAP数据库需要支持类似上卷、切块、切片、下钻等操作,但不适合明细查询。对于类似根据某主键ID定位唯一数据的任务,OLAP数据库并不能友好支持。常用的OLAP数据库实现方式以下几种:1)数据检索引擎,比如ES;2)预计算加KV存储实现,比如Kylin;3)SQL on Hadoop 引擎,比如 Presto、SparkSQL。
接下来,我们就以上中实现进行对比。首先是数据检索引擎的代表ES,ES可以存储结构化和非结构化数据,同时具备明细查询和聚合查询能力,由于其自身是一个数据检索引擎,其索引类型并不是针对聚合分析设计的,所以聚合查询方面开销较大;其次,ES不但要保存所有的原始数据,还需要生成较多的索引,所以存储空间开销会更大,数据的写入效率方面会比Druid差一些。
与ES相比,Druid只能处理结构化数据,因为它必须预定义Schema;其次,Druid会对数据进行预聚合以减少存储空间,同时对数据写入和聚合进行优化。但是,由于进行了预聚合,所以Druid抛弃掉了原始数据,导致其缺少原始明细数据查询能力。如果业务方有需求,可以关闭预聚合,但会丧失Druid的优势。
其次是预计算 + kv存储方式 ,KV存储需要通过预计算实现聚合,可以认为Key涵盖了查询参数,而值就是查询结果,由于直接从KV存储进行查询,所以速度非常快。缺点是因为需要在预计算中处理预设的聚合逻辑,所以损失了查询灵活性,复杂场景下的预计算过程可能会非常耗时,而且面临数据过于膨胀的情况;由于只有前缀拼配一种索引方式,所以在大数据量的复杂过滤条件下,性能下降明显;且缺少聚合下推能力。
与预计算+KV存储方式相比,Druid 是使用Bitmap索引的列式存储,查询速度肯定不如KV存储快; 但是由于使用内存增量索引,增量预聚合的模式,写入即可查,无需等待预计算生成Cube,所以实时性更强;其次,Druid可针对任意维度组合过滤、聚合,查询更加灵活;最后,Scatter & Gather模式支持一定的聚合下推。
最后是SQL on Hadoop, 这类引擎的SQL支持通常很强大,且无冗余数据,不需要预处理。缺点是因为其直接通过计算引擎对Hadoop上的文件进行操作,所以响应速度较慢且QPS相对较低。
与SQL on Hadoop方式相比,Druid的SQL支持有限,但在逐渐完善;必须预定义维度指标。其优势在于可达到亚秒级响应,并发较高。
二、Durid在滴滴的应用
Druid目前在滴滴使用规模大概为多个集群百余台机器,日原始数据写入量在千亿级别,日落盘数据在TB级别,数百实时数据源、千级实时写入任务,日查询量近千万级。主要承接业务有监控、实时报表,大屏展示等。
下图为滴滴实时业务监控案例:
我们的监控体系大概可以分为三层:顶层为业务监控,主要由业务方定义指标,然后配置相应的查询和报警。主要目的在于及时发现业务问题并告警;中层的监控体系是对各服务网关调用的监控日志,主要为了发现某业务问题造成的影响范围和具体影响对象;底层运维体系主要对网络、机器各方面指标进行监控。
之所以业务监控适用Druid,是因为业务指标通常具有较为复杂多变的业务逻辑。Druid本身是一个OLAP引擎,定义一个数据源就可衍生出众多聚合指标,所以很适合这类灵活查询的配置。
第二类应用是实时报表类应用(如下图),实时报表类应用主要用于运营数据分析,客户端网络性能分析以及客服应答实时统计等。这些用户通常是从Hive数据仓库迁移过来的,因为希望获得实时用户体验而Hive查询速度太慢,所以选择迁移。典型应用场景比如快速获取某下雨区域的用户单据,对用户进行优惠券投放进而刺激用户打车。
第三类是大屏展示类应用(如下图),这类应用主要用于呈现业务性关键结果,通常是PV、UV或TOP N查询,非常适合Druid。
三、Druid平台化建设
在承接应用场景的过程中,我们做了很多平台化建设。简单啊介绍下平台化建设的背景: 业务数据主要来源是日志和binlog;公司统一数据通道是kafka;业务指标多样,逻辑复杂多变;Druid接入配置较复杂,除Schema配置外,还包括实时任务配置;数据进入Druid之前通常需要流计算处理,业务方自己开发既费时又很容易出现问题;Druid数据的对应关系以及数据源衍生指标链路较长,需要进行上下游关系梳理;由于Druid官方主要通过API查询,未提供数据可视化服务组件,因此业务方急需数据可视化相关服务组件。
在以上的背景下,我们构建了实时计算平台,架构图如下:
底层引擎有三部分组成,流计算引擎包括Flink Streaming和Spark Streaming,存储部分主要依靠Druid;流计算引擎之上,我们主要开发了WebIDE和任务管理功能,WebIDE在Web内部集成,我们会为用户提供相应模板,用户根据相应的模板进行进行开发,即可形成自己的流计算任务,简化了一些逻辑较简单的常见ETL、Join任务的开发。任务完成后可通过任务管理平台直接提交,同时用户自己在本地开发的流计算任务也可以上传到平台,通过平台执行,平台对任务提供相应的指标检测。基于Druid引擎,配置 Druid数据源,通过数据源衍生指标,每一个指标其实就是对Druid的一个查询。用户可针对指标配置告警。上图右侧的DCube是一个拖拽式数据分析的前端工具,DSQL让用户可直接写SQL的方式输出Druid的即席查询能力。
根据配置的指标进行告警,分为两大类,一类是阈值告警;一类是模型告警。通常对规律性不太强的数值配置阈值告警,对规律性较强的指标配置模型告警。如滴滴每天的订单呼叫量基本上呈现一个早高峰、一个晚高峰,中间较平稳的状态。通常会选取过去一段时间的数据进行模型训练,由用户在得到的预测基线上设置置信区间。如果数据超过置信区间,就会报警。当然,也会存在一些较难处理的特殊情况,比如突然下雨、热门电影首映结束等导致的订单激增,需要额外去考虑一些情况。
上图为基本工作流,整体工作流程为由MySQL的binlog和日志采集数据,形成原始topic,经过ETL或者多流Join进入清洗后的topic,此时用户可以使用平台提供的魔板功能或自行开发流计算任务,我们会为所有流计算任务定制一些默认的实时指标,这些指标和用户的业务数据都会在Druid中建立datasource。Druid也可通过Hive离线导入数据,离线数据源和实时数据源两部分组成了Druid的基本数据,之后根据基本数据构建业务指标、任务指标,完成报警配置。如果业务方具备一定开发能力,需要把数据接入到自己的系统,我们也会提供一些Open API。平台为用户提供了自助式的Druid数据源接入页面,极大简化地了Druid数据接入的复杂过程。
Druid查询采用100% SQL 的Web化配置。Druid原生查询是DSL,类似JSON格式,但学习成本较高。在支持SQL之后,除了部分用户自建服务外,平台所有查询全部迁移到SQL。
在平台化过程中,我们遇到了一些挑战:一是核心业务与非核心业务共享资源,存在一定风险;二是用户自助提交任务配置、查询不合理,造成异常情况,甚至影响整个集群的稳定性;三是随着业务的快速发展,Druid依赖的组件都需要热迁移到独立部署环境。在滴滴内部,由于Druid数据源基本都是用户自助接入,所以业务增长迅速,一年时间几乎涨了四倍,这对Druid依赖组件的热迁移提出了要求。
针对不同重要程度的业务共享资源问题,首先建设 Druid集群的异地双活机制,核心数据源的集群级双活。其次,通过统一网关对用户屏蔽多集群细节,同时根据用户身份进行查询路由,实现查询资源隔离。最后,进行业务分级,核心业务进行集群级双活,对查询资源需求较大但不过分要求实时性的业务分配独立的查询资源组,其他用户使用默认资源池。
上图为基本架构图,首先我们会有多个查询节点分布在不同集群,非核心数据源单写到公共集群,核心数据源双写到两个集群,用户使用身份验证key通过网关路由进行查询。
针对用户配置与查询不合理造成的异常,我们主要做了以下三点:一是引擎层面进行bad case防范,例如,druid数据时间字段设置合理的时间窗口限制,如果数据时间范围异常,我们就会对它进行抛弃;二是对Druid原生API进行封装,提供更加合理的默认配置,主要针对实时任务时长、任务数量以及内存进行配置;三是完善指标监控体系与异常定位手段,保证捕捉到异常查询。Druid和网关日志通常会通过流计算任务进行处理,然后把它们分别写入Druid和ES,数值指标会上报到Graphite,通过Grafana进行展示,综合利用 Druid的聚合分析能力与和ES的明细查询能力定位异常。
针对依赖组件的热迁移问题,Druid主要依赖的组件有三个:ZooKeeper、MySQL、HDFS。在过去一年,滴滴完成了三大组件的迁移,主要过程如下:
1、ZooKeeper迁移原理:扩容-集群分裂-缩容
在滴滴内部,ZK原来是与其他业务共用的,我们需要保证其他业务和Druid都不停服的情况下,把Druid的ZK集群单独迁移出来,所以我们采用了上述迁移方案。核心思路就是先扩容,随后利用两套集群配置,触发集群分裂,最后缩容掉不需要的节点。如图4所示,这七台ZK配置其实有两套,第一套是12347五台,第二套是567三台,但它们的leader都是ZK7,此时7个节点同属一个集群。当重启ZK7之后,两套配置的ZK节点会分别独立选取leader,此时进行集群分裂变成两个单独的ZK集群。
2、MySQL热迁移实践
我们主要使用Kafka-indexing-service作为实时数据写入方式。在实时任务执行过程中,会元数据不断更新到MySQL中的。要想对Mysql进行迁移,首先需要保证元数据在迁移过程中不变,因此首先要了解该数据写入流程。在Kafka-indexing-service的实时任务执行过程中,元数据更新主要来自于实时任务执行状态的变化和数据消费相关的部分API调用。实时任务的生命周期包括读取数据和发布数据两个过程,读取数据是从kafka读取,在内存中建立增量索引;发布数据过程,就是把实时节点消费的数据下推到历史节点,其实是通过首先写到HDFS,然后再由历史节点加载。只有在实时任务状态发生改变时,才会产生元数据更新操作。因此,我们开发了实时任务状态冻结API,把所有实时任务的生命周期都冻结在数据reading的状态,此时进行MySQL迁移,迁移完成之后,重新启动Overlord(一个管理实时任务的节点,所有实时任务的数据状态变化都由Overlord触发),实时任务就会继续进行生命周期迭代。
3、HDFS迁移实践
刚才提到了HDFS在数据发布流程中的作用,数据发布就是指实时节点消费到的数据下推到历史节点的过程。下推的方式就是先推到HDFS,再由历史节点从HDFS拉取。Master节点会告诉历史节点那些数据是需要其拉取的,而从哪里拉取则是从元数据存储中获得的,我们需要做的是保证历史节点可以从两个HDFS读取数据,同时滚动重启实时节点,保证增量数据写到新的HDFS,历史节点从新的HDFS拉取数据。对于存量数据我们只需要更改元数据里历史数据的路径,就可以无缝替换原有HDFS。
接下来简单介绍Druid平台化建设性能优化部分的工作。首先简单介绍下Druid的三种数据写入方式:
1、Standalone Realtime Node
该模式属于单机数据消费,失败后无法恢复,且由于其内部实现机制,该模式无法实现HA,这种方式官方也不推荐使用。
2、Tranquility + indexing-service
该方式主要通过Tranquility 进程消费数据,把数据主动push给indexing- service服务,优势是Tranquility 进程可以帮助管理数据副本、数据实时任务以及生命周期等;其缺点是如果所有副本全部任务失败,无法恢复数据;必须设置数据迟到容忍窗口,该窗口与任务时长挂钩,由于任务时长有限,因此很难容忍长时间数据迟到。
3、Kafka-indexing-service
这是Druid比较新的写入方式,其优势主要体现在数据可靠性及任务恢复方面,弥补了前两种方式的不足。其缺点是数据消费过于依赖Overlord服务,Overlord单机性能将会成为集群规模瓶颈 ;由于Segment与Kafka topic的partition关联,因此容易造成元数据过度膨胀,进而引发性能问题。最终,我们选择Kafka-indexing-service 模式,并开始解决该模式存在的问题。
该写入方式面临的问题经过定位主要有以下三个:一是MySQL查询性能问题, Overlord提供的多个API需要直接对MySQL进行操作,所以MySQL查询性能直接影响Overlord并发水平;二是Druid里所有数据都是JSON存储格式,反序列化非常耗时;三是 Druid中实时任务状态都通过ZK发布,Overlord会监听ZK上面的节点,而这些监听器的回调线程执行函数中会涉及一些MySQL操作,当MySQL比较繁忙时, ZK watch回调单线程模型导致事件处理需要排队。
针对查询性能瓶颈,我们主要针对Druid元数据存储索引进行了优化。通过对Segment定时Merge,合理设置数据生命周期来合并精简元数据; 对Druid数据库连接池DBCP2参数进行优化。针对反序列化与watch回调问题,我们主要对Druid任务管理体系进行了较大修改,进行了Overlord Federation改造,引入namespace概念,每一个namespace下的任务又单独的Overlord管理。这样增加Overlord水平扩展能力,同时亦可做Overlord级别的资源隔离。
四、展望
目前Druid数据消费能力依赖Kafka topic的partition,未来我们希望引入流计算引擎提升单partition消费能力,解耦对Kafka topic partition的依赖,对数据消费和数据处理进行不同的并发度配置。其次,Overlord大量服务涉及对MySQL的直接操作,易导致单机性能瓶颈,后续将会对高并发服务进行内存化改造。数据下推到HDFS之后,Coordinator(Druid里数据分配角色)决定数据的加载位置,这是一个单线程运行模型,当Overload水平扩展之后,每个时间点产生的Segment数量会有很大提升,Coordinator任务处理单线程模型需要优化。最后, 我们希望把Druid部分组件On-yarn,进而提升资源利用率并简化运维操作。