Альтернативой использованию emitDirect
, как описано в этого ответа , является реализация собственной группировки потоков. Сложность примерно та же, но она позволяет вам повторно использовать логику группировки по нескольким болтам.
Например, группировка случайных чисел в Storm реализована как CustomStreamGrouping
следующим образом:
public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks) {
choices.add(Arrays.asList(i));
}
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int rightNow;
int size = choices.size();
while (true) {
rightNow = current.incrementAndGet();
if (rightNow < size) {
return choices.get(rightNow);
} else if (rightNow == size) {
current.set(0);
return choices.get(0);
}
} // race condition with another thread, and we lost. try again
}
}
Storm вызовет prepare
, чтобы сообщить вам идентификаторы задач, за которые отвечает ваша группа, а также некоторый контекст в топологии. Когда Storm испускает кортеж из болта / излива, где вы используете эту группировку, Storm вызовет chooseTasks
, что позволит вам определить, к каким задачам должен идти кортеж. Затем вы будете использовать группировку при построении топологии, как показано:
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
.customGrouping("spout", new ShuffleGrouping());
Имейте в виду, что группировки должны быть Serializable
и потокобезопасными.