Как изменить время события в Apache Beam? - PullRequest
0 голосов
/ 19 сентября 2019

Я принимаю данные (файл json) через Pub / sub. Поэтому мое время события по умолчанию - это время публикации в теме.Я хочу форсировать время события и изменить его.Я добавил поле даты и времени в мои данные.
Я хочу выполнить агрегацию и комбинирование в соответствии с новыми полями меток времени в моем файле json.

Ps: поле называется «метка времени» и является строкой,вот почему я конвертирую его в datetime, а затем в метку времени в потоке данных

def get_timestamp(data):
    my_date = (data['timestamp']) # date : 2010-09-18......string
    times = datetime.fromisoformat(my_date) #type: datetime.datetime
    return beam.window.TimestampedValue(data, datetime.timestamp(times))

Позже я вызову функцию в моем конвейере, прежде чем я сделаю Windowing

Я получу свои данные из pubsub:

lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
        subscription=known_args.in_topic).with_input_types(str) 
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8')) 
        | 'jsonload' >> beam.Map(lambda x: json.loads(x))

, а затем выполните мою обработку:

 (lines |'timestamp' >> beam.Map(get_timestamp)
           | 'print timestamp' >> beam.ParDo(PrintFn2())
           | 'window' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
    )

1 Ответ

0 голосов
/ 20 сентября 2019

При чтении из PubSub лучший способ установить EvenTime для элемента - использовать

Java withTimestampAttribute

Python timestamp_attribute

Это установит метку времени для элементов и обеспечит правильность данных для сигналов водяного знака .

Если это не вариант, вы можете изменить временную метку для элемента в DoFn согласно Добавление временных меток к PCollection .Однако этот метод не позволяет устанавливать временные метки <затем текущий водяной знак.Вот почему метод withTimestampAttribute является лучшим способом решения этого шаблона. </p>

...