hadoop sreaming + AWK 大数据处理

Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此可以使用Hadoop Streaming 结合AWK 处理文本大数据。 本篇主要结合实际应用中的一个任务需求进行介绍。

0.任务

一份大数据文档以TAB 为分隔符,文档格式为:

1
A "\t" B "\t" C "\t" D

mapper中利用AWK对文档进行处理,输出Key = A”\t”B,Value = C”\t”D。

reducer接收输入按Key=A”\t”B分桶并排序,进行运算合并Key对应Value。

最终输出文档 Key=A”\t”B 唯一。

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
mapper num = 2,reducer num = 2,mapper、reducer运算皆为对应字段简单相加
输入:
a1 \t b1 \t c1 \t d1
a1 \t b1 \t c2 \t d2
a2 \t b2 \t c3 \t d3
a2 \t b2 \t c4 \t d4
a2 \t b2 \t c5 \t d5
a1 \t b1 \t c6 \t d6

mapper 输出:
a1 \t b1 \t c1+c2 \t d1+d2
a2 \t b2 \t c3 \t d3

a2 \t b2 \t c4+c5 \t d4+d5
a1 \t b1 \t c6 \t d6

reducer 输入:
a1 \t b1 \t c1+c2 \t d1+d2
a1 \t b1 \t c6 \t d6

a2 \t b2 \t c4+c5 \t d4+d5
a2 \t b2 \t c3 \t d3

reducer 输出:
a1 \t b1 \t c1+c2+c6 \t d1+d2+d6

a2 \t b2 \t c4+c5+c3 \t d4+d5+d3

一.Hadoop Streaming 概要

Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,map/reduce 程序只要遵循从标准输入stdin 读,写出到标准输出stdout 即可。

同时本地调试可通过linux指令进行,指令如下:

1
# cat inputfile | mapper | sort | reducer > output

mapper、reducer 为对应的map/reduce 程序。

当然,本地调试与hadoop实际运行仍有差异,主要差异在sort的理解和应用。下面重点介绍sort相关知识。

二.分桶与排序

1.Key 和 Value

经过mapper后hadoop输出数据以Key、Value格式组织,Key、Value默认分隔符为第一个TAB。之后会根据Key 对数据进行分桶。

2.mapper

如果想自定义mapper 输出,则可进行如下设置:

1
2
3
#-D stream.map.output.field.separator=.    # 指定mapper每条输出key,value分隔符
-D stream.num.map.output.key.fields=2 # 第2个.之前的为key, 剩下的为value
#-D map.output.key.field.separator=. # 设置map输出中,Key内部的分隔符

由于默认以TAB作分隔符,本例中设置分隔符的操作可以省略,仅设置输出Key列数。

3.Partition

Partition是分桶过程,如reduce num 设置为N,则Partition会依据Key将map后的数据按规则分配为N组,默认规则是按Key 的hash 值。

官网看到的Partition类型有如下几种:

KeyFieldBasedPartitioner 可以按Key分隔的Field进行分桶,分桶规则依然是Hash。

TotalOrderPartitioner可以全局排序。

本例中需要使用KeyFieldBasedPartitioner,其他类可自行了解。

Partition过程可进行如下设置:

1
2
3
4
5
6
7
8
9
10
11
A 指定key列数
#-D num.key.fields.for.partition=1 # 第1列key
-D num.key.fields.for.partition=2 # 前两列key

#B 指定某些字段做key
#-D mapred.text.key.partitioner.option =-k1,2 # Key第1,2列
#-D mapred.text.key.partitioner.option =-k2,2 # key第2列

#同时需要修改使用的partition 类

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

4.Comparator

如上设置后已可以完成开头提出的任务。但存在一个问题,由于是按 A”\t”B Hash分桶,对于A、B字段按字典序排列将被分配至不同reducer,如:

1
2
3
"a" \t "b" \t "c" \t "d"
"a" \t "b1" \t "c1" \t "d1"
"a" \t "b""a" \t "b1" Hash值会不同,可能分配至不同桶

如果想保留Key的字典序,一个思路是按Key=”A”分桶,在reduce输入排序阶段按Key=A”\t”B排序。这就会使用到Comparator,进行如下设置:

1
2
3
4
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-D mapred.text.key.comparator.options=-k1,2
#同时修改partion 使用的key
-D num.key.fields.for.partition=1 # 第1列key

最终得到预期输出。

四.参考资料

1.Hadoop Streaming 官网
2.Hadoop Streaming详解
3.MapReduce排序过程详解