MinBy не возвращает никакого результата Apache Storm - PullRequest
1 голос
/ 19 января 2020

Я построил топологию Storm с извлечением данных из Kafka. И я хотел бы построить агрегацию с минимальным подсчетом для каждой партии на одном из полей. Я попытался использовать функцию maxBy в потоке, однако она не отображает никаких результатов, хотя данные проходят через систему, а функция вывода работала с другими агрегатами. Как это можно реализовать по-другому или что можно исправить в текущей реализации?

Вот моя текущая реализация:

val tridentTopology = new TridentTopology()

    val stream = tridentTopology.newStream("kafka_spout",
      new KafkaTridentSpoutOpaque(spoutConfig))
      .map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
        "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
        "user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
      .maxBy("user.followers_count")
      .map(new OutputFunction)

Моя пользовательская функция вывода:

class OutputFunction extends MapFunction{

  override def execute(input: TridentTuple): Values = {
    val values = input.getValues.asScala.toList.toString
    println(s"TWEET: $values")
    new Values(values)
  }
}
...