最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

连接大文件、管道和奖励

SEO心得admin94浏览0评论
本文介绍了连接大文件、管道和奖励的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

有人问过(和回答过)类似的问题,但从来没有真正在一起过,而且我似乎无法解决任何问题.由于我刚刚开始使用 Python,所以一些易于理解的东西会很棒!

我有 3 个大数据文件 (>500G),我需要解压缩、连接、将其通过管道传输到子进程,然后将该输出通过管道传输到另一个子进程.然后我需要处理我想在 Python 中做的最终输出.请注意,除了处理之外,我不需要解压缩和/或连接的文件 - 创建一个我认为会浪费空间的文件.这是我目前所拥有的...

导入 gzip从子流程导入 Popen, PIPE#压缩文件zipfile1 = "./file_1.txt.gz"zipfile2 = "./file_2.txt.gz"zipfile3 = "./file_3.txt.gz"# 打开第一个管道p1 = Popen(["dataclean.pl"], stdin=PIPE, stdout=PIPE)# 解压缩文件并通过管道将它们导入(必须是一种更 Pythonic 的方式来做到这一点 -# 如果这甚至是正确的)unzipfile1 = gzip.open(zipfile1, 'wb')p1.stdin.write(unzipfile1.read())unzipfile1.close()unzipfile2 = gzip.open(zipfile2, 'wb')p1.stdin.write(unzipfile2.read())unzipfile2.close()unzipfile3 = gzip.open(zipfile3, 'wb')p1.stdin.write(unzipfile3.read())unzipfile3.close()# 将 p1 的输出通过管道传输到 p2p2 = Popen(["dataprocess.pl"], stdin=p1.stdout, stdout=PIPE)# 不知道这是做什么的 - 关于 SIGPIPEp1.stdout.close()## 不知道这有什么作用 - 但它在 pydoc 中输出 = p2municate()[0]## 对 p2.stdout 的更多处理...打印 p2.stdout

任何建议将不胜感激.*作为一个额外的问题...... read() 的 pydoc 是这样说的:

还要注意,在非阻塞模式下,即使没有给出大小参数,也可能返回比请求少的数据."

这看起来很可怕.任何人都可以解释它吗?我不想只读取数据集的一部分,认为它是全部.我认为保留文件的大小是一件好事,尤其是当我不知道文件的大小时.

谢谢,

GK

解决方案

第一件事;我认为您的模式不正确:

unzipfile1 = gzip.open(zipfile1, 'wb')

这个应该打开zipfile1以便写作,不读书.我希望你的数据仍然存在.

其次,您不想尝试一次处理全部数据.您应该以 16k 或 32k 或类似的块为单位处理数据.(最佳大小会因许多因素而异;如果此任务必须多次执行,请使其可配置,以便您可以安排不同的大小.)

您正在寻找的可能更像是未经测试的伪代码:

while (block = unzipfile1.read(4096*4)):p1.stdin.write(a)

如果您尝试使用 Python 将管道中的多个进程连接在一起,那么它可能看起来更像这样:

while (block = unzipfile1.read(4096*4)):p1.stdin.write(a)p2.stdin.write(p1.stdout.read())

这将尽可能快地从 p1 到 p2 的输出.我已经假设 p1 不会产生比给出的多得多的输入.如果 p1 的输出将比输入大十倍,那么你应该再做一个类似于这个的循环.

但是,我不得不说,这感觉就像复制 shell 脚本需要做很多额外的工作:

gzip -cd file1.gz file2.gz file3.gz |dataclean.py |数据处理文件

gzip(1) 将自动处理块大小的数据传输,如上所述,并假设您的 dataclean.py 和 dataprocess.pl 脚本还以块为单位处理数据,而不是执行完整读取(如您的脚本的原始版本所做的那样),那么它们都应该尽可能地并行运行.>

There has been similar questions asked (and answered), but never really together, and I can't seem to get anything to work. Since I am just starting with Python, something easy to understand would be great!

I have 3 large data files (>500G) that I need to unzip, concatenate, pipe it to a subprocess, then pipe that output to another subprocess. I then need to process that final output which I would like to do in Python. Note I do not need the unzipped and/or concatenated file except for the processing - creating one I think would be a waste of space. Here is what I have so far...

import gzip from subprocess import Popen, PIPE #zipped files zipfile1 = "./file_1.txt.gz" zipfile2 = "./file_2.txt.gz" zipfile3 = "./file_3.txt.gz" # Open the first pipe p1 = Popen(["dataclean.pl"], stdin=PIPE, stdout=PIPE) # Unzip the files and pipe them in (has to be a more pythonic way to do it - # if this is even correct) unzipfile1 = gzip.open(zipfile1, 'wb') p1.stdin.write(unzipfile1.read()) unzipfile1.close() unzipfile2 = gzip.open(zipfile2, 'wb') p1.stdin.write(unzipfile2.read()) unzipfile2.close() unzipfile3 = gzip.open(zipfile3, 'wb') p1.stdin.write(unzipfile3.read()) unzipfile3.close() # Pipe the output of p1 to p2 p2 = Popen(["dataprocess.pl"], stdin=p1.stdout, stdout=PIPE) # Not sure what this does - something about a SIGPIPE p1.stdout.close() ## Not sure what this does either - but it is in the pydoc output = p2municate()[0] ## more processing of p2.stdout... print p2.stdout

Any suggestions would be greatly appreciated. *As a bonus question...the pydoc for read() says this:

"Also note that when in non-blocking mode, less data than what was requested may be returned, even if no size parameter was given."

That seems scary. Can anyone interpret it? I don't want to read in only part of a dataset thinking it is the whole thing. I thought leaving the size of the file was a good thing, especially when I don't know the size of the file.

Thanks,

GK

解决方案

First things first; I think you've got your modes incorrect:

unzipfile1 = gzip.open(zipfile1, 'wb')

This should open zipfile1 for writing, not reading. I hope your data still exists.

Second, you do not want to try to work with the entire data all at once. You should work with the data in blocks of 16k or 32k or something. (The optimum size will vary based on many factors; make it configurable if this task has to be done many times, so you can time different sizes.)

What you're looking for is probably more like this untested pseudo-code:

while (block = unzipfile1.read(4096*4)): p1.stdin.write(a)

If you're trying to hook together multiple processes in a pipeline in Python, then it'll probably look more like this:

while (block = unzipfile1.read(4096*4)): p1.stdin.write(a) p2.stdin.write(p1.stdout.read())

This gives the output from p1 to p2 as quickly as possible. I've made the assumption that p1 won't generate significantly more input than it was given. If the output of p1 will be ten times greater than the input, then you should make another loop similar to this one.

But, I've got to say, this feels like a lot of extra work to replicate the shell script:

gzip -cd file1.gz file2.gz file3.gz | dataclean.py | dataprocess.pl

gzip(1) will automatically handle the block-sized data transfer as I've described above, and assuming your dataclean.py and dataprocess.pl scripts also work with data in blocks rather than performing full reads (as your original version of this script does), then they should all run in parallel near the best of their abilities.

发布评论

评论列表(0)

  1. 暂无评论