Многопоточная топология шторма - PullRequest
0 голосов
/ 13 декабря 2018

Прежде всего, искренние извинения, если мой вопрос дублируется, я попытался найти, но не смог найти подходящий ответ на мой вопрос

Прежде всего, искренние извинения, если я задаю что-то очень простое, так как яновичок в шторме.А также, если мой вопрос дублирован, так как я пытался найти, но не смог найти соответствующий ответ на мой вопрос

Пожалуйста, сообщите о моем случае использования ниже.

Мой случай использования:

У меня есть Spout, считывающий данные из одного внутреннего механизма обмена сообщениями, так как его кортежи приема и передачи имеют очень высокую частоту (100 с / сек).

Теперь каждый, кроме данных, каждый кортеж также имеет частоту (int) (поскольку может быть всего 4-5 типов частоты).

Теперь мне нужно спроектировать Bolt для пакетирования / объединения всех кортежей и генерации только периодически по частоте, с возможностью генерации только последнего кортежа (в случае дубликата, полученного до следующей партии), поскольку у нас есть строковый ключ в данных кортежа для идентификации дубликата.

например,

  1. Таким образом, все кортежи с 25секунд как частота будет объединена и будет излучаться Болтом каждые 25 секунд (в случае дублирования кортежа, полученного в течение 25 секунд, будет рассматриваться только последняя).

  2. Аналогично всем кортежам с 10-минутным интервалом, так как частота будет объединена и будет генерироваться Bolt через каждые 10 минут (в случае дублированного кортежа, полученного в течение 10 минут, будет рассматриваться только последний).

** Теперь, поскольку мы можем иметь частоты 4-5 типов (например, 10 секунд, 25 секунд, 10 минут, 20 минут и т. Д., Они настроены), и каждый кортеж должен быть забит всоответствующая партия и выбрасывается (как указано выше).

Fyi.Для группировки болтов я использовал «fieldsGrouping», как показано ниже.

*.fieldsGrouping("FILTERING_BOLT",new Fields(PUBLISHING_FREQUENCY));*

Пожалуйста, помогите или посоветуйте, каков наилучший подход для моего варианта использования, поскольку я просто не мог придумать что-либо для реализации потоков параллельных кортежей и управления внутренним параллелизмом Storm.

1 Ответ

0 голосов
/ 17 декабря 2018

Звучит так, как будто вы хотите использовать оконные болты https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html. Возможно, вы хотите, чтобы окно было опрокидывающимся (то есть без перекрытия между оконными интервалами)

Оконные болты позволяют вам установить интервал, при котором они должны испускать окна (например,каждые 10 секунд), а затем болт буферизует все кортежи, которые он получает за предыдущие 10 секунд, прежде чем вызывать метод execute, который вы предоставляете.

Структура, которую я думаю, вам нужна, например,

spout -> splitter -> 5 second window bolt
                  -> 10 second window bolt

Сплиттер должен получить кортежи, исследовать поле частоты и отправить кортеж на правый оконный болт.Вы заставляете это делать, объявляя поток для каждого типа частоты.

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare("5-sec-stream", ...);
    declarer.declare("10-sec-stream", ...);
}

public void execute(Tuple input) {
    if (frequencyIsFive(input)) {
        collector.emit("5-sec-stream", new Values(input.getValues()))
    }
    //more cases here
}

Затем, когда объявляете свою топологию, вы делаете

topologyBuilder.setBolt("splitter", new SplitterBolt())
     .shuffleGrouping("spout")

topologyBuilder.setBolt("5-second-window", new YourWindowingBolt())
     .globalGrouping("splitter", "5-sec-stream")

, чтобы заставить все 5-секундные кортежи перейти к 5второй оконный болт.

См. https://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html для получения дополнительной информации об этом, в частности, о потоках и группировках.

Простой пример топологии окон в https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java.

Одна вещь, о которой вы, возможно, захотите знать, - это время ожидания кортежа Storm.Если вам нужно окно, например, 10 минут, вам нужно значительно увеличить время ожидания кортежа по умолчанию, равное 30 секундам, чтобы кортежи не останавливались во время ожидания в очереди.Вы можете сделать это, установив, например, conf.setMessageTimeoutSecs(15*60) при настройке топологии.Вы хотите, чтобы между интервалами окон и тайм-аутом кортежа была небольшая задержка, потому что вы хотите избежать как можно большего времени ожидания кортежей.

...