У меня есть сценарий использования, когда у меня есть набор элементов DiagnosticRun
s, которые отправляются в мой кластер.Я хочу обрабатывать их поочередно (чтобы избежать конфликтов).Я пытаюсь использовать Hazelcast Queue
, защищенный Lock
, чтобы убедиться, что элементы обрабатываются по одному.Hazelcast работает во встроенном режиме в моем кластере.Если я зарегистрирую ItemListener
на Queue
, безопасно ли вызывать take()
на Queue
из метода itemAdded()
?Например:
@Component
public class DistributedQueueListener
{
public static final String DIAGNOSTICS_RUN_QUEUE_NAME = "diagnosticRun";
@Autowired
private HazelcastInstance hazelcast;
@Autowired
private ProductVersioningService productVersioningService;
private IQueue<DiagnosticRun> diagnosticRunQueue;
private ILock diagnosticRunLock;
private String diagnosticRunListenerId;
@PostConstruct
public void init()
{
diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTICS_RUN_QUEUE_NAME);
diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
diagnosticRunListenerId = diagnosticRunQueue.addItemListener(new DiagnosticRunListener(), false);
}
@PreDestroy
public void stop()
{
diagnosticRunQueue.removeItemListener(diagnosticRunListenerId);
}
public class DiagnosticRunListener implements ItemListener<DiagnosticRun>
{
@Override
public void itemAdded(ItemEvent<diagnosticRun> item)
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll();
if(diagnosticRun != null)
{
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
}
finally
{
diagnosticRunLock.unlock();
}
}
@Override
public void itemRemoved(ItemEvent<diagnosticRun> item)
{
}
}
}
Я не уверен, является ли потокобезопасным вызовить take()
на Queue
из этого места и потока.
Если это не разрешено, я 'Мне придется настроить свой собственный длительный цикл на poll()
Queue
.Я не уверен, каков наилучший способ настроить длительный поток в приложении Spring Boot.Предполагая, что приведенный выше метод не работает, будет ли приведенный ниже код безопасен для потоков?Или есть лучший способ сделать это?
@Component
public class DistributedQueueListener
{
public static final String DIAGNOSTIC_RUN_QUEUE_NAME = "diagnosticRun";
@Autowired
private HazelcastInstance hazelcast;
@Autowired
private ProductVersioningService productVersioningService;
private IQueue<diagnosticRun> diagnosticRunQueue;
private ILock diagnosticRunLock;
private ExecutorService executorService;
@PostConstruct
public void init()
{
diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTIC_RUN_QUEUE_NAME);
diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> listenToDiagnosticRuns());
}
@PreDestroy
public void stop()
{
executorService.shutdown();
}
private void listenToDiagnosticRuns()
{
while(!executorService.isShutdown())
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
catch(InterruptedException e)
{
logger.error("Interrupted polling diagnosticRun queue", e);
}
finally
{
diagnosticRunLock.unlock();
}
}
}
}