Как программно перезапустить работника для топологии в Apache Storm - PullRequest
0 голосов
/ 13 октября 2018

Я столкнулся с проблемой в Apache Storm

Сценарий проблемы :

  1. Когда небольшие данные отправляются в Storm, данные должным образом обрабатываются топологией (имеет только 1 рабочий поток) и отдать дальше, чтобы сохранить в MongoDB.
  2. Но когда данные огромны, они обрабатывают данные и сохраняются в БД, но впоследствии не будут принимать другие данные, большие или маленькие.

Текущий обходной путь :

мы перезапускаем работника из Storm UI.

Вопрос :

Можем ли мы перезапустить тополога программно?

Ответы [ 2 ]

0 голосов
/ 17 октября 2018

Никогда не будет хорошим решением перезапустить рабочих, вы можете потерять несколько кортежей.Лучше всего использовать функциональность надежности сообщений Storm, как ответил Рахим.

Однако, кроме шторма надежности сообщений, имеется механизм внутреннего обратного давления, это означает, что, когда носики вводят больше данных, чем болтыв состоянии обработки, носик будет автоматически замедляться.

Чтобы включить это, вам нужно сначала, как сказал Рахим, включить активацию.Это означает, что если ваша топология проста:

Излив -> Болт

Излив будет:

public void nextTuple(){
  ...
  _collector.emit(new Values(tuple), tupleId);
}

@Override
publci void ack(Object msgId) { super.ack(msgId); }

Где tupleId просто будет инкрементным счетчиком count++.Таким образом, вы объявляете Storm новый кортеж, ожидающий подтверждения.

Тем временем в последовательном болте и во всех последовательных болтах в топологии или, по крайней мере, до того, который вызовет узкое место, вы напишите:

public void execute(Tuple tuple){
  ...
  _collector.emit(tuple, new Values(newTuple));
  _collector.ack(tuple);
}

Таким образом, вы заметите Storm, что кортеж полностью обработан.

В вашем основном методе, где вы объявляете построителя топологии, вы, как минимум, не последнийнеобходимо определить максимальное количество кортежей, которое будет ожидать Spout:

Config conf = new Config();
conf.setMaxSpoutPending(100);

Таким образом, Spout начнет создавать новые кортежи, ожидающие их подтверждения, если (в этом случае) число ожидающихесли количество кортежей превышает 100, то spout перестанет вызывать метод nextTuple, ожидая их подтверждения и затем генерируя новые.

NB: значение 100 является лишь примером, вам, возможно, придется немного его настроитьчтобы оптимизировать его под вашу ситуацию.

Ссылок, которыми поделился Рахим, должно быть достаточно, чтобы понять механизм, во всяком случае, если вы хотите углубиться в тВ реализацию я добавляю эту ссылку:

0 голосов
/ 13 октября 2018

Storm имеет два типа болтов: IRichBolt и IBasicBolt.Если вы внедряете IBasicBolt, вы должны также реализовать Acknowledge.Также вы должны отправить Ack в свой болт, чтобы предотвратить блокировку. Эти ссылки хороши:

http://storm.apache.org/releases/1.0.6/Concepts.html http://storm.apache.org/releases/1.2.2/Guaranteeing-message-processing.html

...