Я принимаю данные (файл json) через Pub / sub. Поэтому мое время события по умолчанию - это время публикации в теме.Я хочу форсировать время события и изменить его.Я добавил поле даты и времени в мои данные.
Я хочу выполнить агрегацию и комбинирование в соответствии с новыми полями меток времени в моем файле json.
Ps: поле называется «метка времени» и является строкой,вот почему я конвертирую его в datetime, а затем в метку времени в потоке данных
def get_timestamp(data):
my_date = (data['timestamp']) # date : 2010-09-18......string
times = datetime.fromisoformat(my_date) #type: datetime.datetime
return beam.window.TimestampedValue(data, datetime.timestamp(times))
Позже я вызову функцию в моем конвейере, прежде чем я сделаю Windowing
Я получу свои данные из pubsub:
lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
subscription=known_args.in_topic).with_input_types(str)
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'jsonload' >> beam.Map(lambda x: json.loads(x))
, а затем выполните мою обработку:
(lines |'timestamp' >> beam.Map(get_timestamp)
| 'print timestamp' >> beam.ParDo(PrintFn2())
| 'window' >> beam.WindowInto(
window.FixedWindows(10),
trigger=trigger.AfterWatermark(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| 'CountGlobally' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()
).without_defaults()
)