浙江移动亿万级实时流计算之路与踩过的坑

  • 2016-08-18 10:12:43
  • 康祖令

本文根据康祖令老师在〖2016 DAMS中国数据资产管理峰会〗现场演讲内容整理而成。



(点击这里获取康祖令演讲完整PPT)


讲师介绍

康祖令,浙江移动大数据架构师,数据科学家。


大家好,我是来自浙江移动的康祖令,今天我将跟大家分享我们在建设亿万级实时流计算过程中的架构设计和实践经验,以及踩过的一些坑。


本次演讲主要分为四个部分:

1、概述:平台与应用现状、数据量、运营商为什么需要流计算? 

2、流计算的技术现状、选型与架构 

3、Storm流计算的实践经验分享:我们踩过的坑 

4、流计算平台和应用的运维经验交流


一、流计算在运营商中的用途


  • 流计算在运营商中有什么用途?


简单来说,就是应付一些时延敏感的计算领域。


举两个例子,第一,在网络层面上,运营商有实时监控的需求,他们希望能够捕捉网络运行过程中一些参数、指标的变化,去预测这些设备接下来可能会出现某种故障或者性能的下降。比如,移动网络是通过一系列基站小区去组成的,而每一个基站小区的容量,在特定的配置下都是有限的。如果一个小区下面出现用户聚集,那么它的数据量可能就会超过配置值,这时就需要快速、提前地去汇总这些流量指标。


第二个例子,是关于位置信息服务。在这里,我先介绍一种在运营商中算是比较有特色的关于实时人流的应用。大家可能有这样的经验,外出的时候,为了节省手机流量,通常会把数据连接关掉,一旦你把你的数据接口关掉,互联网公司就收集、采集不到你的位置数据。这意味着什么?意味着互联网公司不管它自身的架构多么先进,这个先天性的问题使得它在总量人口的分析上必然会存在误差。而运营商的先天性问题就是不管你的数据开关是开的还是关的,你的手机必须定时跟基站汇报你所在的位置,这样运营商才能比较精确地知道各个区域人口的聚集情况,而这些聚集的情况,如果想比较低延时地呈现在电子地图中,就需要用到实时流计算技术。


  • 大数据在运营商网络维护和优化中的应用


这里有三个示例,其实这不仅仅是流计算,也是浙江移动这一年多来在大数据和实时计算方面的几个应用实例。



其中包括:1、故障的双向定界;2、高铁模拟路测,所谓的高铁模拟路测,即通过大数据分析的方法,去判断高铁的网络,也就是说大家在高铁上,哪一段会感觉比较差,可以有针对地对它进行优化;3、病毒短信的探测,比如说,有时候大家可能会收到一些短信里会附有一条链接,当你打开这条链接,手机就会中毒,然后你的支付宝密码也不见了。对此,我们通过一些数据分析和学习的办法去判断这个链接是否会隐藏病毒。类似的例子还有很多,包括网络优化上,针对IPhone手机的优化等一些应用,以及LTE分流支撑、反骚扰电话、基站小区的质差原因分析等。


  • 平台架构


这个是项目中整个Storm流计算的架构图。实际上这是一个非常通用的架构,不管你是任职在一个比较大型的企业或者中等规模的公司,当你需要自己构建一个大数据平台、流计算或者搭建大数据架构时,都可以采用这样一个架构。它的核心在于,前端数据采集后,通过Kafka集群进行数据分发,数据分发以后分为两路,一路跟流计算系统对接,一路直接进入传统的HDFS集群。


这样一个架构有什么好处呢?


首先,现在的流计算技术,不管你是开源或者被开源的组件,它其实都支持跟Kafka的数据连接,所以一旦你布署了Kafka,就意味着可以连接所有流计算的框架。从这一角度说,Kafka实际是处于流计算架构中的数据分发中心的位置。


第二,Kafka的性能非常高,只需要少许的服务器就可以实现近乎“0”的时延,从我们的实践经验来看,基本可以认定Kafka能达到整个网卡带宽的极限。这说明一般规模的公司,可能三台Kafka节点就够用了,而不需要购买到十台或者几十台。而对于比较大型的公司,可能五台到六台就够用了。所以,这是一个比较廉价并且高效的架构方式。第三,它对现有Hadoop架构的改动量很小。无论是Flume还是HDFS,都只需要做极少的适配,就能实现与流计算架构的集成。


  • 平台与数据规模


