Как читать сообщения журнала для функции CombineFn в GCP Dataflow? - PullRequest
0 голосов
/ 07 октября 2019

Я создаю конвейер потоковой обработки Apache Beam для запуска в GCP Dataflow. У меня есть ряд преобразований, расширяющих DoFn и CombineFn. В DoFn журналы хорошо визуализируются с помощью окна LOGS в деталях задания Dataflow. Однако журналы от преобразований CombineFn не показаны.

Я пробовал разные уровни журналов, и они также хорошо видны при использовании DirectRunner.

Вот пример кода. Для краткости я изменил ввод и вывод на String, в моем коде есть несколько пользовательских классов.

import java.io.Serializable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AverageSpv extends CombineFn<String, AverageSpv.Accum, String> {
    private static final Logger LOG = LoggerFactory.getLogger(AverageSpv.class);

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        @Nullable String id;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, String input) {
        LOG.info("Add input: id {}, input);

        accumulator.id = input;

        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        LOG.info("Merging accumulator");

        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            merged.id = accumulator.id;            
        }

        return merged;
    }

    @Override
    public VehicleSpeedPerSegmentInfo extractOutput(Accum accumulator) {
        LOG.info("Extracting accumulator");

        LOG.info("Extract output: id {}", acummulator.id);

        return acummulator.id;
    }
}

1 Ответ

0 голосов
/ 10 октября 2019

Операции Apache Beam CombineFn выполняются в несколько этапов в потоке данных. (В частности, как много предварительного объединения происходит перед перетасовкой всех результатов в один ключ, а затем все исходные результаты объединяются в конечный результат на последующем шаге после GBK.) Тот факт, что нет единственного «шага» выполненияСоответствующий первоначальный шаг объединения на графике, вероятно, мешает найти логи.

Это ошибка, которую следует исправить. Как уже упоминалось, обходной путь заключается в просмотре всех журналов из конвейера.

...