Работая с Google Cloud Dataflow, я столкнулся со странным поведением.Из экземпляров Runnable
/ Callable
/ etc, выполняющихся в других потоках, ни один из выходных данных журнала не отображается в консоли Stackdriver.При тестировании того же кода с использованием «прямого запуска» ожидаемое ведение журнала происходит очень хорошо.
Я не вижу никакой документации, предлагающей ведение журнала из других потоков не должно работать , не так ли?что-то не поддерживается Dataflow?Если он поддерживается, как его использовать или что я могу делать неправильно?
Пример кода (предупреждение: выдает кучу строк на консоль):
TestApp.java
package myapp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
// basic pipeline setup...
public class TestApp {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
.apply("Processing", new TestAppProcess());
p.run();
}
}
TestAppProcess.java
package myapp;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
public class TestAppProcess extends PTransform<PCollection<String>, PCollection<String>> {
private static class ProcessFn extends DoFn<String, String> {
private static final org.slf4j.Logger log =
org.slf4j.LoggerFactory.getLogger(TestAppProcess.class);
@ProcessElement
public void processElement(ProcessContext pc) throws InterruptedException {
// This appears when using direct-runner
// This appears in the Stackdriver console
Runnable r1 = () -> log.info("Hello world from r1");
r1.run();
// This appears when using direct-runner
// This DOESN'T appear in the Stackdriver console
Runnable r2 = () -> log.info("Hello world from r2");
Thread t = new Thread(r2);
t.start();
t.join();
}
}
@Override
public PCollection<String> expand(PCollection<String> input) {
return input.apply(ParDo.of(new ProcessFn()));
}
}