Я получаю PCollection
, подписавшись на паб / саб Google в Apache Beam следующим образом:
deviceReferenceDataUpdates = pipeLine.begin()
.apply("subscribe to published data"),
PubsubIO.readMessages().fromTopic("my_data"))
Я хочу использовать данные для обновления HashMap
других существующих данных.в том же классе (HashMap
напрямую доступен в той же области, что и PCollection
).Я исследую, как я могу использовать метод .getSideInputsMap()
в Apache Beam для достижения этой цели.Вот пример того, как getSideInputsMap()
может использоваться для помещения элементов в новый HashMap:
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
try {
additionalInputs.put(
new TupleTag<>(sideInputEntry.getKey()),
rehydratedComponents.getPCollection(
protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
} catch {
.... error handling
(взято из примера 10 здесь - https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine)
Я не уверен, как (или если) я могу использовать getSideInputsMap()
до update my HashMap
. Он не доступен как прямой метод моего PCollection
. В приведенном выше примере он применяетсяна payload
, и я не уверен, нужно ли мне каким-то образом генерировать payload
, или мне нужно сгенерировать что-то еще, к чему я могу позвонить getSideInputsMap()
?