Ведение журнала из других потоков в облачном потоке данных Google - PullRequest
0 голосов
/ 04 марта 2019

Работая с Google Cloud Dataflow, я столкнулся со странным поведением.Из экземпляров Runnable / Callable / etc, выполняющихся в других потоках, ни один из выходных данных журнала не отображается в консоли Stackdriver.При тестировании того же кода с использованием «прямого запуска» ожидаемое ведение журнала происходит очень хорошо.

Я не вижу никакой документации, предлагающей ведение журнала из других потоков не должно работать , не так ли?что-то не поддерживается Dataflow?Если он поддерживается, как его использовать или что я могу делать неправильно?

Пример кода (предупреждение: выдает кучу строк на консоль):

TestApp.java

package myapp;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// basic pipeline setup...
public class TestApp {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();

    final Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
      .apply("Processing", new TestAppProcess());

    p.run();
  }
}

TestAppProcess.java

package myapp;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class TestAppProcess extends PTransform<PCollection<String>, PCollection<String>> {
  private static class ProcessFn extends DoFn<String, String> {
    private static final org.slf4j.Logger log =
      org.slf4j.LoggerFactory.getLogger(TestAppProcess.class);

    @ProcessElement
    public void processElement(ProcessContext pc) throws InterruptedException {
      // This appears when using direct-runner
      // This appears in the Stackdriver console
      Runnable r1 = () -> log.info("Hello world from r1");
      r1.run();

      // This appears when using direct-runner
      // This DOESN'T appear in the Stackdriver console
      Runnable r2 = () -> log.info("Hello world from r2");
      Thread t = new Thread(r2);

      t.start();
      t.join();
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input.apply(ParDo.of(new ProcessFn()));
  }
}
...