Можете ли вы получить доступ к очереди Hazelcast из ItemListener? - PullRequest
0 голосов
/ 22 сентября 2018

У меня есть сценарий использования, когда у меня есть набор элементов DiagnosticRun s, которые отправляются в мой кластер.Я хочу обрабатывать их поочередно (чтобы избежать конфликтов).Я пытаюсь использовать Hazelcast Queue, защищенный Lock, чтобы убедиться, что элементы обрабатываются по одному.Hazelcast работает во встроенном режиме в моем кластере.Если я зарегистрирую ItemListener на Queue, безопасно ли вызывать take() на Queue из метода itemAdded()?Например:

@Component      
public class DistributedQueueListener
{
    public static final String DIAGNOSTICS_RUN_QUEUE_NAME = "diagnosticRun";

    @Autowired
    private HazelcastInstance hazelcast;

    @Autowired
    private ProductVersioningService productVersioningService;

    private IQueue<DiagnosticRun> diagnosticRunQueue;
    private ILock diagnosticRunLock;
    private String diagnosticRunListenerId;

    @PostConstruct
    public void init()
    {
        diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTICS_RUN_QUEUE_NAME);
        diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
        diagnosticRunListenerId = diagnosticRunQueue.addItemListener(new DiagnosticRunListener(), false);
    }

    @PreDestroy
    public void stop()
    {
        diagnosticRunQueue.removeItemListener(diagnosticRunListenerId);
    }

    public class DiagnosticRunListener implements ItemListener<DiagnosticRun>
    {
        @Override
        public void itemAdded(ItemEvent<diagnosticRun> item)
        {
            diagnosticRunLock.lock(5, TimeUnit.SECONDS);
            try
            {
                DiagnosticRun diagnosticRun = diagnosticRunQueue.poll();
                if(diagnosticRun != null)
                {
                    productVersioningService.updateProductDeviceTable(diagnosticRun);
                }
            }
            finally
            {
                diagnosticRunLock.unlock();
            }
        }

        @Override
        public void itemRemoved(ItemEvent<diagnosticRun> item)
        {
        }   
    }       
}

Я не уверен, является ли потокобезопасным вызовить take() на Queue из этого места и потока.

Если это не разрешено, я 'Мне придется настроить свой собственный длительный цикл на poll() Queue.Я не уверен, каков наилучший способ настроить длительный поток в приложении Spring Boot.Предполагая, что приведенный выше метод не работает, будет ли приведенный ниже код безопасен для потоков?Или есть лучший способ сделать это?

@Component      
public class DistributedQueueListener
{
    public static final String DIAGNOSTIC_RUN_QUEUE_NAME = "diagnosticRun";

    @Autowired
    private HazelcastInstance hazelcast;

    @Autowired
    private ProductVersioningService productVersioningService;

    private IQueue<diagnosticRun> diagnosticRunQueue;
    private ILock diagnosticRunLock;

    private ExecutorService executorService;

    @PostConstruct
    public void init()
    {
        diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTIC_RUN_QUEUE_NAME);
        diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");

        executorService = Executors.newFixedThreadPool(1);
        executorService.submit(() -> listenToDiagnosticRuns());
    }

    @PreDestroy
    public void stop()
    {
        executorService.shutdown();
    }

    private void listenToDiagnosticRuns()
    {
        while(!executorService.isShutdown())
        {
            diagnosticRunLock.lock(5, TimeUnit.SECONDS);
            try
            {
                DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
                productVersioningService.updateProductDeviceTable(diagnosticRun);
            }
            catch(InterruptedException e)
            {
                logger.error("Interrupted polling diagnosticRun queue", e);
            }
            finally
            {
                diagnosticRunLock.unlock();
            }
        }
    }
}

1 Ответ

0 голосов
/ 24 сентября 2018

Сначала я укажу, что я не совсем эксперт, над какими потоками они выполняются, и когда это так, некоторые могут не согласиться, но вот мои мысли по этому поводу, так что, пожалуйста, присоединяйтесь, так как это выглядит как интересный случай.,Ваше первое решение смешивает потоки событий Hazelcast с потоками операций.Фактически вы запускаете три операции, которые будут вызваны в результате одного события.Если вы добавите произвольную задержку в свой вызов updateProcductDeviceTable, вы увидите, что в конце концов он замедлится, но через некоторое время возобновит работу.Это приведет к тому, что ваша локальная очередь событий будет накапливаться во время вызова операций.Вы можете поместить все, что вы делаете, в отдельный поток, который вы можете «разбудить» в #itemAdded или, если можете позволить себе немного задержаться, сделать то, что вы делаете в своем втором решении.Однако я бы внес пару изменений в метод

listenToDiagnosticsRuns ():

private void listenToDiagnosticRuns()
{
    while(!executorService.isShutdown())
    {
        if(diagnosticRunQueue.peek() != null)
       {
           diagnosticRunLock.lock(5, TimeUnit.SECONDS);
           try
           {
               DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
               if(diagnosticRun != null)
               {
                   productVersioningService.updateProductDeviceTable(diagnosticRun);
               }
           }
           catch(InterruptedException e)
           {
               logger.error("Interrupted polling diagnosticRun queue", e);
           }
           finally
           {
               diagnosticRunLock.unlock();
           }
        } // peek != null
        else
        {
            try
            {
                Thread.sleep(5000);
            }
            catch (InterruptedException e)
            {
                 //do nothing
            }
        }
    }
}
...