Другой пользователь дал хороший ответ, который говорит вам, как делать то, что вы хотите, однако, если я правильно понимаю вашу проблему, я думаю, что могу рекомендовать более чистый подход.
Если предположить, что верно следующее, то Вы хотите:
- Принять сообщение Pub / Sub в начале вашего конвейера
- Обработать сообщение, поместив его в большое окно сообщений
- Запись каждое окно сообщений в корзину GCS в виде файла
- Помимо окон, описанных выше, обрабатывайте каждую строку по-другому
Затем вы можете вместо этого создать один конвейер, который просто разветвляется после шага «принять публикацию / подписку». Поток данных поддерживает это изначально очень хорошо. Вы бы сохранили ссылку на объект PCollection
, возвращаемый при использовании приемника Pub / Sub в начале вашего конвейера. Затем вы должны применить несколько цепочек DoFn
реализаций et c к этой единственной ссылке. Вы сможете выполнять управление окнами с записью в GCS, как сейчас, плюс обрабатывать каждое отдельное сообщение любым удобным для вас способом.
Это может выглядеть так:
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = pipeline.apply("Read from Pub/Sub", PubsubIO.readStrings().fromTopic("my_topic_name));
// Current pipeline fork for windowing into GCS
messages.apply("Handle for GCS", ...);
// New fork for more handling
messages.apply("More stuff", ...);