Как отправить сразу несколько (разных) кортежей из одного KafkaSpout на болт? - PullRequest
0 голосов
/ 05 марта 2019

Я новичок в Apache Storm.

Я пытаюсь разработать систему обработки потоков в реальном времени с использованием Apache Kafka, Storm и ESPER CEP.

Для этого яналичие одного KafkaSpout, который будет отправлять потоки в Bolts (с моими запросами CEP) для фильтрации потока.

Я уже создал топологию и пытаюсь запустить ее в локальном кластере

Проблема в том, что запрос CEP, выполняемый в моих болтах, требует пакетов кортежей для выполнения операций с окнами в потоках.И в моей топологии KafkaSpout отправляет только один кортеж за один раз в Bolts для обработки.Поэтому мой запрос CEP не работает должным образом.

Я использую KafkaSpout по умолчанию в Storm.Можно ли как-то отправить несколько разных кортежей одновременно в Bolts?Некоторые настройки конфигурации могут сделать это, или мне нужно сделать свой собственный KafkaSpout для этого?

Пожалуйста, помогите !!

Моя топология:

TopologyBuilder builder = new TopologyBuilder ();

builder.setSpout ("KafkaSpout", новый KafkaSpout <> (KafkaSpoutConfig.builder ("localhost:" + + 9092, "weatherdata"). SetProp (ConsumerConfig.GROUP_ID_CONFIG, "группа потребителей погоды")) .build ()), 4);

builder.setBolt ("A", новый FeatureSelectionBolt (), 2) .globalGrouping ("KafkaSpout");

builder.setBolt ("B ", новый TrendDetectionBolt (), 2) .shuffleGrouping (" A ")

Я использую 2 болта и один носик.

Мой запрос esper, выполняющийся в болте A, равен

выберите first (e), last (e) из weatherEvent.win:length(3) как e

Здесь я пытаюсь получить первое и последнее событие из окна длины три из потока событий,Но я получаю одно и то же первое и последнее событие, потому что KafkaSpout отправляет только один кортеж за раз.

1 Ответ

0 голосов
/ 05 марта 2019

Носик не может этого сделать, но вы можете использовать поддержку окон Storm https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html, или просто написать болт агрегации и поместить его между носиком и остальной топологией.

Таким образом, ваша топология должна быть spout -> aggregator -> feature selection -> trend detection.

. Я бы порекомендовал вам попробовать встроенную поддержку управления окнами, но если вы предпочитаете писать собственную агрегацию, вашему болту действительно нужно просто получить некоторое количество кортежей.(например, 3) и создайте новый кортеж, содержащий все значения.

Болт агрегатора должен сделать что-то вроде

private List<Tuple> buffered;

execute(Tuple input) {
  if (buffered.size != 2) {
    buffered.add(input)
    return
  }
  Tuple first = buffered.get(0)
  Tuple second = buffered.get(1)
  Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
  List<Tuple> anchors = List.of(first, second, input)
  collector.emit(anchors, aggregate)
  collector.ack(first, second, input)
  buffered.clear()
}

Таким образом, вы получите один кортеж, содержащий содержимое3 входных кортежа.

...