【直播全文记录】实时计算在点评

此文根据【QCON高可用架构群】分享内容,由群内【编辑组】志愿整理,转发请注明出处。

王新春:大众点评网数据平台资深工程师 负责点评实时计算平台相关工作,推动流式计算和实时计算在点评的应用和推广,一直致力于大数据和分布式系统的研究和应用。 目前主要从事NoSQL、实时分布式系统的研究与开发。著有《Storm技术内幕与大数据实践》。

实时计算在点评的使用场景

类别一:Dashboard、实时DAU、新激活用户数、实时交易额等
类别二:搜索、推荐、安全等

以搜索为例:用户在点评的每一步有价值的操作(包括:搜索、点击、浏览、购买、收藏等),都将实时、智能地影响搜索结果排序,从而显著提升用户搜索体验、搜索转化率。

某用户 搜索“ 火锅 ”,当他 在搜索结果页 点击了“ 重庆高老九火锅 ”后, 再次刷新搜索结果列表时,该商户的排序就会提升到顶部 。

再结合其他的一些实时反馈的个性化推荐策略,最终使团购的交易额有了明显的增加,转化率提升了2个多点。

实时计算在业界的使用场景

场景1:阿里JStorm

双11实时交易数据

场景2:360 Storm
场景3:腾讯TDProcess
场景4:京东Samza

点评如何构建实时计算平台

点评的实时计算平台是一个端到端的方案,从下面的平台 架构图,可以看出整体架构是一个比较长的过程,包括了数据源、数据的传输通道、计算、存储和对外服务等。

实时计算平台 首先解决的问题是,数据怎么获取,如何拿到那些数据。

我们现在做到了几乎所有点评线上产生的数据都可以毫秒级拿到,封装对应的数据输入源Spout。 通过Blackhole支持日志类实时获取,包括打点日志、业务Log、Nginx日志等。 整合Puma Client第一时间获取数据库数据变更。 整合Swallow获取应用消息。

Blackhole是我们团队开发的类Kafka系统,主要目标是批量从业务方拉取日志时做到数据的完整性和一致性,然后也提供了实时的消费能力。 Puma是以MySQL binlog为基础开发的,这样可以实时拿到数据库的update、delete、insert操作。 Swallow是点评的MQ系统 。

通过整合各种传输通道,并且封装相应的Spout,做业务开发的同学就完全不用关心数据怎样可靠获取,只需要写自己的业务逻辑就可以了。

解决了数据和传输问题后,计算过程则在Storm中完成。 如果在Storm计算过程中或计算出结果后,需要与外部存储系统交互,我们也提供了一个data-service服务 ,通过点评的RPC框架提供接口, 用户不用关心实际Redis/HBase这些系统的细节和部署情况, 以及这个数据到底是在Redis还是HBase中的,我们可以根据SLA来做自动切换; 同时计算的结果也是通过data-service服务,再反馈到线上系统。

就拿刚刚搜索结果的例子,搜索业务在用户再次搜索的时候会根据userId请求一次data-service,然后拿到这个用户的最近浏览记录,并重新排序结果,返回给用户。 这样的好处就是实时计算业务和线上其他业务完全解耦,实时计算这边出现问题,不会导致线上业务出现问题。

Storm基础知识简单介绍