这是目前我们的平台规模情况。我们采用了20个节点,每个节点24个CPU核心的规模来支持。按照记录数的话,数据量是1200亿条,实际上是1000-1500亿条,但根据波动来看,基准值大约是1200亿条。而以基准值来算,数据大概是50TB。50TB是个什么概念呢?说真的,一开始我们对50TB这个量级也不是很有概念,直到今年我参加了北京的一次峰会,在会议上腾讯公布了广点通Spark Streaming平台每天的数据量也是50TB,也就是说,浙江移动一个省公司的数据量基本与它相当。当然,腾讯的应用数量比我们多很多,但从数据量来看,应该是处于同一个量级。也就是说,这整套架构可以支持目前我们所能遇到的最大数据量的应用运行。


二、流计算的技术现状、选型与架构


Storm和Hadoop有点相似,也是一种主从架构,也有一个主控节点,并且名称不一样。Storm的主控节点叫Nimbus,从节点叫Supervisor。在Hadoop当中,大家都知道有NameNode、DataNode,并且DataNode是直接跟NameNode通信的。但Storm有点不一样,它中间并不是直接通信的,而是把一些Nimbus控制状态写到zookeeper中,然后Supervisor通过跟zookeeper之间的通信来实现控制的同步。


图中左侧是Storm平台的物理架构,衔接上面的话,就是你在部署的时候会有一台物理节点叫Nimbus,而其他的物理机器叫做Supervisor。然后,从逻辑架构上来看,具体的应用在Storm平台运行时是右侧这么一个架构。首先,它有一个数据的源头,大家可以理解为这是一个水龙头,叫Spout,数据就是源源不断地从Spout这个节点流出来,或者说实际上它在Storm当中是一个线程,然后由一个叫Bolt的组件来进行数据处理。处理包括对数据进行各种聚合,聚合之后会从一条记录生成一条或多条记录,亦或把多条记录并成一条记录。在Storm中,我们把每一条记录称为一个tuples。以上是逻辑结构的几个基本的名词。


三、Storm流计算的实践经验:我们踩过的坑


接下来,我将从8个问题来谈谈浙江移动在Storm实施过程中踩过的一些坑,以及我们是如何定位并解决这些坑的,希望对大家的流计算工程实践有所帮助。


1
从SDTP到Flume Source:透传 Vs. 落盘

Flume是Kafka的前端,是所有数据进入Kafka的一个源头。我们在实践过程中发现,在Flume当中出现了流量瓶颈,更准确来说,是Flume的前一级服务器出现了数据堆积。要描述清楚这个问题,先要看一下Flume的架构。


图中右下角是Flume内部的几个功能组件,它有一个数据的源头称为Source,还有一个数据所去往的目标叫做Sink,Source连接的是数据来的地方,而Sink是连接数据去的地方,在这张图中去往的地方是HDFS,但实际上在我们的架构中这部分连接的是Kafka。


因此,一旦Flume出现堆积,首先要判断的就是数据堆积到底是出现在Flume的前面还是后面。后来我发现了一个很简单的判断方式,图中的Channel其实是个简单的共享内存。大家可以想象,如果数据的堆积是出现在后面Sink一端的话,那么当数据被源源不断地写到Channel里面,Sink又没法消费掉的时候,是不是这个Channel就会满呢?所以,只要检查一下Channel中有没有满的报错,就能够分析出数据的瓶颈到底是出现在Flume的前面还是后面。事实也如此,经过检查,我们发现Channel没有存在满的报错这个情况,则判断出数据的堆积是在Flume的前面。


接下来,由于数据是通过透传的方式进入Flume,一路数据即连接一个Source,如果数据不落,理论上来说这个性能已经是最高的了,但我们还是出现了瓶颈,此时这个问题貌似已经无解了,你只能要么跟人家协商,让过来的数据一路分成两路,但客观来讲,这个在我们的项目中是不现实的,所以我就得另想办法。你可以理解为这是Flume中它设计得不太好或比较无奈的地方,反正事实就是这样,后来我发现了一个新的问题,如果一路数据过来,在Flume接收的地方是一个线程,那么大胆设想,有没有可能把一个线程分成多个线程?数据本身没法一路分成多路,那我们是不是可以先把它落盘,把它解到硬盘里面,只要IO性能能够勉强扛得住,通过一路分成多路,就可以通过多线程并发的方式来解决这个问题?实际上我们也是大胆尝试了,使得问题被解决的。


所以,我想分享的经验是:在流计算当中,透传的性能不见得比落盘的性能要来得高,在实践过程中,只要IO性能扛得住,有时候我们可以改用落盘的方式,通过多线程来换取性能的提升。


2
Kafka的流量“陷阱”

第二个问题,是关于Kafka的流量问题。一般而言, Kafka是一个性能非常高的组件,一般不会形成性能的瓶颈。但正是因为性能很高,瓶颈就更不容易被发现,平时我们也不太会去关注它。在项目当中,我们通过一个叫Kafka Offset Monitor的开源监控工具,发现Kafka真的形成瓶颈了!



