Чтобы реализовать постепенное отключение, проверьте блокировку асинхронных вызовов или обработайте исключения? - PullRequest
2 голосов
/ 04 августа 2011

Я занимаюсь разработкой Java-приложения, которое в течение большей части времени, включая точку выключения, сталкивается с потоком входящих асинхронных вызовов из внешней среды. Во время нормальной работы эти входящие вызовы затем необходимо отправлять в другую среду, опять же асинхронно.

В данный момент мой модуль является «хорошим» гражданином и выполняет некоторую блокировку вокруг флага отключения, который после установки изящно прекратит отправку любых дальнейших исходящих вызовов.

Беспокойство вызывает то, что из-за того, что и входящие, и исходящие вызовы являются асинхронными, мне приходится заставлять каждую «рабочую» задачу выполнять два набора блокировок (см. Ниже), чтобы выполнить одинаковую проверку флага отключения ( EDIT *) 1006 *: Мне было указано в другом вопросе, что для использования семафоров требуется только один сбор / выпуск для каждого работника). Это работает, но есть много таких рабочих задач, и я беспокоюсь о совокупном снижении производительности. Профилирование появится в ближайшее время, когда структура будет немного расширена, но независимо от результата было бы хорошо следовать передовым методам.

Альтернативой является просто не делать блокировку с проверкой флага выключения и обрабатывать ожидаемые исключения, которые генерируются, когда внешние платформы отключаются до завершения обработки асинхронных вызовов. Я должен добавить, что при использовании этого подхода не будет пагубных эксплуатационных последствий. Оба метода приведут к чистому отключению.

Ваши идеи о том, какая практика лучше, пожалуйста? Тяжелая блокировка без исключений, в отличие от блокировки, но без исключения.

С блокировками код рабочего задания выглядит примерно так:

final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private boolean isShutdown;

private void workerTask()
{
   try
   {
      shutdownLock.readLock().lock();

      if (isShutdown)
         return;

      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
               shutdownLock.readLock().lock();

               if (isShutdown)
                  return;

               // Do something here.
            }
            finally
            {
               shutdownLock.readLock().unlock();
            }
         }
      });
   }
   finally
   {
      shutdownLock.readLock().unlock();
   }
}

Метод shutdown запрашивает shutdownLock.writeLock (), затем устанавливает флаг isShutdown.

Альтернатива без блокировки и ожидания исключений, генерируемых выключением, выглядит примерно так:

volatile private boolean isShutdown;

private void workerTask()
{
   try
   {
      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
               // Do something here.
            }
            catch (final FrameworkRuntimeException exception)
            {
               if ((! isShutdown) || (exception.type != 
                                      FrameworkRuntimeException.FrameworkIsDisposed))
                  throw exception;
            }
         }
      });
   }
   catch (final FrameworkRuntimeException exception)
   {
      if ((! isShutdown) || (exception.type != 
                             FrameworkRuntimeException.FrameworkIsDisposed))
         throw exception;
   }
}

Метод shutdown для этой реализации устанавливает для флага volatile isShutdown значение true.

Заранее спасибо за любые отзывы,

Russ

РЕДАКТИРОВАТЬ: Мне было полезно указать в другом вопросе, что я мог бы использовать семафор, чтобы избежать двойной блокировки в первом подходе, так что это не было бы так жестко, в конце концов, но вопрос все еще стоит.

Ответы [ 3 ]

1 голос
/ 21 июня 2013

В целом, я бы предпочел подход, при котором вы проверяете, что происходит отключение, а затем выполняете задачу.Если вы оптимистично, а затем отбрасываете исключения, которые, как вы знаете, вызваны отключением, вы рискуете ошибочно классифицировать ошибку и пропустить реальную проблему.

Что касается упрощения кода, вы можетеизбавиться от всех блокировок и просто убедиться, что ваш метод executeAsynchronously использует ExecutorService - тогда ваш метод shutdown просто вызывает shutdown в службе, создание задачи можно пропустить, если isShutdown вернет true иесли вам нужно подождать, пока задачи завершатся, прежде чем вернуться, вы можете использовать полезный метод awaitTermination.

0 голосов
/ 12 июля 2013

, если вы используете пружину ниже, код работает для корректного отключения. Вы можете изменить номера повторов.

