У меня есть случай использования, когда я инициализирую HashMap, который содержит набор данных поиска (информацию о физическом местоположении и т. Д. Устройств IoT).Эти данные поиска служат в качестве справочных данных для второго набора данных, который представляет собой PCollection.Эта коллекция ПК представляет собой поток данных, который предоставляет данные, которые записывают устройства IoT.Поток данных с устройств IoT использует конвейер Apache Beam, который работает как поток данных Google с использованием Google Cloud pub / sub.
Когда я обрабатываю PCollection (данные устройства), я связываю данные Google Pub pub / sub.к соответствующей записи поиска в HashMap.
Мне нужно обновить HashMap, основываясь на 2-ом pub / sub, который отправляет изменения в свои данные.Вот как я получаю PCollection и выполняю поиск с использованием HashMap:
HashMap -> содержит предварительно загруженные данные поиска (информация об устройствах IoT)
PCollection -> содержит данные из конвейерного потока данных (данные, записанные устройствами IoT)
Я создаю HashMap для данных поиска устройств IoT в виде одиночного:
public class MyData {
private static final MyData instance = new MyData ();
private MyData () {
HashMap myDataMap = new HashMap<String, String>();
... logic to populate the map
this.referenceData = myDataMap;
}
public HashMap<Integer, DeviceReference> referenceData;
public static DeviceData getInstance(){
return instance;
}
}
Затем я использую HashMap в другом классе, где я подписываюсь на обновления данных (это сообщения, которые, например, дают мне новые данные, которые относятся к сущностям, уже сохраненным в HashMap).Я подписываюсь на изменения, используя паб / саб Google с Apache beam:
HashMap<String, String> referenceData = MyData.getInstance().referenceData;
Pipeline pipeLine = Pipeline.create(options);
// subscribe to changes in data
org.apache.beam.sdk.values.PCollection myDataUpdates;
myDataUpdates = pipeLine.begin()
.apply(String.format("Subscribe to data updates"),
PubsubIO.readStrings().fromTopic(
String.format("myPubSubPath")));
Я хочу эффективно применить обновления данных к одноэлементной HashMap (т.е. манипулировать HashMap на основе моих данныхподписка).Как я могу это сделать?
У меня ограниченное понимание Apache Beam, и я знаю только, как выполнять преобразования в данных конвейера для создания другого отдельного PCollection
.Я думаю, что в этом смысл Beam, что он предназначен для преобразования больших наборов данных в другую форму.Есть ли способ достижения того, что мне нужно ( обновление набора данных на основе подписки pub / sub) с использованием Apache Beam, или есть другой способ обновления HashMap с использованием pub / sub?(Я не могу опрашивать данные, так как это создает слишком большую задержку и стоимость, мне нужно обновить HashMap с помощью подписки).
В облачных документах Google напрямую отображается подписка на паб / саб Google Cloud, который не связан с конвейером Apache Beam .Это многообещающее потенциальное решение, и оно зависит от следующей зависимости Maven:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.53.0</version>
</dependency>
Я получаю конфликт, который конфликтует со следующими зависимостями Maven для Apache Beam:
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
Проблема описана в отдельном вопросе здесь - Конфликт Maven в Java-приложении с зависимостью google-cloud-core-grpc .Из того, что я вижу, кажется, что не имеет значения, какую версию артефакта google-cloud-pubsub
Maven я использую, поскольку из того, что я понял, выглядит как луч v.2.5.0зависимость и ниже всегда будут конфликтовать с любой текущей версией зависимости от Google.
(я поднимал это как проблему в Beam Jira - https://issues.apache.org/jira/browse/BEAM-6118)
В настоящее время я изучаю побочные вводы и combine
как способ достижения обновления HashMap:
https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine
Пример 10 показывает, как можно .getSideInputsMap()
применяется к payload
. Мне интересно, могу ли я как-то применить это к своей подписке на изменения поисковых данных. Если я получу PCollection
, как это, я не могу напрямую связать .getSideInputsMap()
с PCollection
deviceReferenceDataUpdates = pipeLine.begin()
.apply("Get changes to the IoT device lookup data"),
PubsubIO.readMessages().fromTopic("IoT device lookup data")).
Я задал отдельный вопрос, конкретно о том, как я могу использовать .getSideInputsMap()
- Apache Beam - как я могу применить .getSideInputsMap к подписке на паб / подписчик Google