从批处理到流式计算的罪与罚 ——storm实践总结

近期是对storm做了不少的研究与分享,包括我之前的一篇文章《数据处理神器storm的理解与思考 ——让你的数据化作行云流水》,无论是看官方的文档,还是看其他第三方文献介绍推荐,总会让你觉得各种高端先进,毕竟它代表了一种比较新潮的设计思想,刚开始接触了解的人更会跃跃欲试。然而storm是否真如看上去那么美?还是说,storm只是另一个喜好新鲜事物的开发者把玩的玩物?这些都需要亲自尝试过才会得知。归根到底,我们应该问的问题是:我们的任务是否适合利用storm来实现?

前段时间,为做日志分析系统的改造,尝试实践了一套flume->kafka->strom->database的实现。本文将着重以此作为实例,对storm作一次实践分析,并介绍在storm系统引入过程中所遇到的与填过的一些坑。

不用storm的日志分析

不涉及业务内容,首先看下一下,我们原先的日志分析的基本流程。
假设,一条日志记志里面有两个属性字段,一个是adId,代表广告的id,另一个是clicks,代表点击次数。现要求统计按小时进行统计:

logAnalyzer.png
将日志产生处会将小时存储日志文件(上图logHourFile1,logHourFile2),由logCollector将同一个小时的日志汇集成一个大日志文logHourFileAllInOne。日志系统LogAnalyzer逐行读取logHourFileAllIOne文件,每条纪录包含了adId与clicks信息,例如“adId=123,clicks=2”,表示广告123,被点击了两次。 logAnalyzer会初始化一个map,key为adId,value为点击次数。如果一条log纪录过来,发现map中将adId为kye的纪录不存在,将这条纪录存入map;如果已经存在,取map中的该纪录的value,累加当前纪录的clicks值到其上。当读到文件尾的时候,map中包含了每个广告在该小时内的总点击数。最终将map中的信息逐条存入数据库中,结束该小时统计。
过程应该算是相当简单清晰。但是这样的分析过程有几个明显的缺点:

  • 当前小时的统计结果,必须下个小时才能看结果。因为必须等当前小时结束,才能得对该小时的日志进行汇总。
  • 为了进行汇总工作,文件要统一传输到一个节点,会瞬时占用较大的网络带宽。对其他的服务可能会带来冲击。

引入storm的日志分析