我先大概介绍下图中几条线的意思。蓝色线和白色线表示进入到Kafka当中的一个tuples(大家可以把它理解为一个通道)进入的和被消费掉的数据量,正常情况下,这两条线应该是平稳上升并且间隔非常小的,即进多少消费多少。但是,如果Kafka出现堆积,那么那条表示待消费数据量(当然在这幅图当中单位是不一样的)的红色线就会发生改变。正常来说,红色线会很平稳地保持一个很小的值,所以一旦红色线出现起伏特别大的情况时,就表示Kafka出现了瓶颈问题。


当Kafka出现新瓶颈时,我们是没有任何告警、日志可以看的,这是一件很讨厌的事情。你能看到的是什么呢?只能是最终数据中,数据的准确性出现问题了,数据量少了,或者如果你是在统计流量,就只会看到流量出现大幅度减少这个现象。后面我要跟大家分享的一系列问题,也是如此,很多截然不同的原因导致的问题在流计算当中出现的结果都一样,都是数据量少了,结果不准确了。


好了,当发现了这个问题,就要开始着手考虑怎么去解决了。正如前面所说,Kafka性能很高,CPU一般也不会很高,事实上我们看到的也是这样。在我们的项目中,最后发现问题出在网卡上。因为当时我们配置的是四块网卡,两主两备的方式,也就是说4块网卡中只有2块是有流量的,整个带宽是200MB左右。当出现数据堆积的时候,发现网络端口的send流量大约为190MB/s左右,几乎是2块千兆网卡的吞吐量,即网卡已经跑满了。因此,既然四块网卡是两主两备,那我就改成“四网卡”模式。最终,当开启“四活”模式后,节点的总send流量约为250 ~ 300MB/s,Kafka的流量“陷阱”至此解决。


事后我们进一步观察发现,当流量超过300MB/s,Kafka节点的CPU和IO的使用率仍然不算高。因此,可以大胆假设,如果用1块万兆网卡来替代现在的4块千兆网卡,每个Kafka节点的性能有望进一步提升,这样实际上还有望减少Kafka节点的数量。


3
运算结果的更新方式:增量式 Vs. 覆盖式

先介绍一下Storm对数据一致性的保证。在Storm里,有一个叫做at-least-once的数据一致性保证,它可以保证一条数据从进入Storm开始,至少被完整处理过一次。如果它没法被完整处理一次,这个数据就会重传。但是,请大家注意,at-least-once是有代价的,代价就是大约30%的额外性能开销,如果硬件允许的话,我在项目里可能会这么做,但事实上,当时的方案测过是可行的,可机器不允许我这么做。因此我就必须在业务能够接受一定的数据误差的情况下,选择另外一种方式。


那么,我就考虑,最终结果是写往Redis,一种方式是整个写,比如说我是五分钟一个汇总,那我就把五分钟的结果写到Redis里,但是这样又会引发什么问题呢?一旦写Redis的这个Bolt挂掉,那它在重新拉起以后所缓存的数据就会清理掉。假定它是在两分半钟的时候出现问题,那么它就会出现两分半钟的数据丢失。所以,我要尽可能减少数据的丢失,就不能用at-least-once这个体制。


这里就用到增量式更新的方法。比如说,如果我们将五分钟分成10个时间片,并采用增量式更新的方式,当Bolt出现异常时,平均丢失的数据量是5%——在我们的项目中,10%以内的误差都是可以接受的;而如果采用覆盖式更新方式的话,则当Bolt出现异常时,平均会产生50%的数据误差,这显然是无法接受的。同时增量式的更新不会加大对硬件的压力,也不需要为了保障at-least-once而付出30%的额外硬件负荷。


4
数据倾斜与分片

数据倾斜是一个老问题,做过DBA或者SQL优化的人应该都知道,这在Oracle中也是一件比较讨厌的事情。基本上没有一个统一的解决方案,一般来讲采用的方式都是进一步分片。这里我想说的是,在Oracle中出现这个问题,大家能看到的现象是什么?往往是一条SQL跑得很慢,出结果的时间很长,但在Storm里面不一样,你绝不会看到一个Toplogy或者一个Bolt跑得很慢,在Storm里你看到的就是数据丢失。因为在规定的时间内数据处理不完,但出结果时它又把写了不到一半的数据给写出去了。


解决的思路是什么?关键在于要把汇总的级别分得细一点,比如说原来是按照一个城市来进行汇总的,现在就把它改成一个基站小区的级别来进行,然后再把基站小区的数据导入到城市来做二次汇总。


5
Redis的吞吐能力


