什么是数据倾斜

数据倾斜在MapReduce编程模型中十分常见。用最通俗易懂的话来说,数据倾斜无非就是大量的相同key被partition分配到一个分区里,造成了“一个人累死,其他人闲死”的情况。这种情况违背了并行计算的初衷。一个节点要承受着巨大的压力,而其他节点计算完毕后要一直等待这个忙碌的节点,拖累了整体的计算时间。效率是十分低下的。

MapReduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。Hive的执行是分阶段的,map处理数据量的差异取决于上一个stage的reduce输出,所以如何将数据均匀地分配到各个reduce中,就是解决数据倾斜的根本所在。

举例:在执行shuffle操作的时候,按照shuffle的原理是按照key来进行values的数据的输出、拉取和聚合的。同一个key的values,一定是分配到一个reduce task进行处理的。假设多个key对应的values,总共是90万。问题在于,可能某个key对应了88万数据,key-88万values,分配到一个task上去面去执行。另外两个task,可能各分配到了1万数据,可能是数百个key,对应的1万条数据。

第一个和第二个task,各分配到了1万数据;假设1万条数据需要10分钟计算完毕,则第一、二个task可能同时在10分钟内都运行完了;但是第三个task有88万条,则需要88 * 10 = 880分钟 = 14.5个小时;

数据倾斜的表现

任务进度长时间维持在99%或者100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长。

数据倾斜的情形&解决

关键词 情形 后果
join 大表与小表join,小表对应于大表join的字段较为集中 分发到某一个或几个Reduce上的数据远高于平均值
大表与大表join,但是partition的判断字段0值或空值过多 这些空值都由一个reduce处理,非常慢
group by 按照某个字段进行group by,但是该字段维度过小,而某值的数量过多 处理某值的reduce非常耗时
count distinct 某特殊值过多 处理此特殊值的reduce耗时

原因:

  1. key 分布不均匀
  2. 业务数据本身的特性
  3. 建表考虑不周全
  4. 某些 HQL 语句本身就存在数据倾斜

group by倾斜

一般采用以下两种参数调节方式:

1
hive.map.aggr = true

hive.map.aggr默认值已经为true,意思是做map aggregation,也就是在mapper里面做聚合。这个方法不同于直接写MapReduce的时候可以实现的combiner,但是却实现了类似combiner的效果。事实上各种基于mr的框架如pig、cascading等等用的都是map aggregation(或者叫partial aggregation)而非combiner的策略,也就是在mapper里面直接做聚合操作而不是输出到buffer给combiner做聚合。对于map aggregation,hive还会做检查,如果aggregation的效果不好,那么hive会自动放弃map aggregation。判断效果的依据就是经过一小批数据的处理之后,检查聚合后的数据量是否减小到一定的比例,默认是0.5,由hive.map.aggr.hash.min.reduction这个参数控制。所以如果确认数据里面确实有个别取值倾斜,但是大部分值是比较稀疏的,这个时候可以把比例强制设为1,避免极端情况下map aggr失效。

1
hive.groupby.skewindata = true

本质是将一个mapreduce拆分为两个MR。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。所以这个参数其实跟hive.map.aggr做的是类似的事情,只是拿到reduce端来做,而且要额外启动一轮job,所以其实不怎么推荐用,效果不明显。

count distinct倾斜

count distinct碰到大量相同特殊值时,若目标字段存在大量值为NULL或空的记录,将值为空的情况单独处理。如果只是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。

case:

1
2
3
4
select cast(count(distinct (user_id)) + 1 as bigint) as user_cnt
from tab_a
where user_id is not null
and user_id <> '';

如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。比如多个distinct集中在一个reduce上进行:

bad case:

1
2
3
select day, count(distinct session_id), count(distinct user_id)
from log a
group by day;

