Mapreduce job非常合适处理大文件,不善于处理大量的小文件。在处理大量小文件的时候,因为一个mapperjob需要的数据几乎全部来自网络,建立连接和传递数据的开销很大,所以导致job运行的时间变长,时间效率降低。同时大量的小文件会占用很多的namespace。
所以在pig中要避免出现大量的小文件。但有时候这样的现象经常出现,尤其是当我们需要从一大堆数据中抽取几列数据作分析的时候。所以下面的代码很常见。
- -- register
- REGISTER DuckPigUdf.jar;
- -- define
- DEFINE duck_storage duck.java.pig.udf.storage();
- -- clean up
- rmf $output_data_path;
- -- load data
- raw_events = LOAD '$input_data_path' USING duck_storate;
- trim_events = FOREACH raw_events {
- GENERATE
- name AS name,
- age AS age,
- timestamp AS timestamp;
- };
- -- store
- STORE trim_events INTO '$output_data_path' USING PigStorage('\u0001');
例如,你用这样的pig处理了一天的数据。因为数据量非常大,所以pig调度了3000个mapper来把数据抽取出来。一个mapper会生成一个part文件。于是,在结果文件中出现了3000多个part文件。如果你不担心namespace的问题,也可以这么做。但是当你做后续处理的时候你就崩溃了。
例如,你紧接着做了一个GROUP操作。当pig在调度mapreduce job时,发现3000多个文件的大小总和不到100M,于是它就调度了一个mapper来处理数据。这个时候你就会发现,这个该死的mapper居然用了20minutes才处理了100M的数据!你会仔细分析job的日志,然后发现,因为每个part都要传递到一个mapper上,导致处理非常慢。所以,需要把这些小文件合并称大文件,减少网络传递开销。
常用的方法是是在数据抽取job中引入reduce。其实数据抽取的pig是个map only的job,根本没有reduce阶段,数据也就不会做聚合,导致文件特别多。所以只需要在pig中引入group操作(或者其他可以引入reduce的操作,比如order by)。
于是在pig脚本中引入了group all操作,然后做flatten。Does it work?
NO!非常不幸。这个时候数据抽取的job运行非常慢。实际上,这样做只是把数据处理的时间消耗移动到了数据抽取阶段。
需要引入多个reducer,于是在group all中加入parallel 10。Does it work?
NO!非常不幸。因为pig中group all是reducer 1的操作。定义就是如此。
于是groupby 一个字段,这个时候,你需要仔细的选择字段,不要出现某一个字段上数据量特别大(也就是数据倾斜的问题)。
与其冒险的选择一个字段,不如引入一个随机数字段,在这个字段上做group,设置parallel后,可以控制job的运行时间。于是pig就变成了这样。
- -- register
- REGISTER DuckPigUdf.jar;
- -- define
- DEFINE duck_storage duck.java.pig.udf.storage();
- -- clean up
- rmf $output_data_path;
- -- load data
- raw_events = LOAD '$input_data_path' USING duck_storage;
- trim_events = FOREACH raw_events {
- rand_seed = (int)(RANDOM()*100); -- this field is to handle large numbers of small files
- GENERATE
- rand_seed AS rand_seed,
- name AS name,
- age AS age,
- timestamp AS timestamp;
- };
- group_events = GROUP trim_events BY rand_seed PARALLEL 10;
- result = FOREACH group_events GENERATE flatten(trim_events);
- -- store
- STORE result INTO '$output_data_path' USING PigStorage('\u0001');
总结:如果你想把大量的小文件合并成大文件,这应该是一个通用的方法。在字段中引入随机数,把数据均匀的分散开。
联系客服