Apache Storm( http://storm.apache.org/ )是由Twitter开源的分布式实时计算系统。Storm可以非常容易、可靠地处理无限的数据流。对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。Storm可以使用任何编程语言进行开发。

Storm的集群表面上看和Hadoop的集群非常像,但是在Hadoop上面运行的是MapReduce的Job,而在Storm上面运行的是Topology。

Storm和Hadoop一个非常关键的区别是Hadoop的MapReduce Job最终会结束, 而Storm的Topology会一直运行(除非显式地杀掉)。

Storm基本概念:


Nimbus和Supervisor之间的通讯是依靠ZooKeeper来完成,并且Nimbus进程和Supervisor都是快速失败(fail-fast)和无状态的。可以用kill -9来杀死Nimbus和Supervisor进程,然后再重启它们,它们可以继续工作。

在Storm中,Spout 是Topology中产生源数据流的组件。通常Spout获取从Kafka、MQ等的数据,然后调用nextTuple函数,发射数据出去供Bolt消费。
图中的Spout就发射出去了两条数据流。

而Bolt是在Topology中接受Spout的数据,然后执行处理的组件。Bolt在接收到消息后会调用execute函数,用户可以在其中执行自己想要的操作。

为什么用Storm呢,因为Storm有它 的优点:

刚刚介绍了一些Storm的基础概念和特性,再用一张比较完整的图来回顾一下整个Storm的体系架构:

Storm提交一个作业的时候,是通过Thrift的Client执行相应的命令来完成。

Nimbus针对该Topology建立本地的目录,Nimbus中的调度器根据Topology的配置计算Task,并把Task分配到不同的Worker上,调度的结果写入Zookeeper中。

Zookeeper上建立assignments节点,存储Task和Supervisor中Worker的对应关系。

在Zookeeper上创建workerbeats节点来监控Worker的心跳。

Supervisor去Zookeeper上获取分配的Tasks信息,启动一个或者多个Worker来执行。

每个Worker上运行多个Task,Task由Executor来具体执行。Worker根据Topology信息初始化建立Task之间的连接,相同Worker内的Task通过DisrupterQueue来通信,不同Worker间默认采用Netty来通信,然后整个Topology就运行起来了。

如何保证业务运行可靠性

首先Storm自身有很多容错机制,我们也加了很多监控信息,方便业务同学监控自己的业务状态。 在Storm上,遇到的一个很基本的问题就是,各个业务是运行的Worker会跑在同一台物理机上。曾经有位同学就在自己的Worker中起了200多个线程来处理json,结果就是这台机器的CPU都被他的Worker吃光了,其他的业务也跟着倒霉。

因此我们也使用CGroup做了每个Worker的资源隔离, 主要限制了CPU和Memory的使用。相对而言JStorm在很多方面要完善一些,JStorm自己就带资源隔离。 对应监控来说,基本的主机维度的监控在ganglia上可以看见,比如现在集群的运行状况。

下图是现在此时的集群的网络和负载:

这些信息并不能保证业务就OK,因此我们将Storm上的很多监控信息和点评的开源监控系统Cat集成在了一起,从Cat上可以看见更多的业务运行状态信息。

比如在Cat中我可以看见整个集群的TPS,现在已经从30多万降下来了。 然后我可以设置若干的报警规则, 如:连续N分钟降低了50%可以报警。 然后也监控了各个业务Topology的TPS、Spout输入、Storm的可用Slot等的变化。
这个图就是某个业务的TPS信息, 如果TPS同比或者环比出现问题,也可以报警给业务方。

Storm使用经验分享

1.使用组件的并行度代替线程池

Storm自身是一个分布式、多线程的框架,对每个Spout和Bolt,我们都可以设置其并发度;它也支持通过rebalance命令来动态调整并发度,把负载分摊到多个Worker上。如果自己在组件内部采用线程池做一些计算密集型的任务,比如JSON解析,有可能使得某些组件的资源消耗特别高,其他组件又很低,导致Worker之间资源消耗不均衡,这种情况在组件并行度比较低的时候更明显。比如某个Bolt设置了1个并行度,但在Bolt中又启动了线程池,这样导致的一种后果就是,集群中分配了这个Bolt的Worker进程可能会把机器的资源都给消耗光了,影响到其他Topology在这台机器上的任务的运行。如果真有计算密集型的任务,我们可以把组件的并发度设大,Worker的数量也相应提高,让计算分配到多个节点上。

为了避免某个Topology的某些组件把整个机器的资源都消耗光的情况,除了不在组件内部启动线程池来做计算以外,也可以通过CGroup控制每个Worker的资源使用量。

2.不要用DRPC批量处理大数据

RPC提供了应用程序和Storm Topology之间交互的接口,可供其他应用直接调用,使用Storm的并发性来处理数据,然后将结果返回给调用的客户端。这种方式在数据量不大的情况下,通常不会有问题,而当需要处理批量大数据的时候,问题就比较明显了。

3. 不要在Spout中处理耗时的操作

Spout中nextTuple方法会发射数据流,在启用Ack的情况下,fail方法和ack方法会被触发。需要明确一点,在Storm中Spout是单线程(JStorm的Spout分了3个线程,分别执行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗时,某个消息被成功执行完毕后,Acker会给Spout发送消息,Spout若无法及时消费,可能造成ACK消息超时后被丢弃,然后Spout反而认为这个消息执行失败了,造成逻辑错误。反之若fail方法或者ack方法的操作耗时较多,则会影响Spout发射数据的量,造成Topology吞吐量降低。

4.注意fieldsGrouping的数据均衡性

fieldsGrouping是根据一个或者多个Field对数据进行分组,不同的目标Task收到不同的数据,而同一个Task收到的数据会相同。

假设某个Bolt根据用户ID对数据进行fieldsGrouping,如果某一些用户的数据特别多,而另外一些用户的数据又比较少,那么就可能使得下一级处理Bolt收到的数据不均衡,整个处理的性能就会受制于某些数据量大的节点。可以加入更多的分组条件或者更换分组策略,使得数据具有均衡性。

5. 优先使用localOrShuffleGrouping

localOrShuffleGrouping是指如果目标Bolt中的一个或者多个Task和当前产生数据的Task在同一个Worker进程里面,那么就走内部的线程间通信,将Tuple直接发给在当前Worker进程的目的Task。否则,同shuffleGrouping。

localOrShuffleGrouping的数据传输性能优于shuffleGrouping,因为在Worker内部传输,只需要通过Disruptor队列就可以完成,没有网络开销和序列化开销。因此在数据处理的复杂度不高,而网络开销和序列化开销占主要地位的情况下,可以优先使用localOrShuffleGrouping来代替shuffleGrouping。

6. 设置合理的MaxSpoutPending值

在启用Ack的情况下,Spout中有个RotatingMap用来保存Spout已经发送出去,但还没有等到Ack结果的消息。RotatingMap的最大个数是有限制的,为p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通过setMaxSpoutPending方法来设定),num-tasks是Spout的Task数。如果不设置MaxSpoutPending的大小或者设置得太大,可能消耗掉过多的内存导致内存溢出,设置太小则会影响Spout发射Tuple的速度。

7. 设置合理的Worker数

Worker数越多,性能越好?先看一张Worker数量和吞吐量对比的曲线(来源于JStorm文档: https://github.com/alibaba/jstorm/tree/master/docs/ 0.9.4.1jstorm性能测试.docx)。

从图可以看出,在12个Worker的情况下,吞吐量最大,整体性能最优。这是由于一方面,每新增加一个Worker进程,都会将一些原本线程间的内存通信变为进程间的网络通信,这些进程间的网络通信还需要进行序列化与反序列化操作,这些降低了吞吐率。另一方面,每新增加一个Worker进程,都会额外地增加多个线程(Netty发送和接收线程、心跳线程、SystemBolt线程以及其他系统组件对应的线程等),这些线程切换消耗了不少CPU,sys 系统CPU消耗占比增加,在CPU总使用率受限的情况下,降低了业务线程的使用效率。

8. 平衡吞吐量和时效性

Storm的数据传输默认使用Netty。在数据传输性能方面,有如下的参数可以调整: storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分别为接收消息线程和发送消息线程的数量。netty.transfer.batch.size是指每次 Netty Client向 Netty Server发送的数据的大小,如果需要发送的Tuple消息大于netty.transfer.batch.size,则Tuple消息会按照netty.transfer.batch.size进行切分,然后多次发送。storm.messaging.netty.buffer_size为每次批量发送的Tuple序列化之后的TaskMessage消息的大小。storm.messaging.netty.flush.check.interval.ms表示当有TaskMessage需要发送的时候, Netty Client检查可以发送数据的频率。

降低storm.messaging.netty.flush.check.interval.ms的值,可以提高时效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升网络传输的吐吞量,使得网络的有效载荷提升(减少TCP包的数量,并且TCP包中的有效数据量增加),通常时效性就会降低一些。因此需要根据自身的业务情况,合理在吞吐量和时效性直接的平衡。

除了这些参数,我们怎么找到Storm中性能的瓶颈,可以通过如下的一些途径来进行:
在Storm的UI中,对每个Topology都提供了相应的统计信息,其中有3个参数对性能来说参考意义比较明显,包括Execute latency、Process latency和Capacity。

分别看一下这3个参数的含义和作用。

为了在Storm中达到高性能,我们在设计和开发Topology的时候,需要注意以下原则:

关于计算框架的后续想法

目前Hadoop/Hive专注于离线分析业务,每天点评有1.6万个离线分析任务。Storm专注于实时业务,实时每天会处理100亿+条的数据。

在这两个框架目前有很大的gap,一个是天级别,一个是秒级别,然后有大量的业务是准实时的,比如分钟级别。因此我们会使用Spark来做中间的补充。

Spark Streaming + Spark SQL也能够降低很大的开发难度。相对而言,目前Storm的学习和开发成本还是偏高。要做一个10万+ TPS的业务在Storm上稳定运行,需要对Storm了解比较深入才能做到,不然会发现有这样或者那样的问题。

后面,我们计划在的大数据开发者平台上,统一实时计算/准实时计算和离线计算任务的管理和监控。

我这边需要分享的内容大体结束,非常感谢大家的耐心。

Q&A

【问题1】 Blackhole和Swallow专注点区别是什么?
Blackhole主要专注于日志类型的业务,就像Kafka一样,日志类型的对可靠性和一致性要求不会那么高,但是需要支持非常大的QPS,比如几十万到几百万。

【问题2】日志格式是统一定义的吧?能分享一下日志格式吗?
日志格式是统一的,我们提供了一个基于log4j的日志框架,里面定义好了KV的分隔符。业务把日志输出到文件,然后通过Blackhole把日志文件读取,然后在Spout中完成解析,在Blot中就是具体的日志的KV对了,业务就自己去使用。至于格式,很简单,只要定义好每个KV对的分隔符,然后K和V的分隔符就可以了。

【问题3】Storm专注在业务上?考虑过事务么,会不会有重复处理造成数据异常的问题?
对于这个问题,首先我们在实际业务中还没有使用事务。在没有启用事务的情况下,需要考虑业务的幂等的问题。如果业务可以幂等,那么重复数据不会有任何问题。因为像Kafka等系统,保证的是at least once,数据源就会有重复数据出现。然后启用事务会对性能也有比较大的影响,这个就自己权衡了

【问题4】APP Client端的数据采集,是否有延迟的问题?
如果是打点数据有延迟,如果一直访问,延迟很小,1s以内;如果只浏览几次,那么的确可能延迟比较大。client端是以batch发上来,为了省流量。因此有些数据就通过从数据库那边拖来,比如用户收藏了商户,打点和数据库都可以拿到,那么就从数据库拿

【问题5】 系统中的MQ也是用kafka吗?点评的量级,Kafka的集群数大概是多少?
MQ不是Kafka,是点评基于ActiveMQ修改的,然后消息持久化是在mongodb中。我们用了7台broker支撑了每天2T+的流量

【问题6】根据用户行为排序,这个会不会影响搜索的性能,是如何解决的?
点评推荐系统就是根据用户id去redis获取实时信息,作为score的一个feature。 对搜索影响不大的。作为推荐第一个使用实时数据,效果提升很明显的。

【问题7】实时计算这里是多大级别的服务器集群呢?
目前,只用1台Nimbus + 9台Supervisor支撑了了20多个业务,峰值的时候大概可以跑到40万TPS。

【问题8】日志采用写文件方式,是不是对磁盘io负载高?并发能达到多少?blackhole拉取 这个不能实时吧?
写文件是写Page Cache的,因此不会高,可以参考Kafka的文档。blackhole拉取现在是监听了文件的变更,因此毫秒内可以知道。

【问题9】 请问点评Storm集群中,共享spout的多个业务的topology划分粒度是怎样的?
是这样的,比如流量类型的,后面很多业务会用到流量数据,IP维度的统计,GUID维度的统计,PV统计等,这类会在一个Topology中,因为后续业务只需要使用这个Topology的输出就可以了。而且流量数据很大,每个业务自己处理,那的确浪费很严重。因此这个是共享的,我们也保证他的可用性。其他业务目前我们没有共享的情况

【问题10】你们的数据抽取会对业务系统有性能影响,而且你们可以做到毫秒级,你们如何降低或消除这些性能影响的?
目前所有的抽取都是旁路的,不是业务的主流程上,因此不会有多大影响。比如业务输出日志,发送MQ消息等。

【问题11】在最开始的时候您说点评开发了自己的RPC框架。为什么点评要自己开发而不用现有的开源框架呢?
自己开发时候开源还很少。而且不成熟。

【问题12】对于某些数据的采集,是否有采样策略,如APP Client端的数据采集,还是全量采集?
目前打点数据是全量的,PV MV等都是全量过来的,通过长链,小批量+压缩过来。有一些特殊性的,量又不大的,会走实时发送的通路

【问题13】 除了上面讲到的业务点,点评目前还在哪些业务线用到storm计算实时数据?
安全,反作弊,推荐,广告等都用。

【问题14】各个业务的Spout数据接口是如何定义的。怎么与业务开发人员交互?
比如日志类型的Spout,业务需要知道订阅那个数据源就可以,其他不管。输出就是KV对,然后我们有个地方可以去查,这些日志格式是什么含义。

【问题15】 听过一次腾讯的分享,他们对于storm的使用做了sql接口,点评在做这样的尝试么,有没有可以分享的sql解析工具?
目前没有使用SQL接口,可以参考esper。

【问题16】Storm使用的那个版本,对JVM做了那些优化,有没有遇到当cpu90%以上时,出现worker宕掉,然而发生连锁反应work全挂?
线上版本是0.9.3,0.9.3有几个bug比较讨厌,然后考虑升级0.9.4,同时修改netty server的接收代码逻辑,在上游数据处理快,下游来不及处理的时候,并且不开ACK情况下目前会导致下游OOM。 cpu 90%没有遇到worker down的情况,比如今天某个高峰worker就跑到了500%。

路人补充:这个是0.9.3netty client的一个bug,在0.9.4给修正了,当worker在不同supervisor上迁移时,就可能出现这个问题。

【问题17】在Storm中有没有应用Esper?
目前没有。

【问题18】介绍里说实时计算用Storm,分钟级别计算用Spark;是否一定要严格这么划分,有无其他评判标准?比如数据量等
目前没有严格规定,主要是看你对实时性和可靠性的要求。Spark目前在7*24小时的次序运行我们觉得稳定性还差一点。然后Storm的实时性会高一些,Spark略差一些,但是Spark开发成本低,因此业务自己来选。

【问题19】Storm业务配置变更是怎么实现动态更新的?
这个目前配置项都是放在点评基于zk的lion上来完成的,因此可以反推。

【问题20】storm的计算结果存储都采用的什么介质?
目前我们是用Redis为主,HBase和MySQL为辅,然后部分结果发到MQ。

官方出版

QCon高可用架构群官方博客 +