Чтение сжатого файла с использованием Apache Beam, обернутого в TextIOWrapper, приводит к тому, что «объект CompressedFile не имеет атрибута« доступный для записи »» - PullRequest
0 голосов
/ 22 октября 2019

Я работаю над реализацией простого считывателя CSV в Apache Beam, следуя тесту репозитория луча: https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))

Это работает правильно, если CSV_FILE не сжимается, и я получаюбез ошибокОднако, если я использую сжатый файл в качестве ввода, я получаю:

<ipython-input-114-4830c3592163> in get_csv_reader(readable_file)
      6   import io
      7   if sys.version_info >= (3, 0):
----> 8     return csv.reader(io.TextIOWrapper(readable_file.open()))
      9   else:
     10     return csv.reader(readable_file.open())

AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']

Я понимаю, почему это происходит (что TextIOWrapper ищет читаемый И записываемый объект). Есть ли люди, которые лучше разбираются в пучке / потоке данных Apache и могут подсказать, как лучше всего реализовать это для обработки как сжатых, так и несжатых входных данных?

1 Ответ

2 голосов
/ 23 октября 2019

Извините за беспокойство. К сожалению, это очень неловко в данный момент. Способ сделать это прямо сейчас - использовать внутреннее содержимое объекта ReadableFile, например, так:

filesystems.Filesystems.open(rf.metadata.path, compression=MY_COMPRESSION)

NOT YET AVAILABLE - InЛуч 2.18.0, вы сможете сделать следующее:

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open(compression_type=MY_COMPRESSION)))
  else:
    return csv.reader(readable_file.open(compression_type=MY_COMPRESSION))

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))
...