这次的改造,不仅引入了storm。我们还使用了flume,采用tail的方式代替了原有的分析系统对日志的采集功能,实现汇总,再利用kafka作为消息队列,使数据可以被storm逐条获取。关于kafka的介绍,可参考《闲扯kafka mq》
剩下的主要工作便是设计实现一个storm的topology,将原系统的分析逻辑移到其中。刚开始的时候,我的topology大抵是这样的:
topology1.png
LogSpout基于kafka-storm(https://github.com/joshdevins/storm-kafka)实现,主要负责将kafka中的log消息读出,shuffle到下一个LogBuildBolt。LogBuildBolt负责构建Log纪录相关的实体对象,实际的处理肯定会比上文中只有adId与clicks的情况复杂,所以有这个Bolt存在的必要。StatisticBolt为统计结点,也就是为每小时数据准备一个map ,根据adId,累加点击数。DataStoreBolt,负责与数据库交互,接收上一小时来自StatisticBolt的map中数据,存入数据库。
这时别扭的情况出现了。显然这里的设计DataStoreBolt的基本是一小时集中做一次计算,而不是一条条平滑的过来进入数据库。这里你就很可能需要做定时器,也就意味的我们要自行起一个线程。在我看来这就与storm正常使用思想是违背的,因为storm所做的工作的一部份就是让我们不用自行建立线程,不用操心线程安全的问题。这个问题也不大,可能是我的精神洁癖作祟。但还有个问题我不能忍:storm的ack机制默认设置是超时时间30秒,而StatisticBolt是会定时一小时去发送一批数据到DataStoreBolt,如此的话,该tuple老早就被认为超时失败了,若将超时时间设置延长至一小时,ack Bolt也会因为pending的tuple过多而出现内存泄露,虽说很多文章建议出于性能上的考虑可以直接去掉ack功能,可我还是不想因为设计上的原因而过早的放弃storm的可靠性机器。
然而,此时的我,依旧保持乐观,既然如此,便将DataStoreBolt去掉,将数据库定时存储的功能移动StatisticBolt中去。StatisticBolt在收到数据并完成内存中的累加时,便直接ack,相当于Storm Ack机制的管辖范围到此提前结束。现在Topology变成了这样。
topology2.png
接下来更加别扭的地方出现了,因为storm采用的是流式计算,每条数据之间最好是没有联系的,也就是所谓的无状态的。而现在,需要按小时去划数据再做统计就是一种状态信息,对于流式计算是一种很恼人的状态。
首先,你要很仔细地去区分,每条数据(即log信息)是属于哪个小时段的,每条log信息就需好带上日志产生时间信息。
其次,你需要在storm中判断某个小时的数据是否已完全处理完毕,这时你就要信任log信息上所带的时间是准确的。
再者,由于StatisticBolt在做数据库交互之前,需要在内存中对每小时的统计信息的进信进行存储,storm原来数据不落地的设计,就被我们的应用给打破了,StatisticBolt就也再显得那么轻松了。
后来让我完全放弃的一个理由来了,在完成上述编码工作之后,我开始对完成topology做测试,跑了几天的数据。刚开始还好,一切正常,数据准确。可长期运行之后,总会发现个别小时的数据出现了部份丢失。查log我才发现,storm的worker会因为各种原因自行重启(我当时的造成的主要原因是网络超时),给我的印象就是storm似乎将这种重启现象当做正常现象,因为设计者认为正常情况下worker重启,其ack机制也能保证数据不丢失。但我的场景却不可以,因为由于上述已经提到过的原因,我将ack确认结束的时机被提前到了统计数据在内存上累加完成之时。这就意味着,woker一旦重启,我们这些还未来得及存入数据库的累加信息都要丢失。
最后我不得不承认一个现实,这次的storm尝试是一次失败,因为我分明就是用流式计算的方式去干批处理的活。

两种解决方式

1、你也可以做成让数据一条条或小批量在数据库里面累加的方式,但这就会涉及的数据库表结构、索引的重新设计。你更要考虑数据库的压力问题。
2、明显,这样的计算方式批处理的特性还是太多了。还是乖乖选择hadoop的MapReduce定时跑job吧。而且数据是按小时存在hdfs中的,你想随时想重跑之前的数据,也很容易。

其他你可能会踩到的一些坑

  1. Topology的提交流程,Bolt与Spout是何时初始化?
    回想一下,我们启动一个topo的时候,是用storm jar命令的。参数必须指名一个包含main函数的class,编写代码时,会直接在实中实例化Bolt与Spout,最后通过StormSubmitter完成提交。可见我们在提交storm的时候,就已完成了Bolt与Spout的实例化工作。

随后将序列化的bolt、spout以及jar包会被上传至nimbus节点。Supervisor从nimbus下载相应的bolt、spout序列化后的实例,以及相应的jar包。注意,Spout与Bolt的prepare方式还是会在worker内执行。

  1. 如何自定义当前toplogy相关的config文件?
    了解了Topology的基本提交流程,可知自定的config文件可以通做两种方式获取。一种就是将自定义的config文件放至jar包中,由在Spout或者Bolt的prepare方法中获取具体信息。这种方法我不喜欢,如果配置文件基本不需要改动的话,放在jar里面也好,但正常情况下,做配置文件是为了灵活改动用的,如果每次都要得新打jar,让人也会蛮不爽的。另一种,是在submit之前完成,完成config的读取。这需要在执行storm jar命令的时候,指定具体的config路径。注意,storm与hadoop类似,出于安全的考虑,不支持你自行指定classpath,所以我只能通过将config文件的路径当作main方法的一个参数传递。
  2. 关于调试与log打印。
    Strom与hadoop类似,是一个分布式系统,所以出错时查看log就出较麻烦。你很有可能需要到各个机器上去查看各种角色,包括多个worker在内的log信息。

Storm的WebUI与LogViewer很大程度地帮你简化了这个过程。你可以在webUI上看到最新由stormr捕捉到error信息。也可以在网页上对具体worker查看其最近一段时间的log信息,前提是已在worker所在的机器上开启LogViewer服务进程。
另外,你也可以自行捕捉exception,然后调用collector.reportError()方法。这样这些错误信息就能在storm UI上显示。但是注意不要将一些数据处理过程中可能大批量出现的错误通过这种方式显示,毕竟需要传送到storm UI上显示也是需要不少代价的。

  1. Netty 连接超时问题。
    我在实践0.9.3版本的时候,总会遇到storm worker不断重启的问题。查日记发现,是由于netty连接超时的造成的。而这个问题最恼人的部份是,从日志可以看出,一个worker连接操后,然后重启,导致下一个woker也超时,恶性循环,导致supervisor也重启。后来的解决方法是,加大超时时间设置。

storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 4000

  1. Supervisor自动重启时,会出现无法再启动的现象。报错:Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
    在0.9.3版本上这个问题困扰我很久,后来是官网出了新版本出了之后,我直接升到0.9.4,就再也没有出现过了。

20150427首发于3dobe.com
本站链接:http://3dobe.com/archives/153/

标签: storm

添加新评论