set mapred.reduce.tasks=2; Select sale_id, amount from t_order Distribute by sale_id Sort by sale_id, amount;
这样能够保证查询的销售记录集合中,销售ID对应的数量是正确排序的,但是销售ID不能正确排序,原因是hive使用hadoop默认的HashPartitioner分发数据。
这就涉及到一个全排序的问题。解决的办法无外乎两种:
1.) 不分发数据,使用单个reducer:
set mapred.reduce.tasks=100; set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set total.order.partitioner.path=/tmp/ range_key_list; Select sale_id, amount from t_order Cluster by sale_id Sort by amount;
有很多种方法生成这一区间文件(例如hadoop自带的o.a.h.mapreduce.lib.partition.InputSampler工具)。这里介绍用Hive生成的方法,例如有一个按id有序的t_sale表:
CREATE TABLE if not exists t_sale ( id int, name string, loc string );
则生成按sale_id分发的区间文件的方法是:
create external table range_keys(sale_id int) row format serde'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe' stored as inputformat'org.apache.hadoop.mapred.TextInputFormat' outputformat'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat' location '/tmp/range_key_list'; insert overwrite table range_keys select distinct sale_id from source t_sale sampletable(BUCKET 100 OUT OF 100 ON rand()) s sort by sale_id;
生成的文件(/tmp/range_key_list目录下)可以让TotalOrderPartitioner按sale_id有序地分发reduce处理的数据。区间文件需要考虑的主要问题是数据分发的均衡性,这有赖于对数据深入的理解。
测试案例:
数据 140g, 按照字段time 降序排列 选出最大的前50个。
使用 一般方法 select * from table order by time desc limit 50. 执行了1小时6分钟完全算出。
任务数1个 map数 1783 reduce 1
而 select * from (select * from table distribute by time sort by time desc limit 50 ) t order by time desc limit 50;
需要5分钟算出。结果一致。
任务数2个 分别是:
map 1783 reduce 245
map 245 reduce 1