optimize case(空间换时间):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
select day
, count(case when type = 'session' then 1 else null end) as session_cnt
, count(case when type = 'user' then 1 else null end) as user_cnt
from (select day
, session_id
, type
from (select day
, session_id
, "session" as type
from log
union all
select day
, user_id
, "user" as type
from log) t1
group by day, session_id, type) t2
group by day;

思路:将同一个表互相union,通过设置type字段用于代替distinct xxx_id,虽然消耗了更大的空间,但是提高了执行时间的效率。

join倾斜

skew join

join造成的倾斜,常见情况是不能做map join的两个表(能做map join的话基本上可以避免倾斜),其中一个是行为表,另一个应该是属性表。比如我们有三个表,一个用户属性表users,一个商品属性表items,还有一个用户对商品的操作行为表日志表logs。假设现在需要将行为表关联用户表:

1
select * from logs a join users b on a.user_id = b.user_id;

其中logs表里面会有一个特殊用户user_id = 0,代表未登录用户,假如这种用户占了相当的比例,那么个别reduce会收到比其他reduce多得多的数据,因为它要接收所有user_id = 0的记录进行处理,使得其处理效果会非常差,其他reduce都跑完很久了它还在运行。

hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。当然你要告诉hive这个join是个skew join,即设置:

1
hive.optimize.skewjoin = true;

还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值。

特殊值分开处理法

不过上述方法还要去考虑阈值之类的情况,不够通用。所以针对join倾斜的问题,一般都是通过改写sql解决。对于上面这个问题,我们已经知道user_id = 0是一个特殊key,那么可以把特殊值隔离开来单独做join,这样特殊值肯定会转化成map join,非特殊值就是没有倾斜的普通join了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
select *
from (
select *
from logs
where user_id = 0
) a
join
(
select *
from users
where user_id = 0
) b
on
a.user_id = b.user_id
union all
select *
from logs a
join users b on a.user_id <> 0 and a.user_id = b.user_id;

随机数分配法

上面这种个别key倾斜的情况只是一种倾斜情况。最常见的倾斜是因为数据分布本身就具有长尾性质,比如我们将日志表和商品表关联:

1
select * from logs a join items b on a.item_id = b.item_id;

这个时候,分配到热门商品的reducer就会很慢,因为热门商品的行为日志肯定是最多的,而且我们也很难像上面处理特殊user那样去处理item。这个时候就会用到加随机数的方法,也就是在join的时候增加一个随机数,随机数的取值范围n相当于将item给分散到n个reducer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
select a.*,
b.*
from (
select *, cast(rand() * 10 as int) as r_id
from logs
) a
join
(
select *, r_id
from items lateral view explode(range_list(1, 10)) rl as r_id
) b
on
a.item_id = b.item_id
and a.r_id = b.r_id

上面的写法里,对行为表的每条记录生成一个1-10的随机整数,对于item属性表,每个item生成10条记录,随机key分别也是1-10,这样就能保证行为表关联上属性表。其中range_list(1,10)代表用udf实现的一个返回1-10整数序列的方法。这个做法是一个解决join倾斜比较根本性的通用思路,就是如何用随机数将key进行分散。当然,可以根据具体的业务场景做实现上的简化或变化。

业务场景

字段较为集中的场景

方法1:将较为集中的字段值进行打散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率

1
2
3
4
5
6
7
8
9
10
11
12
13
create table small_table as
select a.key
, sum(a.Cnt) as Cnt
from (
select key
, count (1) as Cnt
from table_name
group by key,
case when key = "较为集中的字段" then Hash (rand()) % 50
else key
end
) a
group by a.key;

方法2:使用map join让小的维度表先进内存

1
2
3
select *
from small_table s
join big_table b on s.key = b.key;

空值产生的数据倾斜

在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。

方法1:筛选出不为空值的参与关联

1
2
3
4
5
6
7
select *
from log a
join user b on a.user_id is not null and a.user_id = b.user_id
union all
select *
from log c
where c.user_id is null;

方法2:赋予空值新的key值

