Можем ли мы прочитать фактический контент или сообщение, опубликованное в Pub / Sub, из этого PubsubIO.readStrings (). FromTopi c (options.getInputTopi c ()) - PullRequest
0 голосов
/ 31 марта 2020

Можем ли мы прочитать фактический контент или сообщение, опубликованное в Pub / Sub, из этого?

PubsubIO.readStrings().fromTopic(options.getInputTopic())

Я пытаюсь прочитать фактический контент из PubsubIO.Read или могу сказать сообщение, опубликованное в моем топи c Теперь я хочу отправить строку сообщения на другой сервер.

Проблема в том, что мой код, который отправляет данные на сервер, может выполняться только при запуске команды сборки, а не когда сообщение публикуется в моем topi c в Pub / Sub , Поскольку код может выполняться только при запуске команды сборки, у меня есть данные PubsubIO.Read на моем сервере, очевидно, это потому, что я преобразовал объект в строку.

Скажите, пожалуйста, каким образом я могу отправить собственное сообщение, опубликованное на PubSub, и каким образом можно развернуть объект PubsubIO.Read.

public class CustomPubsubToText {
 /**
     * Options supported by the pipeline.
     *
     * <p>
     * Inherits standard configuration options.
     * </p>
     */
    public interface Options extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("The directory to output files to. Must end with a slash.")
    @Required
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> value);

    @Description("The filename prefix of the files to write to.")
    @Default.String("output")
    @Required
    ValueProvider<String> getOutputFilenamePrefix();

    void setOutputFilenamePrefix(ValueProvider<String> value);

    @Description("The suffix of the files to write.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();


    void setOutputFilenameSuffix(ValueProvider<String> value);

    @Description("The shard template of the output file. Specified as repeating sequences "
        + "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the "
        + "shard number, or number of shards respectively")
    @Default.String("W-P-SS-of-NN")
    ValueProvider<String> getOutputShardTemplate();

    void setOutputShardTemplate(ValueProvider<String> value);

    @Description("The maximum number of output shards produced when writing.")
    @Default.Integer(1)
    Integer getNumShards();

    void setNumShards(Integer value);

    @Description("The window duration in which data will be written. Defaults to 5m. "
        + "Allowed formats are: "
        + "Ns (for seconds, example: 5s), "
        + "Nm (for minutes, example: 12m), "
        + "Nh (for hours, example: 2h).")
    @Default.String("5m")
    String getWindowDuration();

    void setWindowDuration(String value);
    }

    /**
     * Main entry point for executing the pipeline.
     * @param args  The command-line arguments to the pipeline.
     */


  public static void main(String[] args) {

    Options options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(Options.class);

    options.setStreaming(true);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return  The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);
       try{
          /* sendData(PubsubIO.readStrings().fromTopic(options.getInputTopic()).toString());
          Logger.getLogger(PubsubToText.class.getName());*/

        OkHttpClient client = new OkHttpClient();
        MediaType mediaType = MediaType.parse("application/json");
        RequestBody body = RequestBody.create(mediaType,  PubsubIO.readStrings().fromTopic(options.getInputTopic()));
        Request request = new Request.Builder()
        .url("server-url")
        .post(body)
        .addHeader("cache-control", "no-cache")
        .build();
        Response response = client.newCall(request).execute();
       } catch(Exception e){
           Logger.getLogger(PubsubToText.class.getName());
           System.out.print("error");
       }     

    /*
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */

    pipeline
        .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

            // Apply windowed file writes. Use a NestedValueProvider because the filename
                // policy requires a resource generated from the input value at runtime.

                // call to the 
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    new WindowedFilenamePolicy(
                        options.getOutputDirectory(),
                        options.getOutputFilenamePrefix(),
                        options.getOutputShardTemplate(),
                        options.getOutputFilenameSuffix()))
                .withTempDirectory(NestedValueProvider.of(
                    options.getOutputDirectory(),
                    (SerializableFunction<String, ResourceId>) input ->
                        FileBasedSink.convertToFileResourceIfPossible(input))));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...