WriteToText пишет только во временные файлы - PullRequest
1 голос
/ 10 июля 2019

Я новичок в Apache Beam и пытаюсь написать свой первый конвейер на Python для вывода данных из подписки Google Pub / Sub на плоские файлы для дальнейшего использования;в идеале я хочу собирать их в файл, скажем, каждые полчаса.У меня есть следующий код в качестве окончательного преобразования в моем конвейере: -

| 'write output' >> WriteToText('TestNewPipeline.txt')

Однако все созданные файлы находятся в каталоге с префиксом "beam-temp-TestNewPipeline.txt- [somehash]" и пакетируютсяна группы по 10 человек, чего я не ожидал.

Я пытался поиграть с оконной функцией, но, похоже, она не имела большого эффекта, поэтому либо я совершенно не понимаю концепцию, либо делаю что-то совершенно неверное.

код для окна: -

 | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))

Я предполагал, что это приведет к записи в текстовый файл в статическом пятисекундном окне, но это не так.

Fullкод ниже: -

options = PipelineOptions()
options.view_as(StandardOptions).streaming=True

def format_message(message, timestamp=beam.DoFn.TimestampParam):    
    formatted_message = {
        'data': message.data,
        'attributes': str(message.attributes),
        'timestamp': float(timestamp)
    }

    return formatted_message

with beam.Pipeline(options=options) as p:
    (p
    | 'Read From Pub Sub' >> ReadFromPubSub(subscription='projects/[my proj]/subscriptions/[my subscription]',with_attributes=True)
    | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
    | 'Map Message' >> beam.Map(format_message)
    | 'write output' >> WriteToText('TestNewPipeline.txt')
    )
result = p.run()

Как и ожидалось, процесс запускается бесконечно и успешно читает сообщения из подписки;однако он записывает их только в временные файлы луча.Кто-нибудь может помочь указать, где я иду не так?

Обновление:

Следуя комментариям Джейсона, я немного изменил конвейер: -

class AddKeyToDict(beam.DoFn):
    def process(self, element):
        return [(element['rownumber'], element)]

    with beam.Pipeline(options=options) as p:
        (p
        | 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription)# can't make attributes work as yet! ,with_attributes=True) 
        # failed attempt 1| 'Map Message' >> beam.Map(format_message)
        # failed attempt 2| 'Parse JSON' >> beam.Map(format_message_element)
        | 'Parse to Json' >> beam.Map(lambda x: json.loads(x))
        | 'Add key' >> beam.ParDo(AddKeyToDict())
        | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5), trigger=AfterProcessingTime(15), accumulation_mode=AccumulationMode.DISCARDING)
        | 'Group' >> beam.GroupByKey()
        | 'write output' >> WriteToText(known_args.output_file)
        )

Я еще не смог извлечь message_id или опубликованное время из PubSub, поэтому я просто использую число, сгенерированное в моем сообщении.На данный момент я все еще получаю только временные файлы, созданные, и ничего не накапливается в окончательный файл?Начинаю задумываться, не все ли немного не хватает реализации Python, и мне придется взять Java ....

Ответы [ 2 ]

0 голосов
/ 23 июля 2019

Благодаря взаимодействию с парнями из Apache Beam Python потоковая запись в GCS (или локальную файловую систему) еще не поддерживается в Python, поэтому потоковая запись не происходит; в настоящее время поддерживаются только неограниченные цели (например, таблицы больших запросов).

Очевидно, это будет поддерживаться в следующем выпуске Beam для Python v2.14.0.

0 голосов
/ 11 июля 2019

С Документация Apache Beam по ограничениям для окон :

Если вы задаете функцию управления окнами с помощью преобразования Window, каждый элемент присваивается окну, но окна не рассматриваются, пока GroupByKey или Combine не агрегируют по окну и ключу.

Поскольку в этом примере, кажется, нет понятия ключей, можете ли вы попробовать Combine?

...