Я создал java-конвейеры, где один конвейер должен читать сообщения от подписчика темы, и, если, например, строка «Start» обнаружила, выполнить другой конвейер в той же программе, которая читает файл csv, поиск в хранилище данных, шифрует данные изаписывает в вывод CSV.
В этом процессе из-за каких-либо условий я не могу передать выходные данные конвейера pubsub в качестве начальной точки пакетного конвейера.
======================= Эта программа прекращает работу и не запускается.Хотя ошибки нет ...
Если я уберу условие if проверки вывода конвейера pubsub, то есть ниже единицы, тогда поток данных покажет 2 конвейера, один для pubsub и другой для обработки файлов.Конвейер обработки файлов работает даже без какого-либо сообщения для публикации подтемы, и ничего не происходит, если дать другой .. короче говоря, не в состоянии запустить пакетный конвейер только после завершения pubsub конвейера.Пожалуйста, помогите.
Фрагмент кода, как показано ниже:
PCollection<String> pubsubPipeline =
p.apply(PubsubIO.readStrings().fromTopic(myTpoic))
.apply("window",
Window.into(SlidingWindows//
.of(Duration.standardSeconds(30))//
.every(Duration.standardSeconds(30)))) //
.apply("WordsPerLine", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws
Exception {
String s = c.element();
final String start = "Start";
if (start.equals(s)) {
c.output(s.toString());
} else {
LOG.info("Start not found");
return;
// throw new Exception();
}
}
}));
String start = "Start";
// String stsubs = pubsubPipeline.toString();
if (start.equals(pubSubPipeline))
{
LOG.info("Come in if condition");
LOG.info("Reading input file");
PCollection<String> lines = p.apply("Read
File",TextIO.read().from(input));
LOG.info("Lookup in the datastore");
PCollection<HashMap<String, List<Entity>>> entitySet =
lines.apply("Query", ParDo.of(new DoFn<String, HashMap<String,
List<Entity>>>() {
@ProcessElement
::
:
:
PCollection<String> output = userSet.apply("Print Entity",
ParDo.of(new DoFn<User, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
User user = c.element();
if (user != null && user.getEmail() != null &&
user.getEmail().equals(user.getEncryptedEmail())) {
user.setEncryptedEmail(null);
}
c.output(user.toString());
}
}));
output.apply(TextIO.write().withHeader("User Id,Email,Encrypted Email").to(outputPrefix).withSuffix(".csv").withoutSharding());
p.run().waitUntilFinish();
}
else
LOG.info("comes in else condition");
return;
}