Следующий класс - моя рабочая вертикаль, в которой я хочу выполнить код блокировки при получении сообщения от шины событий на канале с именем events-config.
Цель состоит в том, чтобы генерировать и публиковать сообщения json на неопределенный срок, пока я не получу сообщение об остановке операции на канале config-событий.
Я использую executeBlocking для достижения желаемой функциональности.Однако, поскольку я выполняю операцию блокировки бесконечно, Vertx блокирует предупреждения о дампе проверщика потоков.
Вопрос:
- Есть ли способ отключить заблокированную проверку цепей только для определенной вершины ??
- Придерживается ли приведенный ниже коднаилучшая практика выполнения бесконечного цикла на основе необходимости в Vertx?Если нет, можете ли вы предложить лучший способ сделать это?
public class WorkerVerticle extends AbstractVerticle {
Logger logger = LoggerFactory.getLogger(WorkerVerticle.class);
private MessageConsumer<Object> mConfigConsumer;
AtomicBoolean shouldPublish = new AtomicBoolean(true);
private JsonGenerator json = new JsonGenerator();
@Override
public void start() {
mConfigConsumer = vertx.eventBus().consumer("events-config", message -> {
String msgBody = (String) message.body();
if (msgBody.contains(PublishOperation.START_PUBLISH.getName()) && !mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to start producing data onto kafka " + msgBody);
vertx.<Void>executeBlocking(voidFutureHandler -> {
Integer numberOfMessagesToBePublished = 100000;
if (numberOfMessagesToBePublished <= 0) {
logger.info("Skipping message publish :"+numberOfMessagesToBePublished);
return; // is it best way to do it ??
}
publishData(numberOfMessagesToBePublished);
},false, voidAsyncResult -> logger.info("Blocking publish operation is terminated"));
} else if (msgBody.contains(PublishOperation.STOP_PUBLISH.getName()) && mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to terminate " + msgBody);
mJsonGenerator.terminatePublish();
}
});
}
private void publishData(){
while(shouldPublish.get()){
//code to generate json indefinitely until some one reset shouldPublish variable
}
}
}