У меня есть простая топология Storm, которая читает данные из Kafka, анализирует и извлекает поля сообщения. Я хотел бы отфильтровать поток кортежей по одному из значений полей и выполнить подсчет агрегации по другому. Как я могу сделать это в Storm? Я не нашел соответствующих методов для кортежей (фильтр, агрегат), поэтому я должен выполнять эти функции непосредственно над значениями поля?
Вот топология:
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
Я настроил KafkaTwitterBolt для подсчета и фильтрации с проанализированными полями. Мне удалось отфильтровать весь список значений только не по заданному c полю:
class KafkaTwitterBolt() extends BaseBasicBolt{
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val tweetValues = input.getValues.asScala.toList
val filterTweets = tweetValues
.map(_.toString)
.filter(_ contains "big data")
val resultAllValues = new Values(filterTweets)
collector.emit(resultAllValues)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
"user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
"user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
}
}