Мы разрабатываем приложение Spark Streaming, получающее данные (события аутентификации) из раздела kafka. Каждые 1 мин (интервал потоковой передачи = 1 мин) пакет событий обрабатывается в окне продолжительностью 6 мин (окно = 6 мин), причем упомянутое окно скользит каждую минуту (интервал скольжения = 1 мин). По некоторым странным причинам среднее время обработки пакета составляет 2,2 минуты (независимо от того, имеет ли оно запись или более 100 записей).
Мы попытались изменить конфигурацию с 3 интервалами (потоковое, скользящее, оконное); задействовать некоторые обходные пути в нашем исходном коде в процессе сериализации / десериализации, преобразования, применяемые к каждому RDD (переключение с groupByKey на ReductionByKey, а затем ReduckByKeyAndWindow и т. д.) безуспешно.
Пожалуйста, посмотрите в следующих строках немного нашего кода:
inputDStream = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (),
ConsumerStrategies. Подписка (reqSub,
getKafkaParams (com.indigo.deep.common.model.RequestEventDerSer2.class)));
JavaDStream<RequestEvent> requestEventStream = inputDStream.map(kafkaRecord -> {
numInputMessages.add(1);
return kafkaRecord.value();
});
JavaPairDStream<String, Long> ipStream = requestEventStream.filter(event -> event.getIPAddress()!=null )
.mapToPair(event -> new Tuple2<String, Long>(event.getIPAddress(), 1L));
JavaPairDStream<String, Long> ipCountDStream = ipStream.reduceByKeyAndWindow(((v1, v2) -> {
return v1 + v2;
}), ((v1, v2) -> {
return v1 - v2;
}), WINDOW_DURATION, SLIDE_DURATION);
ipCountDStream.cache();
JavaPairDStream<String, Long> ipRisky = ipCountDStream.filter(f -> f._2.intValue() > 5);
JavaPairDStream<String, Long> ipRiskyToPersist = ipRisky
.mapToPair(t -> new Tuple2<String, Long>(t._1, 1L));
ipRiskyToPersist.foreachRDD(f -> {
Map<String, Long> mappedIPs = new HashMap<>();
f.foreach(t -> mappedIPs.put(t._1, t._2));
LOGGER.info(" RequestEvent: Total risky ips: " + f.count());
LOGGER.info("Processed " + numInputMessages.localValue() + " unique IPs");
});
Есть предложения по улучшению времени пакетной обработки?