项目背景
这个项目是深圳一家证卷公司一个TB级日志离线导入项目,当数据达到10T左右的时候,他们的存贮架构以及检索系统直接奔溃,在朋友的推荐下,我负责对这个项目进行整体的重构以及优化,对于我一个大数据新手来说这是一个挑战也是一个学习的机会,最终幸不辱命最终完成了系统的优化,由原来的48小时导入优化至7个小时,并且提供亚秒级的查询检索,下面就是我对这个项目的总结
历史方案介绍 #
集群架构
集群由8台高性能服务器组成,其中两台负责接收历史日志包,两台提供ES和Spark服务,其他四台提供HDFS存贮服务,总存贮量在200TB,其中我们导入主要在两台高性能服务器上面,每台内存256GB,总计512G内存
历史方案
原来的解决方案是日志被解析完成之后导入两份数据分别存贮到ElasticSearch(下面简称ES)和Kafka之中,其中存贮到Kafka之中的数据会被Spark读取,然后写入到HBase
架构优化 #
读取和存贮优化 #
历史解决方案对日志数据的存贮是一个并行化的,但是由于项目对数据准确性要求严格,假如其中ES导入或者HBase导入失败之后,所以的数据都得重新生成,但是原来架构不提供删除HBase数据的功能(能实现但是不能检测是否删除),所以在读取方面我丢弃使用Kafka并行导入的方案,待ES导入完成并且校对完成后再从ES中获取数据导入到HBase中
原来的导入是直接使用HBase提供的Put方案,单个插入到HBase当中,由于我们一次导入的量多的话大概在10个亿左右,虽然HBase能够承受住压力但是由于行键设计导致会造成region的"脑裂",原来解决方案为了避免这个现象进行了region的预分配,由于这个预分配导致了另外的一个Bug,后面会提到。所以为了提高导入效率和避免脑裂,我采用了HBase的Bulk Load将数据直接"存"至HBase
回头看看,其实大数据解决方案非常简单,当我刚接触项目的时候,在看原来的解决方案,惊讶的发现他们的代码其实非常简单,就像教程里面的demo一样,但是随着在实际接触这个"大"数据的过程中,大数据考验的是你代码的"结实",能不能扛的住,什么花里胡哨的骚操作都没有,所以大数据的代码非常简洁质朴,如果说那些优雅的代码是面对对象的话,那么大数据的代码就是面对数据的。所以我接下来也不谈大数据框架,谈一谈我在这个项目中学到的大数据处理经验
提高处理速度 #
读取速度
大数据处理最核心的点就是分而治之,也就是我们常说的MapReduce
中Map
,无论数据有多大,只要我们分的够小,每一份我们计算机都很很快处理,那么我同时运行这些,意味着我就能在一个很短的时间内跑一个巨大的数据。
由于我们是从ES中读取数据,ES的Map的个数是由分片(shards)来控制的ES的分片默认是十片(5个原始5个备份),假如我们采用默认设置,我们读取数据Map的大小最大为10(具体跟你给的Container大小和每个核心数的大小有关,给足的话就是10),这里要谈一下这个项目中的分片设计,分片这个东西对于每一个Index来说,不是越多越好,越多读取速度快,但是写入慢,反之。所以这个项目使用别名的方法来保证分片不会膨胀,当一个Index数据大于20GB的时候,会自动新建一个别的Index,比如原来是es _index_0
,写满20G后,再新建一个 es_index_1
,然后把新数据导入到里面去,然后每个Index固定分片为4片,这样所有的数据会均匀的分布到ES中去
这种解决方法带来一个问题就是,我们获取数据的时候只能通过es_index_*
,而且我们的ES中每次导入的Map.size
只能是4,所以无论给Spark
任务多少个Container
多少个核心,他的Task任务最大也就是4个,一开始我一直在琢磨是否能够提高这个Task来加速数据读取,我尝试使用ES时间索引把数据切分成多份,并行跑多个Spark任务,但是无论我并行多少个,从ES一个月导出来时间总和单个Spark任务读取速度一致
出现这个问题的原因主要是磁盘的性能关系,由于Bulk Load有一个很重要的部分就是Shuffle排序,所以数据在导入HBase之前一定会落盘,所以读取由两个条件限制,ES的获取速度和磁盘写入速度,由于ES读取速度也跟磁盘读取速度有关,所以读取基本上由磁盘的性能有关,由于集群Spark跑在机械硬盘上,除非换SSD否则在软件上优化无法取得较大的收益
所以我放弃在读取上面优化,接下来介绍我在处理上面的优化
处理速度
其实导入到HBase的原理非常简单,只要使用Spark生成一个HFile然后调用HRegion将数据导入到HBase中去,HFile你可以看着一个排序好的文件,但是如何将1TB数据排序并在内存中生成HFile文件(可能比1TB小如果采用压缩的话,不压缩比原始文件要大得多)这个步骤就很复杂了
Spark作为大数据界的神兵利器,处理PB级的数据也不在话下,但是怎么处理呢,简单来说就是三个和尚挑水,现在缸很大,但是和尚很多,如果要像迅速把缸里的水挑完,那就让很多和尚通力合作,同时去挑,但是如果里面10个和尚,9个是不干活,只有一个挑水,那么这个任务就花很长时间来完成,这个就是我们说的数据倾斜
所以其实提高处理速度的方法只有一个把任务均分给CPU每个核心,接下来我就这个项目来谈一谈我遇到的数据倾斜问题
首先我们要知道一个前面我们讲了Spark作为大数据上的神兵利器,他不会假设用户内存能装下所有的数据,但是用户的磁盘一定能装下所有的数据,我们其实要Spark做的最主要的一件事就是对数据的RowKey(HBase的行键)进行排序,然后把排好序的文件导入到HBase中,排序这个东西最主要的是要把所有的数据都放到一起排一遍,也就是说1个T的数据至少都得放到内存或者磁盘里面来进行排序
我们接下来缕缕Spark处理数据的流程,其实你可以在UI界面上看到所以的stage,前面读取数据算一个Map stage,接下来我们要完成,排序的stage,还有保存数据的stage,按照正常流程,每个stage都会做一个Shuffle,也就是理论上我们总共要做两次Shuffle,然而TB级数据的Shuffle在机械硬盘上的速度非常慢(相比SSD),所以我们就的优化Shuffle过程
我们先来谈一下一个正常大数据的排序怎么样,我们用打擂台来打比方,首先我们将所有选手分成10个小组,等小组赛打完,所以等小组都分好名次,然后我们搞一个10个人的擂台,小组第一名全部上擂台打比赛,打完决定出第一名,第一名成为擂主出擂台,然后这个时候再从其他其他小组中找一个积分最高的上来,继续打擂台,打完再找出第一名,继续上面的循环,最后每个人都出了擂台,名次也排练出来了
这个排序方法是Spark早期使用的方法,这个方法可以面对超大数据,假设数据有10个T,我只有1M内存,我也能完成数据的排序,只是每次排1M,然后写文件,这种方式可以面对海量数据,但是有一个弊端,在排完序后每次必须要把所有的文件全部都打开读取"第一名",也就是说我们做完一次Map之后,还的进行一次Reduce
那怎么来去掉这个Reduce呢,考虑这种情况,我们在分组的时候,由一个"裁判"把每个人公正的计算他原来的历史积分在什么档次,然后把他分在那个档次的组,等所有组打完,排名就出来了,你属于哪个档第几名,也就知道你的排名了
这个方法就是预先评估你的组数,但是这个也有一个问题,假如选手水平层次不齐,强者太少,弱者太多,那么有些组非常多的人打比赛,有的组没有人打比赛,这样,最后总耗时是那个人最多的小组的时间
接下来我们回到上面的Shuffle过程,如果我们把那个排序过程使用上面的预估方法,那么我们就能在减少一次Shuffle过程,但是这个也带来了一个风险,就是预估如果失败(HBase会加载HFile失败)或者预估不均会导致运行异常
所以我采用Spark提供的sortWithinPartitions
方法来ES数据进行一个分区并排序,分区的方法是通过账号ID来进行,很不幸线上数据部分账号异常,一个月某些账号占了总数的40%左右,这个就导致我们预估不均,那如何来解决这个问题呢,解决方案由很多,由于我们数据只有少量资金账户异常,所以我们采用最简单的方案,分多批次跑,每个任务分剔除大账户的部分,其他的任务就是单个大账户单独跑,得益ES强大的统计功能,只需要执行几次HTTP请求就能完成设计
至此整个数据优化导入部分就说完了,当然还有其他优化的地方没有讲,比如使用堆外内存加速排序,使用更大的Shuffle cache优化写入,切分更小的块避免磁盘split …..
PS:亚秒级的查询主要通过Spark一个聚合生成HBase一张小表来实现,当然也可以使用MySql这些关系型数据库
总结 #
记得我以前去面试大数据岗位的时候,我问面试官怎么才能提高自己大数据能力,他说必须要通过实践才能体会,经过这次项目我也感受到了,只有真正的面对大数据,你才能提高自己的大数据能力,小数据一下子就跑完了,完全体会不到内存OOM的"乐趣",大数据的难点在于了解数据如何被处理,以及对大数据各个组件“协作”对经验。