отправка ввода с одного излива на несколько болтов с группировкой полей в Apache Storm - PullRequest
0 голосов
/ 15 ноября 2018
builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));

У меня есть поле ввода "field1", добавленное в группировку полей. По определению группировки полей все твиты с одинаковым «field1» должны переходить к одной задаче TweetCounter. Количество исполнителей для болта TweetCounter равно 2.

Однако, если «field1» одинаково во всех кортежах входящего потока, означает ли это, что даже если я указал 2 исполнителя для TweetCounter, поток будет отправлен только одному из них, а другой экземпляр останется пустым?

Чтобы продолжить работу с моим конкретным вариантом использования, как я могу использовать один носик и отправлять данные на разные болты на основе определенного значения поля ввода (field1)?

Ответы [ 2 ]

0 голосов
/ 26 ноября 2018

Альтернативой использованию emitDirect, как описано в этого ответа , является реализация собственной группировки потоков. Сложность примерно та же, но она позволяет вам повторно использовать логику группировки по нескольким болтам.

Например, группировка случайных чисел в Storm реализована как CustomStreamGrouping следующим образом:

public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}

Storm вызовет prepare, чтобы сообщить вам идентификаторы задач, за которые отвечает ваша группа, а также некоторый контекст в топологии. Когда Storm испускает кортеж из болта / излива, где вы используете эту группировку, Storm вызовет chooseTasks, что позволит вам определить, к каким задачам должен идти кортеж. Затем вы будете использовать группировку при построении топологии, как показано:

TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
  .customGrouping("spout", new ShuffleGrouping());

Имейте в виду, что группировки должны быть Serializable и потокобезопасными.

0 голосов
/ 17 ноября 2018

Кажется, один из способов решения этой проблемы - использовать Прямая группировка , когда источник решает, какой компонент получит кортеж.:

Это особый вид группировки.Поток, сгруппированный таким образом, означает, что производитель кортежа решает, какая задача потребителя получит этот кортеж.Прямые группировки могут быть объявлены только для потоков, которые были объявлены как прямые потоки.Кортежи, отправленные в прямой поток, должны быть отправлены одним из методов [emitDirect] (javadocs / org / apache / storm / task / OutputCollector.html # emitDirect (int, int, java.util.List). Болт может получитьидентификаторы задач своих потребителей, используя предоставленный TopologyContext или отслеживая выходные данные метода emit в OutputCollector (который возвращает идентификаторы задач, в которые был отправлен кортеж).

Вы можете увидетьв этом примере используется здесь :

 collector.emitDirect(getWordCountIndex(word),new Values(word));

, где getWordCountIndex возвращает индекс компонента, в котором этот кортеж будет обрабатываться.

...