Я построил топологию Storm с извлечением данных из Kafka. И я хотел бы построить агрегацию с минимальным подсчетом для каждой партии на одном из полей. Я попытался использовать функцию maxBy в потоке, однако она не отображает никаких результатов, хотя данные проходят через систему, а функция вывода работала с другими агрегатами. Как это можно реализовать по-другому или что можно исправить в текущей реализации?
Вот моя текущая реализация:
val tridentTopology = new TridentTopology()
val stream = tridentTopology.newStream("kafka_spout",
new KafkaTridentSpoutOpaque(spoutConfig))
.map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
"user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
"user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
.maxBy("user.followers_count")
.map(new OutputFunction)
Моя пользовательская функция вывода:
class OutputFunction extends MapFunction{
override def execute(input: TridentTuple): Values = {
val values = input.getValues.asScala.toList.toString
println(s"TWEET: $values")
new Values(values)
}
}