Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此可以使用Hadoop Streaming 结合AWK 处理文本大数据。 本篇主要结合实际应用中的一个任务需求进行介绍。
0.任务
一份大数据文档以TAB 为分隔符,文档格式为:
1 | A "\t" B "\t" C "\t" D |
mapper中利用AWK对文档进行处理,输出Key = A”\t”B,Value = C”\t”D。
reducer接收输入按Key=A”\t”B分桶并排序,进行运算合并Key对应Value。
最终输出文档 Key=A”\t”B 唯一。
举例:
1 | mapper num = 2,reducer num = 2,mapper、reducer运算皆为对应字段简单相加 |
一.Hadoop Streaming 概要
Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,map/reduce 程序只要遵循从标准输入stdin 读,写出到标准输出stdout 即可。
同时本地调试可通过linux指令进行,指令如下:
1 | # cat inputfile | mapper | sort | reducer > output |
mapper、reducer 为对应的map/reduce 程序。
当然,本地调试与hadoop实际运行仍有差异,主要差异在sort的理解和应用。下面重点介绍sort相关知识。
二.分桶与排序
1.Key 和 Value
经过mapper后hadoop输出数据以Key、Value格式组织,Key、Value默认分隔符为第一个TAB。之后会根据Key 对数据进行分桶。
2.mapper
如果想自定义mapper 输出,则可进行如下设置:
1 | #-D stream.map.output.field.separator=. # 指定mapper每条输出key,value分隔符 |
由于默认以TAB作分隔符,本例中设置分隔符的操作可以省略,仅设置输出Key列数。
3.Partition
Partition是分桶过程,如reduce num 设置为N,则Partition会依据Key将map后的数据按规则分配为N组,默认规则是按Key 的hash 值。
官网看到的Partition类型有如下几种:
- BinaryPartitioner
- HashPartitioner
- IndexUpdatePartitioner
- KeyFieldBasedPartitioner
- SleepJob
- TotalOrderPartitioner
KeyFieldBasedPartitioner 可以按Key分隔的Field进行分桶,分桶规则依然是Hash。
TotalOrderPartitioner可以全局排序。
本例中需要使用KeyFieldBasedPartitioner,其他类可自行了解。
Partition过程可进行如下设置:
1 | A 指定key列数 |
4.Comparator
如上设置后已可以完成开头提出的任务。但存在一个问题,由于是按 A”\t”B Hash分桶,对于A、B字段按字典序排列将被分配至不同reducer,如:
1 | "a" \t "b" \t "c" \t "d" |
如果想保留Key的字典序,一个思路是按Key=”A”分桶,在reduce输入排序阶段按Key=A”\t”B排序。这就会使用到Comparator,进行如下设置:
1 | -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator |
最终得到预期输出。