package com.avea.vpspg.test.schedulers;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import com.avea.vpspg.core.VasProvLogger;

@Component
class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> , ApplicationContextAware,BeanPostProcessor{


private ApplicationContext context;

public Logger logger = XProvLogger.getInstance().x;

public void onApplicationEvent(ContextClosedEvent event) {


    Map<String, ThreadPoolTaskScheduler> schedulers = context.getBeansOfType(ThreadPoolTaskScheduler.class);

    for (ThreadPoolTaskScheduler scheduler : schedulers.values()) {         
        scheduler.getScheduledExecutor().shutdown();
        try {
            scheduler.getScheduledExecutor().awaitTermination(20000, TimeUnit.MILLISECONDS);
            if(scheduler.getScheduledExecutor().isTerminated() || scheduler.getScheduledExecutor().isShutdown())
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has stoped");
            else{
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has not stoped normally and will be shut down immediately");
                scheduler.getScheduledExecutor().shutdownNow();
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has shut down immediately");
            }
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    Map<String, ThreadPoolTaskExecutor> executers = context.getBeansOfType(ThreadPoolTaskExecutor.class);

    for (ThreadPoolTaskExecutor executor: executers.values()) {
        int retryCount = 0;
        while(executor.getActiveCount()>0 && ++retryCount<51){
            try {
                logger.info("Executer "+executor.getThreadNamePrefix()+" is still working with active " + executor.getActiveCount()+" work. Retry count is "+retryCount);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if(!(retryCount<51))
            logger.info("Executer "+executor.getThreadNamePrefix()+" is still working.Since Retry count exceeded max value "+retryCount+", will be killed immediately");
        executor.shutdown();
        logger.info("Executer "+executor.getThreadNamePrefix()+" with active " + executor.getActiveCount()+" work has killed");
    }
}


@Override
public void setApplicationContext(ApplicationContext context)
        throws BeansException {
    this.context = context;

}


@Override
public Object postProcessAfterInitialization(Object object, String arg1)
        throws BeansException {
    return object;
}


@Override
public Object postProcessBeforeInitialization(Object object, String arg1)
        throws BeansException {
    if(object instanceof ThreadPoolTaskScheduler)
        ((ThreadPoolTaskScheduler)object).setWaitForTasksToCompleteOnShutdown(true);
    if(object instanceof ThreadPoolTaskExecutor)
        ((ThreadPoolTaskExecutor)object).setWaitForTasksToCompleteOnShutdown(true);
    return object;
}
0 голосов
/ 04 августа 2011

Хорошо, так что я думаю, что это здесь должно работать и не препятствовать чрезмерным накладным расходам времени выполнения (обратите внимание, что сейчас 4:30 утра, так что лучше перепроверьте;)) Также обратите внимание, что добавление хороших блоков кода try {} finally {} было бы неплохой идеей, но для ясности опущено.

public static final AtomicInteger activeConnections = new AtomicInteger();
public static volatile boolean shutdown = false;

public static void shutdown() {
    shutdown = true;
    while (activeConnections.get() > 0) {
        synchronized(activeConnections) {
            try {
                activeConnections.wait();
            }
            catch(InterruptedException e) {
            }
        }
    }
    // proceed shutdown
}

public static void run() {
    if (shutdown) return;
    activeConnections.incrementAndGet();
    if (shutdown) {
        leave();
        return;
    }
    // do stuff
    leave();
}

private static void leave() {
    int outstandingConnections = activeConnections.decrementAndGet();
    if (shutdown && outstandingConnections == 0) {
        synchronized(activeConnections) {
            activeConnections.notifyAll();
        }
    }       
}

Как только установлен флаг завершения работы, новый поток не начинает работать. Каждый поток увеличивает целое число при взаимодействии с внешней структурой и уменьшает его по завершении. Завершение работы может продолжаться только после того, как поток больше не связывается - обратите внимание, что, поскольку сначала установлен флаг выключения, новый поток больше не будет запускаться.

Таким образом, вы получаете довольно легкий AtomicInteger (который реализован в виде цикла CAS, вы не можете получить намного меньше накладных расходов) и энергозависимый барьер памяти.

Теперь я все еще жду своего первого комментария и говорю, что проще, эффективнее и короче ловить исключения, но мне понравилась проблема:)

...