打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Pig 处理大量的小文件

Mapreduce job非常合适处理大文件,不善于处理大量的小文件。在处理大量小文件的时候,因为一个mapperjob需要的数据几乎全部来自网络,建立连接和传递数据的开销很大,所以导致job运行的时间变长,时间效率降低。同时大量的小文件会占用很多的namespace。

所以在pig中要避免出现大量的小文件。但有时候这样的现象经常出现,尤其是当我们需要从一大堆数据中抽取几列数据作分析的时候。所以下面的代码很常见。

 

  1. -- register
  2. REGISTER DuckPigUdf.jar;

  3. -- define
  4. DEFINE duck_storage duck.java.pig.udf.storage();

  5. -- clean up
  6. rmf $output_data_path;

  7. -- load data
  8. raw_events = LOAD '$input_data_path' USING duck_storate;

  9. trim_events = FOREACH raw_events {
  10. GENERATE
  11. name AS name,
  12. age AS age,
  13. timestamp AS timestamp;
  14. };

  15. -- store
  16. STORE trim_events INTO '$output_data_path' USING PigStorage('\u0001');

例如,你用这样的pig处理了一天的数据。因为数据量非常大,所以pig调度了3000个mapper来把数据抽取出来。一个mapper会生成一个part文件。于是,在结果文件中出现了3000多个part文件。如果你不担心namespace的问题,也可以这么做。但是当你做后续处理的时候你就崩溃了。

例如,你紧接着做了一个GROUP操作。当pig在调度mapreduce job时,发现3000多个文件的大小总和不到100M,于是它就调度了一个mapper来处理数据。这个时候你就会发现,这个该死的mapper居然用了20minutes才处理了100M的数据!你会仔细分析job的日志,然后发现,因为每个part都要传递到一个mapper上,导致处理非常慢。所以,需要把这些小文件合并称大文件,减少网络传递开销。

常用的方法是是在数据抽取job中引入reduce。其实数据抽取的pig是个map only的job,根本没有reduce阶段,数据也就不会做聚合,导致文件特别多。所以只需要在pig中引入group操作(或者其他可以引入reduce的操作,比如order by)。

于是在pig脚本中引入了group all操作,然后做flatten。Does it work?

NO!非常不幸。这个时候数据抽取的job运行非常慢。实际上,这样做只是把数据处理的时间消耗移动到了数据抽取阶段。

需要引入多个reducer,于是在group all中加入parallel 10。Does it work?

NO!非常不幸。因为pig中group all是reducer 1的操作。定义就是如此。

于是groupby 一个字段,这个时候,你需要仔细的选择字段,不要出现某一个字段上数据量特别大(也就是数据倾斜的问题)。

与其冒险的选择一个字段,不如引入一个随机数字段,在这个字段上做group,设置parallel后,可以控制job的运行时间。于是pig就变成了这样。

 

  1. -- register
  2. REGISTER DuckPigUdf.jar;

  3. -- define
  4. DEFINE duck_storage duck.java.pig.udf.storage();

  5. -- clean up
  6. rmf $output_data_path;

  7. -- load data
  8. raw_events = LOAD '$input_data_path' USING duck_storage;

  9. trim_events = FOREACH raw_events {
  10. rand_seed = (int)(RANDOM()*100); -- this field is to handle large numbers of small files
  11. GENERATE
  12. rand_seed AS rand_seed,
  13. name AS name,
  14. age AS age,
  15. timestamp AS timestamp;
  16. };

  17. group_events = GROUP trim_events BY rand_seed PARALLEL 10;
  18. result = FOREACH group_events GENERATE flatten(trim_events);

  19. -- store
  20. STORE result INTO '$output_data_path' USING PigStorage('\u0001');

总结:如果你想把大量的小文件合并成大文件,这应该是一个通用的方法。在字段中引入随机数,把数据均匀的分散开。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
【微服务】161:SpringDataElasticsearch
第13章 MyBatis的关联映射
pig学习笔记
基于Kafka时间粒度消息回溯设计方案
在Windows10中使用filebeat将日志发送至kafka,并去除fileBeat添加的冗余字...
SqlLoader
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服