У меня есть конвейер, который получает некоторые данные из sub pub, выполняет некоторую обработку, и мне нужно обработать все данные в Bigtable на основе результата этой обработки.
Например, у меня есть паб-сообщение типа: {clientId: 10}
, поэтому мне нужно прочитать из Bigtable все данные для clientId 10 (я знаю, как создать сканирование на основе clientId). Проблема состоит в том, что оба чтения, которые мы имеем на данный момент для Bigtable (BigtableIO и CloudBigtableIO), основаны на том факте, что конвейер начинается с bigtable, поэтому я не могу (или не смог найти способ) использовать их в середине трубопровод. Как мне добиться этого случая?
Простой псевдоподобный код:
Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?