Я работаю над реализацией простого считывателя CSV в Apache Beam, следуя тесту репозитория луча: https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148
def get_csv_reader(readable_file):
import sys
import csv
import io
if sys.version_info >= (3, 0):
return csv.reader(io.TextIOWrapper(readable_file.open()))
else:
return csv.reader(readable_file.open())
with beam.Pipeline() as p:
content_pc = (p
| beam.Create([CSV_FILE])
| fileio.ReadMatches()
| beam.FlatMap(get_csv_reader)
| beam.Map(print))
Это работает правильно, если CSV_FILE не сжимается, и я получаюбез ошибокОднако, если я использую сжатый файл в качестве ввода, я получаю:
<ipython-input-114-4830c3592163> in get_csv_reader(readable_file)
6 import io
7 if sys.version_info >= (3, 0):
----> 8 return csv.reader(io.TextIOWrapper(readable_file.open()))
9 else:
10 return csv.reader(readable_file.open())
AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']
Я понимаю, почему это происходит (что TextIOWrapper ищет читаемый И записываемый объект). Есть ли люди, которые лучше разбираются в пучке / потоке данных Apache и могут подсказать, как лучше всего реализовать это для обработки как сжатых, так и несжатых входных данных?