Поле процесса с отметкой времени в потоке данных - PullRequest
1 голос
/ 26 марта 2019

Я получаю сообщения из Google Cloud Pub / Sub в следующем формате:

{u'date': u'2019-03-26T09:57:52Z', 'field1': value1, u'field2': u'value2', u'field3': u'value3', u'field4': u'value4',...}

И мне бы хотелось, чтобы эти сообщения обрабатывались в конвейере с окном:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 10))

Поле 'дата' будет обрабатываться как эталонная временная метка окна.

Нужно ли мне использовать WindowFn или Как это сделать?

1 Ответ

1 голос
/ 27 марта 2019

Вам нужно указать свою временную метку так:

def custom_timestamp(message):
    # assuming that message is already parsed JSON (dict)
    import datetime as dt
    import apache_beam as beam
    ts = dt.datetime.strptime(message["date"], "%Y-%m-%dT%H:%M:%SZ")
    return beam.window.TimestampedValue(message, ts.timestamp())

и затем:

| 'CustomTimestamp' >> beam.Map(custom_timestamp)
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 10))

Вы можете найти полную информацию здесь: https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

Однако вы должны заметить, что Streaming Python SDK для Apache Beam содержит много недостающих частей, и некоторые вещи не работают так, как вы могли ожидать. Я хотел реализовать тот же случай, что и у вас, и после добавления пользовательских временных меток DataFlow Runner отбросил мои сообщения из-за того, что они называют dropDueToLateness . Я все еще не уверен, возможно ли установить системный водяной знак для обработки исторических данных, используя PubSub и Python.

...