1
2
3
4
5
select *
from log a
left outer join user b
on
case when a.user_id is null then concat('hive', rand()) else a.user_id end = b.user_id;

对比:方法 2 比方法 1 效率更好,不但 IO 少了,而且作业数也少了,方案 1 中,log 表读了两次,jobs 肯定是 2,而方案 2 是 1。这个优化适合无效 id(比如-99,’ ‘,null)产生的数据倾斜,把空值的 key 变成一个字符串加上一个随机数,就能把造成数据倾斜的数据分到不同的 reduce 上解决数据倾斜的问题。改变之处在于使本身为 null 的所有记录不会拥挤在同一个 reduceTask 了,会由于有替代的随机字符串值,而分散到了多个 reduceTask 中了,由于 null 值关联不上,处理后并不影响最终结果。

不同数据类型关联产生数据倾斜

用户表中 user_id 字段为 int,log 表中 user_id 为既有 string 也有 int 的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的 hash 操作会按照 int 类型的 id 进行分配,这样就会导致所有的 string 类型的 id 就被分到同一个 reducer 当中。

解决方法:把数字类型 id 转换成 string 类型的 id

1
2
3
select *
from user a
left outer join log b on b.user_id = cast(a.user_id as string)

大小表关联查询产生数据倾斜

使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。

map join 概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从而避免了 reduceTask,前提要求是内存足以装下该全量数据。

1228818-20180415144152040-870536898

以大表 a 和小表 b 为例,所有的 maptask 节点都装载小表 b 的所有数据,然后大表 a 的一个数据块数据(比如说 a1)去跟 b 全量数据做链接,就省去了 reduce 做汇总的过程。所以,相对来说,在内存允许的条件下使用 map join 比直接使用 MapReduce 效率还高些, 当然这只限于做 join 查询的时候。

在 hive 中,直接提供了能够在 HQL 语句指定该次查询使用 map join的方法。map join 的用法是在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */,提示优化器转化为map join(早期的 Hive 版本的优化器是不能自动优化 map join 的)。其中 tablelist 可以是一个表,或以逗号连接的表的列表。tablelist 中的表将会读入内存,通常应该是将小表写在这里。

map join 具体用法:

1
2
3
4
5
6
7
8
select /* +mapjoin(a) */ a.id aid, name, age
from a
join b on a.id = b.id;

select /* +mapjoin(movies) */ a.title, b.rating
from movies a
join ratings b on a.movieid =
b.movieid;

在 hive0.11 版本以后会自动开启 map join 优化,由两个参数控制:

1
2
set hive.auto.convert.join=true; #设置 MapJoin 优化自动开启
set hive.mapjoin.smalltable.filesize=25000000 #设置小表不超过多大时开启 mapjoin 优化

大大表关联查询产生数据倾斜

大事化小,小事化了。把大表切分成小表,然后分别 map join。

小表不大不小的数据倾斜

使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到 map join 会出现 bug 或异常,这时就需要特别的处理。

举一例:日志表和用户表做链接

1
2
3
select *
from log a
left outer join users b on a.user_id = b.user_id;

若users 表有 600w+的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

改进方案:

1
2
3
4
5
6
7
8
select /*+mapjoin(x)*/*
from log a
left outer join (
select /*+mapjoin(c)*/ d.*
from (select distinct user_id from log) c
join users d on c.user_id = d.user_id
) x
on a.user_id = x.user_id;

假如log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

总结

使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:

  1. 采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。
  2. 数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1记录数会很少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。
  3. map读入users和log,假如记录来自log,则检查user_id是否在tmp2里。如果是,输出到本地文件a,否则生成的key-value对,假如记录来自member,生成的key-value对,进入reduce阶段。
  4. 最终把a文件,把Stage3 reduce阶段输出的文件合并起写到hdfs。

如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:

  • 对于join,在判断小表不大于1G的情况下,使用map join
  • 对于group by或distinct,设定 hive.groupby.skewindata=true
  • 尽量使用上述的SQL语句调节进行优化

参考