Я создаю конвейер потоковой обработки Apache Beam для запуска в GCP Dataflow. У меня есть ряд преобразований, расширяющих DoFn и CombineFn. В DoFn журналы хорошо визуализируются с помощью окна LOGS в деталях задания Dataflow. Однако журналы от преобразований CombineFn не показаны.
Я пробовал разные уровни журналов, и они также хорошо видны при использовании DirectRunner.
Вот пример кода. Для краткости я изменил ввод и вывод на String, в моем коде есть несколько пользовательских классов.
import java.io.Serializable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AverageSpv extends CombineFn<String, AverageSpv.Accum, String> {
private static final Logger LOG = LoggerFactory.getLogger(AverageSpv.class);
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
@Nullable String id;
}
@Override
public Accum createAccumulator() {
return new Accum();
}
@Override
public Accum addInput(Accum accumulator, String input) {
LOG.info("Add input: id {}, input);
accumulator.id = input;
return accumulator;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accumulators) {
LOG.info("Merging accumulator");
Accum merged = createAccumulator();
for (Accum accumulator : accumulators) {
merged.id = accumulator.id;
}
return merged;
}
@Override
public VehicleSpeedPerSegmentInfo extractOutput(Accum accumulator) {
LOG.info("Extracting accumulator");
LOG.info("Extract output: id {}", acummulator.id);
return acummulator.id;
}
}