Фильтрация данных по болту Storm - PullRequest
0 голосов
/ 18 января 2020

У меня есть простая топология Storm, которая читает данные из Kafka, анализирует и извлекает поля сообщения. Я хотел бы отфильтровать поток кортежей по одному из значений полей и выполнить подсчет агрегации по другому. Как я могу сделать это в Storm? Я не нашел соответствующих методов для кортежей (фильтр, агрегат), поэтому я должен выполнять эти функции непосредственно над значениями поля?

Вот топология:

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
topologyBuilder.setBolt("parser_bolt", new ParserBolt()).shuffleGrouping("kafka_spout")
topologyBuilder.setBolt("transformer_bolt", new KafkaTwitterBolt()).shuffleGrouping("parser_bolt")

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

Я настроил KafkaTwitterBolt для подсчета и фильтрации с проанализированными полями. Мне удалось отфильтровать весь список значений только не по заданному c полю:

class KafkaTwitterBolt() extends BaseBasicBolt{

 override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
  val tweetValues = input.getValues.asScala.toList
  val filterTweets = tweetValues
     .map(_.toString)
     .filter(_ contains "big data")
  val resultAllValues = new Values(filterTweets)
  collector.emit(resultAllValues)
 }

 override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  declarer.declare(new Fields("created_at", "id", "text", "source", "timestamp_ms",
   "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
   "user.friends_count", "user.lang", "user.favorite_count", "entities.hashtags"))
 }
}

Ответы [ 2 ]

0 голосов
/ 20 января 2020

Ваш ответ на { ссылка } немного неправильный. API ядра Storm позволяет фильтровать и агрегировать, вам просто нужно написать логи c самостоятельно.

Болт фильтрации - это просто болт, который отбрасывает некоторые кортежи и передает другие. Например, следующий болт отфильтрует кортежи на основе строкового поля:

class FilteringBolt() extends BaseBasicBolt{

 override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
  val values = input.getValues.asScala.toList
  if ("Pass me".equals(values.get(0))) {
    collector.emit(values)
  }
  //Emitting nothing means discarding the tuple
 }

 override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  declarer.declare(new Fields("some-field"))
 }
}

Агрегирующий болт - это просто болт, который собирает несколько кортежей и затем генерирует новый совокупный кортеж, закрепленный в исходных кортежах:

class AggregatingBolt extends BaseRichBolt {
  List<Tuple> tuplesToAggregate = ...;
  int counter = 0;

 override def execute(input: Tuple): Unit = {
  tuplesToAggregate.add(input);
  counter++;
  if (counter == 10) {
    Values aggregateTuple = ... //create a new set of values based on tuplesToAggregate
    collector.emit(tuplesToAggregate, aggregateTuple) //This anchors the new aggregate tuple to all the original tuples, so if the aggregate fails, the original tuples are replayed.
    for (Tuple t : tuplesToAggregate) {
      collector.ack(t); //Ack the original tuples now that this bolt is done with them
      //Note that you MUST emit before you ack, or the at-least-once guarantee will be broken.
    }
    tuplesToAggregate.clear();
    counter = 0;
  }
  //Note that we don't ack the input tuples until the aggregate gets emitted. This lets us replay all the aggregated tuples in case the aggregate fails
 }
}

Обратите внимание, что для агрегации вам нужно будет расширить BaseRichBolt и выполнить взлом вручную, поскольку вы хотите отложить взятие кортежа до его включения в составной кортеж.

0 голосов
/ 19 января 2020

Оказывается, что Storm core API не позволяет, чтобы для выполнения фильтрации по любому полю использовался Trident (он имеет встроенную функцию фильтрации). Код будет выглядеть так:

 val tridentTopology = new TridentTopology()

    val stream = tridentTopology.newStream("kafka_spout",
      new KafkaTridentSpoutOpaque(spoutConfig))
      .map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
        "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
        "user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
    .filter(new LanguageFilter)

Сама функция фильтрации:

class LanguageFilter extends BaseFilter{

  override def isKeep(tuple: TridentTuple): Boolean = {
    val language = tuple.getStringByField("user.lang")
    println(s"TWEET: $language")
    language.contains("en")
  }
}
...