按照我们项目通信的一个架构,一般来讲,流处理的结果都会被写往Redis。Redis是一个高性能的内存数据库,它之所以性能这么高,有赖于以下几个原因:


  1. 绝大部分请求是纯粹的内存操作(非常快速),不需要IO; 

  2. 它是单线程的,避免了不必要的上下文切换和竞争条件。单线程最大的好处就是它不需要加锁。做过Oracle的这方面可能也有印象,频繁的加锁其实是一个非常耗性能的操作。

  3. 它在网络IO上采用的是非阻塞IO,即epoll的方式。


但是,Redis也会出现性能瓶颈。通常来讲,在updata的模式下,一分钟能承受的更新次数是50万次左右。所以如果一分钟之内超过50万次更新,性能瓶颈就会出现。关键是怎么判断。如果Redis出现了瓶颈,你会在Storm自带的图形化管理界面中看到有一个满的Exception被抛出来,同时因为Redis是单线程模型,所以当出现瓶颈时,会有一个CORE飙到100%,而其他CORE都很闲。当看到这两种现象,基本上就能认定Redis出现性能不足。


Redis吞吐能力的问题解决起来是非常简单的。方法就是考虑把数据分片,然后在机器上再起一个Redis Server,把一部分数据写到另外一个Redis Server当中即可。


6
数据延时的计算方法力


数据延时是一个需要密切关注的问题。在我们的架构上,有两类数据延迟是需要特别当心的。


一个是数据进入到Flume,到从Kafka出的那段时间的延时,因为过了Kafka之后就是Storm了,所以这个延时是指它在进入Storm以前的延时。如果这个延时过大的话,证明你的程序是没问题的,问题在于Storm或者Kafka。另一个就是从Flume进去,到Storm最终输出结果的延时。这个延时我们称之为端到端的数据延时。在这里大家需要注意的一个问题,实际上我们没有办法知道这一类流计算中数据延时确切是多少,因为它是一个概率分布。左边的那幅图上,就是一个概率分布,它的数据延时基准时是15s,有些数据延时会更高,像30s、35s,有些也会低于基准时,比如说11s、12s,它是一个分布值。


当知道数据延时时,我们就能大概知道能给用户提供的结果大概是延迟多少,重点是这个延时怎么算。现在没有任何一个自动化工具可以让你去算,能够直接告诉你这个延时大概是多少,需要你自己去算。我们采用的是一个非常简单却有效的方法——在Flume那头加入了一些专门用于测试的特殊数据记录,通过Storm把这些数据读出来,再把读出来的延时进行统计,进而得知整个数据延时大致的分布。


7
原始数据的清洗与过滤


做过数据仓库的人,往往会有一个想法,就是数据在入库之前,先要清洗,把一些有问题的数据通通扔掉。但实际上在大数据当中,我们很难要求进入Storm的这些数据都是非常完美的,这也是不现实的。曾经我们也试过,把所有的数据全部进行检查,如果检查到有问题,就把它扔掉,可后来发现,大概会有二分之一的数据在格式上或多或少的存在一些瑕疵,但是这二分之一有瑕疵的数据中,只有很小一部分会影响最终结果,所以,我们就采用了另外一种办法,允许那些有瑕疵的数据先进入平台,只要它能给出结果,那就OK。


8
基站小区活跃用户数的统计方法

最后一个问题,是关于基站小区活跃用户数的统计。这对互联网公司流计算来说,是一个专门的课题。这里我们就不说计算一个小区的活跃度数了,拿一个URL地址访问人数的实时统计就行了。一般最傻的办法就是把访问过这个网页的用户的ID一个个记录下来,有多少ID它的活跃度数就有多少,可如果用户数非常大,你要记录的ID就会非常多,会把Storm的内存给吃光,这是一个非常影响性能的操作。再从另一个角度来说,我们需不需要这么精确呢?如果不需要这么精确,就可以通过概率化的方法来计算。


这里常用的方法有两个,一个是布隆过滤器,一个是HyperLogLog。它们在资源消耗、误差性、统计结果的可聚合性这些性能属性是有差异的。最终我们在项目当中选取的是Bloom Filter的方法。虽然它的消耗比较多,但误差比较小。


在这我想说的是,在流计算当中,很多我们习以为常的做法、算法是没法适用的,尤其在流计算中有一些是针对特殊问题的特殊算法,像采样这些都是很考验流计算开发人员的地方。


四、流计算系统的监控


最后,对流计算系统进行监控,与常规的监控有相当大的差异,因为我们不仅要关注各个进程、CPU是否正常等常规的机器监控, 还要特别关注数据是否积压,这里面有一系列的工具和方法,这些在我刚才的演讲中都有讲到: