В пакетном конвейере как назначить временные метки для данных из пакетных источников, например, для файлов CSV в конвейере Beam - PullRequest
0 голосов
/ 01 февраля 2019

Я читаю данные из ограниченного источника, файла csv, в пакетном конвейере и хотел бы назначить временную метку для элементов на основе данных, хранящихся в виде столбца в файле csv.Как мне сделать это в конвейере Apache Beam?

1 Ответ

0 голосов
/ 01 февраля 2019

Если ваш пакетный источник данных содержит основанную на событии временную метку для каждого элемента, например, у вас есть событие click с кортежем {'timestamp, 'userid','ClickedSomething'}.Вы можете назначить временную метку для элемента в пределах DoFn в вашем конвейере.

Java:

public void process(ProcessContext c){
     c.outputWithTimestamp(
         c.element(), 
         new Instant(c.element().getTimestamp()));
}

Python:

'AddEventTimestamps' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))

[Редактировать не-лямбда-пример Python из руководства Beam:]

class AddTimestampDoFn(beam.DoFn):

  def process(self, element):
    # Extract the numeric Unix seconds-since-epoch timestamp to be
    # associated with the current log entry.
    unix_timestamp = extract_timestamp_from_log_entry(element)
    # Wrap and emit the current entry and new timestamp in a
    # TimestampedValue.
    yield beam.window.TimestampedValue(element, unix_timestamp)

timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

[Редактировать согласно комментарию Антона] Дополнительную информацию можно найти @

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...