Время обработки потока луча Apache - PullRequest
0 голосов
/ 09 ноября 2019

Я пытаюсь создать поток обработки событий с использованием Apache Beam.

Шаги, которые происходят в моем потоке:

  1. Чтение тем Кафки в формате avro и десериализация avro с использованиемРеестр схемы
  2. Создать Окно фиксированного размера (1 час) с триггером каждые 10 минут (время обработки)
  3. Записывать файлы avro в разделительные каталоги GCP по названию темы. (filename = schema + start-end-window-pane)

Теперь давайте углубимся в код.

  1. Этот код показывает, как я читаю из Kafka. Я использую пользовательский десериализатор и кодер для правильной десериализации с использованием реестра схемы (в моем случае это hortonworks).
KafkaIO.<String, AvroGenericRecord>read()
               .withBootstrapServers(bootstrapServers)
               .withConsumerConfigUpdates(configUpdates)
               .withTopics(inputTopics)
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
               .commitOffsetsInFinalize()
               .withoutMetadata();
В конвейере после чтения записей KafkaIO создает управление окнами.
records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(1)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))
                        .withLateFirings(AfterPane.elementCountAtLeast(1))
                )
                .withAllowedLateness(Duration.standardMinutes(5))
                .discardingFiredPanes()
        )

Чего я хочу добиться с помощью этого окна, так это группировать данные по времени события каждые 1 час и триггер каждые 10 минут .

После группировки по окну он начинает запись в Google Cloud Storage (GCS).
public class WriteAvroFilesTr extends PTransform<PCollection<AvroGenericRecord>, WriteFilesResult<AvroDestination>> {
    private String baseDir;
    private int numberOfShards;

    public WriteAvroFilesTr(String baseDir, int numberOfShards) {
        this.baseDir = baseDir;
        this.numberOfShards = numberOfShards;
    }

    @Override
    public WriteFilesResult<AvroDestination> expand(PCollection<AvroGenericRecord> input) {
        ResourceId tempDir = getTempDir(baseDir);

        return input.apply(AvroIO.<AvroGenericRecord>writeCustomTypeToGenericRecords()
                .withTempDirectory(tempDir)
                .withWindowedWrites()
                .withNumShards(numberOfShards)
                .to(new DynamicAvroGenericRecordDestinations(baseDir, Constants.FILE_EXTENSION))
        );
    }

    private ResourceId getTempDir(String baseDir) {
        return FileSystems.matchNewResource(baseDir + "/temp", true);
    }
}

И

public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
    private final String baseDir;
    private final String fileExtension;

    public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
        this.baseDir = baseDir;
        this.fileExtension = fileExtension;
    }

    @Override
    public Schema getSchema(AvroDestination destination) {
        return new Schema.Parser().parse(destination.jsonSchema);
    }

    @Override
    public GenericRecord formatRecord(AvroGenericRecord record) {
        return record.getRecord();
    }

    @Override
    public AvroDestination getDestination(AvroGenericRecord record) {
        Schema schema = record.getRecord().getSchema();
        return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
    }

    @Override
    public AvroDestination getDefaultDestination() {
        return new AvroDestination();
    }

    @Override
    public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
        String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
        return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
    }

    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;
        final String fileExtension;
        final Integer version;

        WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
            this.outputFilePrefix = outputFilePrefix;
            this.version = version;
            this.fileExtension = fileExtension;
        }

        @Override
        public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                FileBasedSink.OutputFileHints outputFileHints) {

            IntervalWindow intervalWindow = (IntervalWindow) window;

            String filenamePrefix =
                    outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");

            String filename =
                    String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
                            version,
                            formatter.print(intervalWindow.start()),
                            formatter.print(intervalWindow.end()),
                            shardNumber,
                            numShards - 1,
                            fileExtension);
            ResourceId result = outputFilePrefix.getCurrentDirectory();
            return result.resolve(filename, RESOLVE_FILE);
        }


        @Override
        public ResourceId unwindowedFilename(
                int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                    DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
                            .withLabel("File Name Prefix"));
        }
    }

}

У меня естьзаписал весь мой трубопровод. Это вроде хорошо работает, но я неправильно понял (не уверен), что я обрабатываю события по времени события.

Может ли кто-нибудь просмотреть мой код (особенно шаги 1 и 2, когда я читаю и группирую по окнам) либоэто окна по времени события или нет?

PS Для каждой записи в Кафке у меня есть поле метки времени внутри.

UPD

Спасибо jjayadeep

Я включаю в KafkaIO пользовательскую TimestampPolicy

static class CustomTimestampPolicy extends TimestampPolicy<String, AvroGenericRecord> {

        protected Instant currentWatermark;

        CustomTimestampPolicy(Optional<Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, AvroGenericRecord> record) {
            currentWatermark = Instant.ofEpochMilli(record.getKV().getValue().getTimestamp());
            return currentWatermark;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            return currentWatermark;
        }
    }

1 Ответ

1 голос
/ 10 ноября 2019

Из документации, представленной здесь [1], время события используется в качестве времени обработки по умолчанию в KafkaIO

By default, record timestamp (event time) is set to processing time in KafkaIO reader and source watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled ('LogAppendTime'), it can enabled with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.

Также время обработки является методом отметки времени по умолчанию, используемым, как описано ниже

// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.

1 - https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html

...