How to read partitioned parquet files from S3 using pyarrow in Python

I'm looking for a way to read data from multiple partitioned directories on S3 using Python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet
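For context, this layout is the hive-style partitioning scheme that parquet tooling produces when writing a partitioned dataset. A minimal local sketch that generates such a layout with pyarrow (the column names and values here simply mirror the paths above) might look like:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# a small frame whose columns mirror the partition keys in the paths above
df = pd.DataFrame({
    "serial_number": [1, 2],
    "cur_date": ["20-12-2012", "27-12-2012"],
    "value": [0.1, 0.2],
})

# write_to_dataset produces data_folder/serial_number=.../cur_date=.../<file>.parquet
pq.write_to_dataset(
    pa.Table.from_pandas(df),
    root_path="data_folder",
    partition_cols=["serial_number", "cur_date"],
)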

pyarrow's ParquetDataset module has the ability to read from partitions, so I tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It raised the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on the pyarrow documentation, I tried using s3fs as the filesystem, i.e.:

>>> dataset = pq.ParquetDataset(a, filesystem=s3fs)

Which throws the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'
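The AttributeError arises because the s3fs module itself was passed as the filesystem rather than an S3FileSystem instance. A hedged sketch of what the call was presumably intended to be (at the time, this still needed the s3fs support tracked in ARROW-1213 to actually work):

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()  # an instance, not the module
dataset = pq.ParquetDataset("s3://my_bucker/path/to/data_folder/", filesystem=fs)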

I'm restricted to an ECS cluster, hence spark/pyspark is not an option.

Is there a way to easily read the parquet files from these partitioned directories in S3 using Python? Listing all the directories and then reading them one by one is not a good practice, as suggested in this link. I need to convert the data into a pandas dataframe for further processing, so I'd prefer options based on fastparquet or pyarrow, but I'm open to other options in Python as well.
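For what it's worth, with current releases of pandas, pyarrow and s3fs installed, a partitioned directory like this can usually be read in a single call; a minimal sketch, with the bucket path as a placeholder:

import pandas as pd

# pandas delegates to pyarrow and s3fs; the partition keys
# (serial_number, cur_date) come back as ordinary dataframe columns
df = pd.read_parquet("s3://my_bucker/path/to/data_folder/", engine="pyarrow")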

Solution:

I managed to get this working with the latest release of fastparquet & s3fs. Below is the code for the same:

import s3fs
import fastparquet as fp

s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

# mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
# use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
# convert to pandas dataframe
df = fp_obj.to_pandas()

Credit to Martin for pointing me in the right direction via our conversation.

NB: This would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.
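As a side note, recent pyarrow releases also ship a native S3 filesystem (pyarrow.fs.S3FileSystem), so s3fs is no longer strictly required; a minimal sketch, assuming a recent pyarrow and credentials available in the environment (the region and path are placeholders):

import pyarrow.parquet as pq
from pyarrow import fs

s3 = fs.S3FileSystem(region="us-east-1")  # region is an assumption
# note: no "s3://" prefix when a pyarrow filesystem is passed explicitly
dataset = pq.ParquetDataset("my_bucker/path/to/data_folder", filesystem=s3)
df = dataset.read().to_pandas()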

I did a quick benchmark of individual iterations with pyarrow versus a list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code, but I reckon pyarrow + s3fs will be faster once that support is implemented.

The code & benchmark are below:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         # probably not the best way to split :)
...         elements_list = current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()', number=10, globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
...     df = fp_obj.to_pandas()
...
>>> timeit.timeit('test_fp()', number=10, globals=globals())
2.961556333000317

Update 2019

After all the PRs, issues such as Arrow-2038 & Fast Parquet – PR#182 have been resolved.

Reading parquet files using pyarrow

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'  # if it's a directory, omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()
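If only some partitions are needed, ParquetDataset also accepts a filters argument that prunes partition directories before any data is fetched; a small sketch reusing the names above (the partition column and value are assumptions about your layout):

>>> dataset = pq.ParquetDataset(
...     bucket_uri,
...     filesystem=fs,
...     filters=[('serial_number', '=', '1')],  # only matching partitions are read
... )
>>> df = dataset.read().to_pandas()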

Reading parquet files using fastparquet

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp
>>> fs = s3fs.S3FileSystem()
>>> myopen = fs.open

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, & so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
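fastparquet's to_pandas can likewise restrict what gets materialised; a small sketch (the column names are assumptions based on the partition scheme above):

>>> # only materialise the listed columns; partition keys are reconstructed from the directory names
>>> df = fp_obj.to_pandas(columns=['serial_number', 'cur_date'])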

Quick benchmark

This is probably not the best way to benchmark it; please read the blog post for a thorough benchmark.

# pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()', number=10, globals=globals())
1.2677053569998407

# fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()
...
>>> timeit.timeit('test_fp()', number=10, globals=globals())
2.931876824000028
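A slightly more robust variant of the same informal timing uses timeit.repeat, so that the best of several runs can be reported instead of a single total:

>>> import timeit
>>> # best of 5 repeats, 10 calls each; dampens one-off network hiccups
>>> min(timeit.repeat('test_pq()', number=10, repeat=5, globals=globals()))
>>> min(timeit.repeat('test_fp()', number=10, repeat=5, globals=globals()))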

Further reading on pyarrow's speed

References:

> fastparquet
> s3fs
> pyarrow
> pyarrow arrow code, based on the discussion & also the documentation
> fastparquet code, based on the discussion in PR-182 & also the documentation

Source: https://www.icode9.com/content-1-484451.html