Потоковое задание Flink не масштабируется должным образом - PullRequest
0 голосов
/ 26 мая 2018

Мы находимся в процессе тестирования способности Flink к масштабированию.Но мы обнаружили, что масштабирование не работает, независимо от увеличения слота или увеличения количества задач диспетчера.Мы ожидаем линейного, если не близкого к линейному масштабирования, но результат даже покажет ухудшение.Ценю любые комментарии.

Детали теста,

-VMWare vsphere

-Просто простой тест,

- auto gen source 3mil records, each 1kb in size, parallelism=1

- source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6
  • 3 ТМ, всего 6 слотов (2 / ТМ), каждый JM / TM имеет 32 виртуальных ЦП, 100 ГБ памяти

  • Результат:

    • 2 слота: 26 секунд, 3 мили / 26 = 115 кбит / с

    • 4 слота: 23 секунды, 3 мили / 23 = 130 кбит / с

    • 6 слотов: 22секунд, 3 мили / 22 = 136 тыс. т / с

Как показано, масштабирование почти ничто.Любая подсказка?Спасибо.

Ответы [ 2 ]

0 голосов
/ 26 мая 2018

Вы действительно должны использовать RichParallelSourceFunction .Если вы хотите, чтобы записи из разных экземпляров источника отличались друг от друга, вы можете получить индекс каждого экземпляра из RuntimeContext , который доступен через метод getRuntimeContext () в интерфейсе RichFunction.

Кроме того, у Flink есть встроенный репортер statsd metrics , который вы должны использовать вместо того, чтобы использовать свой собственный.Более того, numRecordsIn, numRecordsOut, numRecordsInPerSecond и numRecordsOutPerSecond уже вычисляются для вас , поэтому нет необходимости создавать эту инструментарий самостоятельно.Вы также можете получить доступ к этим метрикам через веб-интерфейс Flink или REST API.

Что касается причин, по которым вы, возможно, испытываете плохую масштабируемость с потребителем Kafka, это может быть вызвано многими причинами.Если вы используете обработку времени события, то незанятые разделы могут удерживать вещи (см. https://issues.apache.org/jira/browse/FLINK-5479). Если поток имеет ключ, то может возникнуть проблема с перекосом данных. Если вы подключаетесь к внешней базе данных или службе, тогдаэто может быть узким местом. Если контрольная точка настроена неправильно, это может вызвать это. Или недостаточную пропускную способность сети.

Я бы начал отлаживать это, посмотрев на некоторые ключевые метрики в веб-интерфейсе Flink. Хорошо ли сбалансирована нагрузка?через подзадачи, или это перекос? Вы могли бы включить отслеживание задержки и посмотреть, не ведет ли себя один из разделов kafka (проверяя задержку в приемнике (ах), которая будет сообщаться для каждого раздела)И вы могли бы искать обратное давление.

0 голосов
/ 26 мая 2018

см. Пример кода,

    public class passthru extends RichMapFunction<String, String> {
        public void open(Configuration configuration) throws Exception {
        ... ... 
            stats = new NonBlockingStatsDClient();
        }
        public String map(String value) throws Exception { 
            ... ...
            stats.increment(); 
            return value;
        }
    }

    public class datagen extends RichSourceFunction<String> {
        ... ...
        public void run(SourceContext<String> ctx) throws Exception {
            int i = 0;
            while (run){
                String idx = String.format("%09d", i);
                ctx.collect("{\"<a 1kb json content with idx in certain json field>\"}");
                i++;
                if(i == loop) 
                    run = false;
            }
        }
        ... ...
    }
    public class Job {
        public static void main(String[] args) throws Exception {
        ... ...
            DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
            DataStream<String> convert = stream.map(new passthru(statsdUrl));
            env.execute("Flink");
        } 
    }

код сокращения штата,

    dataStream.flatMap(xxx).keyBy(new KeySelector<xxx, AggregationKey>() {
        public AggregationKey getKey(rec r) throws Exception {
            ... ...             
           }
        }).process(new Aggr());

    public class Aggr extends ProcessFunction<rec, rec> {
        private ReducingState<rec> store;
        public void open(Configuration parameters) throws Exception {
            store= getRuntimeContext().getReducingState(new ReducingStateDescriptor<>(
                "reduction store", new ReduceFunction<rec>() {
            ... ...
        }
    public void processElement(rec r, Context ctx, Collector<rec> out)
        throws Exception {
            ... ...
            store.add(r);
...