Apache Beam - как я могу применить .getSideInputsMap к подписке на паб / саб Google? - PullRequest
0 голосов
/ 26 ноября 2018

Я получаю PCollection, подписавшись на паб / саб Google в Apache Beam следующим образом:

deviceReferenceDataUpdates = pipeLine.begin()
    .apply("subscribe to published data"),
         PubsubIO.readMessages().fromTopic("my_data"))

Я хочу использовать данные для обновления HashMap других существующих данных.в том же классе (HashMap напрямую доступен в той же области, что и PCollection).Я исследую, как я могу использовать метод .getSideInputsMap() в Apache Beam для достижения этой цели.Вот пример того, как getSideInputsMap() может использоваться для помещения элементов в новый HashMap:

public Map<TupleTag<?>, PValue> getAdditionalInputs() {
  Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
  for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
try {
  additionalInputs.put(
      new TupleTag<>(sideInputEntry.getKey()),
      rehydratedComponents.getPCollection(
          protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
} catch {

.... error handling

(взято из примера 10 здесь - https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine)

Я не уверен, как (или если) я могу использовать getSideInputsMap() до update my HashMap. Он не доступен как прямой метод моего PCollection. В приведенном выше примере он применяетсяна payload, и я не уверен, нужно ли мне каким-то образом генерировать payload, или мне нужно сгенерировать что-то еще, к чему я могу позвонить getSideInputsMap()?

...