Apache Spark подсчитывает средние значения - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть разные сообщения в формате 'eventTimestamp-eventName? ChatID = 123', которые поступают в мой брокер Kafka, но мне нужно рассчитать временное расстояние AVG между двумя сообщениями с одним и тем же полем chatID :

1543420877919-chat.start?chatID=11
1543420877922-chat.start?chatID=12
1543420877923-chat.status?chatID=12
1543420877926-chat.start?chatID=13
1543420906433-chat.end?chatID=12
1543420906437-chat.end?chatID=11

So the chat duration with:
chatID=11 is 1543420906437-1543420877919=28518 mls
chatID=12 is 1543420906433-1543420877922 = 28511 mls
__ 
AVG duration is (28518+28511)/2=28514mls
(chatID=13 event must not be taken into account cause we dont have 'chat.end' 
event at this moment for this chatID)

на стороне Spark У меня есть следующий код:

JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    // filter "chat.start" and "chat.end" events
    stream.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
        @Override
        public Boolean call(ConsumerRecord<String, String> rec) throws Exception {
            return chatStartEndEvents.contains(rec.value());
        }
    })
    .foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
            consumerRecordJavaRDD
                    // convert event strings to chatID<->eventTimestamp pair
                    .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, Long>() {
                        @Override
                        public Tuple2<String, Long> call(ConsumerRecord<String, String> rec) throws Exception {
                            Event evt = EventFactory.createEventFromString(rec.value());
                            return new Tuple2<String, Long>(evt.getChatId(), evt.getOccurredAt());
                        }
                    })
                    // substract 'end' and 'start' chat dates
                    .reduceByKey(new Function2<Long, Long, Long>() {
                        @Override
                        public Long call(Long val1, Long val2) throws Exception {
                            return Math.abs(val1 - val2);
                        }
                    })
                    // :TODO what shoul I do next, 
I mean how can I transform groupped key->time pair to AVG time for the all pairs and print it lets say every 10 sec to console.

        }
    });

Итак, как вы видите, я остановился на преобразовании lowerByKey (), где я вычитал значения меток времени, и в настоящее время у меня есть некоторые проблемысо следующим преобразованием.

...