Как пакетировать N элементов в потоковом конвейере с небольшими связками? - PullRequest
0 голосов
/ 12 июня 2018

Я реализовал пакетирование по N элементам, как описано в этом ответе: Может ли вход хранилища данных в конвейере потока данных Google обрабатываться в пакете из N записей одновременно?



    package com.example.dataflow.transform;

    import com.example.dataflow.event.ClickEvent;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.joda.time.Instant;

    import java.util.ArrayList;
    import java.util.List;

    public class ClickToClicksPack extends DoFn> {
        public static final int BATCH_SIZE = 10;

        private List accumulator;

        @StartBundle
        public void startBundle() {
            accumulator = new ArrayList(BATCH_SIZE);
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            ClickEvent clickEvent = c.element();
            accumulator.add(clickEvent);
            if (accumulator.size() >= BATCH_SIZE) {
                c.output(accumulator);
                accumulator = new ArrayList(BATCH_SIZE);
            }
        }

        @FinishBundle
        public void finishBundle(FinishBundleContext c) {
            if (accumulator.size() > 0) {
                ClickEvent clickEvent = accumulator.get(0);
                long time = clickEvent.getClickTimestamp().getTime();

                c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);
            }
        }
    }


Но когда я запускаю конвейер в потоковом режиме, есть много пакетов только с 1 или 2 элементами.Как я понимаю, это из-за небольших размеров пачек.После одного дня работы среднее количество элементов в пакете составляет примерно 4. Мне действительно нужно, чтобы оно было ближе к 10 для лучшей производительности следующих шагов.

Есть ли способ контролировать размер пакетов?Или я должен использовать преобразование "GroupIntoBatches" для этой цели.В этом случае для меня не ясно, что должно быть выбрано в качестве ключа.

ОБНОВЛЕНИЕ: является ли хорошей идеей использовать идентификатор потока Java или имя хоста ВМ для ключа, чтобы применить преобразование "GroupIntoBatches"?

1 Ответ

0 голосов
/ 14 июня 2018

Я закончил делать составное преобразование с "GroupIntoBatches" внутри.Следующий ответ содержит рекомендации относительно выбора ключа: https://stackoverflow.com/a/44956702/4888849

В моей текущей реализации я использую случайные ключи для достижения параллелизма, и я создаю окна событий для регулярной выдачи результатов, даже если их меньше, чем BATCH_SIZE.события одним ключом.



    package com.example.dataflow.transform;

    import com.example.dataflow.event.ClickEvent;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    import java.util.Random;

    /**
     * Batch clicks into packs of BATCH_SIZE size
     */
    public class ClickToClicksPack extends PTransform, PCollection>> {
        public static final int BATCH_SIZE = 10;
        // Define window duration.
        // After window's end - elements are emitted even if there are less then BATCH_SIZE elements
        public static final int WINDOW_DURATION_SECONDS = 1;
        private static final int DEFAULT_SHARDS_NUMBER = 20;
        // Determine possible parallelism level
        private int shardsNumber = DEFAULT_SHARDS_NUMBER;

        public ClickToClicksPack() {
            super();
        }

        public ClickToClicksPack(int shardsNumber) {
            super();
            this.shardsNumber = shardsNumber;
        }

        @Override
        public PCollection> expand(PCollection input) {
            return input
                    // assign keys, as "GroupIntoBatches" works only with key-value pairs
                    .apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
                    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS))))
                    .apply(GroupIntoBatches.ofSize(BATCH_SIZE))
                    .apply(ParDo.of(new ExtractValues()));
        }

        /**
         * Assigns to clicks random integer between zero and shardsNumber
         */
        private static class AssignRandomKeys extends DoFn> {
            private int shardsNumber;
            private Random random;

            AssignRandomKeys(int shardsNumber) {
                super();
                this.shardsNumber = shardsNumber;
            }

            @Setup
            public void setup() {
                random = new Random();
            }

            @ProcessElement
            public void processElement(ProcessContext c) {
                ClickEvent clickEvent = c.element();
                KV kv = KV.of(random.nextInt(shardsNumber), clickEvent);
                c.output(kv);
            }
        }

        /**
         * Extract values from KV
         */
        private static class ExtractValues extends DoFn>, Iterable> {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV> kv = c.element();
                c.output(kv.getValue());
            }
        }
    }


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...