вывод pubsub для запуска пакетного конвейера - PullRequest
0 голосов
/ 09 декабря 2018

Я создал java-конвейеры, где один конвейер должен читать сообщения от подписчика темы, и, если, например, строка «Start» обнаружила, выполнить другой конвейер в той же программе, которая читает файл csv, поиск в хранилище данных, шифрует данные изаписывает в вывод CSV.

В этом процессе из-за каких-либо условий я не могу передать выходные данные конвейера pubsub в качестве начальной точки пакетного конвейера.

======================= Эта программа прекращает работу и не запускается.Хотя ошибки нет ...

Если я уберу условие if проверки вывода конвейера pubsub, то есть ниже единицы, тогда поток данных покажет 2 конвейера, один для pubsub и другой для обработки файлов.Конвейер обработки файлов работает даже без какого-либо сообщения для публикации подтемы, и ничего не происходит, если дать другой .. короче говоря, не в состоянии запустить пакетный конвейер только после завершения pubsub конвейера.Пожалуйста, помогите.

Фрагмент кода, как показано ниже:

PCollection<String> pubsubPipeline = 
p.apply(PubsubIO.readStrings().fromTopic(myTpoic))
            .apply("window",
                    Window.into(SlidingWindows//
                            .of(Duration.standardSeconds(30))//
                            .every(Duration.standardSeconds(30)))) //
            .apply("WordsPerLine", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws 
Exception {
                    String s = c.element();
                    final String start = "Start";
                    if (start.equals(s)) {
                        c.output(s.toString());
                    } else {
                        LOG.info("Start not found");
                        return;
 //                     throw new Exception();
                    }
                }
            }));
    String start = "Start";
//      String stsubs = pubsubPipeline.toString();
    if (start.equals(pubSubPipeline))
            {
            LOG.info("Come in if condition");   

    LOG.info("Reading input file");
    PCollection<String> lines = p.apply("Read 
  File",TextIO.read().from(input));
    LOG.info("Lookup in the datastore");
    PCollection<HashMap<String, List<Entity>>> entitySet = 
  lines.apply("Query", ParDo.of(new DoFn<String, HashMap<String, 
  List<Entity>>>() {
        @ProcessElement
  ::
  :
  :
    PCollection<String> output = userSet.apply("Print Entity", 
  ParDo.of(new DoFn<User, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            User user = c.element();
            if (user != null && user.getEmail() != null && 
  user.getEmail().equals(user.getEncryptedEmail())) {
                user.setEncryptedEmail(null);
            }
            c.output(user.toString());
        }
    }));


    output.apply(TextIO.write().withHeader("User Id,Email,Encrypted Email").to(outputPrefix).withSuffix(".csv").withoutSharding());
    p.run().waitUntilFinish();

            }   

    else
        LOG.info("comes in else condition");
    return;
  }
...