Можем ли мы прочитать фактический контент или сообщение, опубликованное в Pub / Sub, из этого?
PubsubIO.readStrings().fromTopic(options.getInputTopic())
Я пытаюсь прочитать фактический контент из PubsubIO.Read
или могу сказать сообщение, опубликованное в моем топи c Теперь я хочу отправить строку сообщения на другой сервер.
Проблема в том, что мой код, который отправляет данные на сервер, может выполняться только при запуске команды сборки, а не когда сообщение публикуется в моем topi c в Pub / Sub , Поскольку код может выполняться только при запуске команды сборки, у меня есть данные PubsubIO.Read
на моем сервере, очевидно, это потому, что я преобразовал объект в строку.
Скажите, пожалуйста, каким образом я могу отправить собственное сообщение, опубликованное на PubSub, и каким образом можно развернуть объект PubsubIO.Read
.
public class CustomPubsubToText {
/**
* Options supported by the pipeline.
*
* <p>
* Inherits standard configuration options.
* </p>
*/
public interface Options extends PipelineOptions, StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@Description("The directory to output files to. Must end with a slash.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description("The filename prefix of the files to write to.")
@Default.String("output")
@Required
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@Description("The suffix of the files to write.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@Description("The shard template of the output file. Specified as repeating sequences "
+ "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the "
+ "shard number, or number of shards respectively")
@Default.String("W-P-SS-of-NN")
ValueProvider<String> getOutputShardTemplate();
void setOutputShardTemplate(ValueProvider<String> value);
@Description("The maximum number of output shards produced when writing.")
@Default.Integer(1)
Integer getNumShards();
void setNumShards(Integer value);
@Description("The window duration in which data will be written. Defaults to 5m. "
+ "Allowed formats are: "
+ "Ns (for seconds, example: 5s), "
+ "Nm (for minutes, example: 12m), "
+ "Nh (for hours, example: 2h).")
@Default.String("5m")
String getWindowDuration();
void setWindowDuration(String value);
}
/**
* Main entry point for executing the pipeline.
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
try{
/* sendData(PubsubIO.readStrings().fromTopic(options.getInputTopic()).toString());
Logger.getLogger(PubsubToText.class.getName());*/
OkHttpClient client = new OkHttpClient();
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, PubsubIO.readStrings().fromTopic(options.getInputTopic()));
Request request = new Request.Builder()
.url("server-url")
.post(body)
.addHeader("cache-control", "no-cache")
.build();
Response response = client.newCall(request).execute();
} catch(Exception e){
Logger.getLogger(PubsubToText.class.getName());
System.out.print("error");
}
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
pipeline
.apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resource generated from the input value at runtime.
// call to the
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(
options.getOutputDirectory(),
(SerializableFunction<String, ResourceId>) input ->
FileBasedSink.convertToFileResourceIfPossible(input))));
// Execute the pipeline and return the result.
return pipeline.run();
}
}