Получение сообщений из pubsub и последующее сохранение их в почасовых или других интервальных файлах в gcs не работает.Работа записывает файлы только когда я закрываю работу.Кто-нибудь может указать мне правильное направление?
topic = 'test.txt'
jobname = 'streaming-' + topic.replace('.', '-')
input_topic= 'projects/PROJECT/topics/' + topic
u = Utils()
parsed_schema = u.get_parsed_avro_from_schema_service(
schema_name=topic,
schema_repo_url='localhost'
)
p = beam.Pipeline(options=pipelineoptions)
messages = p | 'Read from topic: ' + topic >> ReadFromPubSub(topic=input_topic).with_input_types(bytes)
windowed_lines = (
messages
| 'decode' >> beam.ParDo(DecodeAvro(), parsed_schema)
| beam.WindowInto(
window.FixedWindows(60),
trigger=AfterWatermark(),
accumulation_mode=AccumulationMode.DISCARDING
)
)
output = windowed_lines | 'write result' >> WriteToAvro(
file_path_prefix='gs://BUCKET/streaming/tests/',
shard_name_template=topic.split('.')[0] + '_' + str(uuid.uuid4()) + '_SSSS-of-NNNN',
schema=parsed_schema,
file_name_suffix='.avro',
num_shards=2
)
result = p.run()
result.wait_until_finish()