Spark Streaming: время пакетной обработки постоянно составляет 2,2 минуты по понятной причине - PullRequest
0 голосов
/ 07 января 2019

Мы разрабатываем приложение Spark Streaming, получающее данные (события аутентификации) из раздела kafka. Каждые 1 мин (интервал потоковой передачи = 1 мин) пакет событий обрабатывается в окне продолжительностью 6 мин (окно = 6 мин), причем упомянутое окно скользит каждую минуту (интервал скольжения = 1 мин). По некоторым странным причинам среднее время обработки пакета составляет 2,2 минуты (независимо от того, имеет ли оно запись или более 100 записей).

Мы попытались изменить конфигурацию с 3 интервалами (потоковое, скользящее, оконное); задействовать некоторые обходные пути в нашем исходном коде в процессе сериализации / десериализации, преобразования, применяемые к каждому RDD (переключение с groupByKey на ReductionByKey, а затем ReduckByKeyAndWindow и т. д.) безуспешно.

Пожалуйста, посмотрите в следующих строках немного нашего кода:

inputDStream = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Подписка (reqSub, getKafkaParams (com.indigo.deep.common.model.RequestEventDerSer2.class)));

    JavaDStream<RequestEvent> requestEventStream = inputDStream.map(kafkaRecord -> {
        numInputMessages.add(1);
        return kafkaRecord.value();

    });


    JavaPairDStream<String, Long> ipStream = requestEventStream.filter(event -> event.getIPAddress()!=null )
            .mapToPair(event -> new Tuple2<String, Long>(event.getIPAddress(), 1L));

    JavaPairDStream<String, Long> ipCountDStream = ipStream.reduceByKeyAndWindow(((v1, v2) -> {
        return v1 + v2;
    }), ((v1, v2) -> {
        return v1 - v2;
    }), WINDOW_DURATION, SLIDE_DURATION);
    ipCountDStream.cache();


    JavaPairDStream<String, Long> ipRisky = ipCountDStream.filter(f -> f._2.intValue() > 5);
    JavaPairDStream<String, Long> ipRiskyToPersist = ipRisky
            .mapToPair(t -> new Tuple2<String, Long>(t._1, 1L));

    ipRiskyToPersist.foreachRDD(f -> {
        Map<String, Long> mappedIPs = new HashMap<>();
        f.foreach(t -> mappedIPs.put(t._1, t._2));


        LOGGER.info(" RequestEvent: Total risky ips: " + f.count());
        LOGGER.info("Processed " + numInputMessages.localValue() + " unique IPs");

    });

Есть предложения по улучшению времени пакетной обработки?

...