Java Concurrency - лучший дизайн-подход для управления жизненным циклом потока (запуск / остановка) - PullRequest
2 голосов
/ 09 июня 2011

Я разрабатываю параллельное Java-приложение, которое считывает данные с различных медицинских устройств, доступных в интрасети больницы.

Я прочитал «Параллелизм Java на практике - Брайан Гетц ...», чтобы понять, какделать вещи, но я думаю, что я все еще что-то упускаю.

Вот быстрая простая диаграмма того, что я пытаюсь сделать, и ниже приведен фрагмент кода ..

Рабочие потоки (экземпляры MedicalDeviceData) непрерывно считывают данные с медицинских устройств и делают их доступными для MedicalDeviceWorkManager, который, в свою очередь, передает их конечному пользователю.
Рабочие потоки продолжают считывать данные бесконечно (в идеале), и "нет" работызавершена »ситуация по моему сценарию.Более того, пользователь может выбрать «Запустить все устройства» или запустить определенное устройство или остановить устройство, как и когда пожелает.

Ниже приведен фрагмент кода (компилируется, но не тестируется) того, как я его реализую.

MedicalDeviceWorkManager - порождает рабочие потоки и управляет ими.

MedicalDeviceData - рабочий поток бесконечно получает данные от медицинских устройств и обновляет этот экземпляр этого класса.

В основном смотрите на startDevice, stopDevice и методы запуска.

Вы, очевидно, заметите, что я не использую ThreadPoolExecutor и Future и что я просто развернул здесь свою собственную реализацию.

Поскольку блок future.get блокируется до завершения работы, в моем случае это не имеет смысла, поскольку мой рабочий поток никогда не «завершает» задачу ... это просто бесконечно продолжающаяся задача ...

ВОПРОС: Как изменить показанную ниже реализацию на более стандартизированную, чтобы я мог лучше использовать пакет java.util.concurrent (ThreadPoolExecutor / Future).

Какой-нибудь другой лучший шаблон дизайна, на который мне стоит посмотреть?

public class MedicalDeviceWorkManager {

  private ThreadGroup rootThreadGroup = null;
  Hashtable<String, MedicalDeviceData> deviceObjs = new Hashtable<String, MedicalDeviceData>();

  public void manageMedicalDevices() throws InterruptedException  {

    String[] allDevices={"Device1","Device2","Device3","Device4"};

    //-- Start all threads to collect data
    for(String deviceToStart:allDevices){
      this.startDevice(deviceToStart);
    }

    //-- Stop all threads 
    for(String deviceToStop:allDevices){
      this.stopDevice(deviceToStop);
    }

    //-- Start on request from user
    String deviceToStart="Device1";
    this.startDevice(deviceToStart);

    //-- Stop on request from user.
    String deviceToStop="Device1";
    this.stopDevice(deviceToStop);

    /* 
     * Get Data and give it to client 
     * This is happening via a separate TCP port
     * */
    while(true){
      for(String deviceName:allDevices){
        if(deviceObjs.get(deviceName)!=null){

          ConcurrentHashMap<String,BigDecimal> devData=deviceObjs.get(deviceName).getCollectedData();

          //--Loop and send data to client on TCP stream
          ;
        }
      }//-- loop the devices
    }//-- infinite
  }

  //-- Start the device to start acquiring data using a worker thread
  private void startDevice(String deviceName){
    //-- Get Device instance
    MedicalDeviceData thisDevice=deviceObjs.get(deviceName);
    if(thisDevice==null){
      thisDevice=new MedicalDeviceData(deviceName);
      deviceObjs.put(deviceName, thisDevice);
    }

    //-- Create thread to start data acquisition 
    //-- Start if not being processed already (Handle what if thread hung scenario later)
    if(this.getThread(deviceName)==null){
      Thread t=new Thread(thisDevice);
      t.setName(deviceName);
      t.start();          
    }
  }


  //-- Stop the worker thread thats collecting the data.
  private void stopDevice(String deviceName) throws InterruptedException {
    deviceObjs.get(deviceName).setShutdownRequested(true);
    Thread t=this.getThread(deviceName);
    t.interrupt();
    t.join(1000);
  }

  private Thread getThread( final String name ) {
    if ( name == null )
        throw new NullPointerException( "Null name" );
    final Thread[] threads = getAllThreads( );
    for ( Thread thread : threads )
        if ( thread.getName( ).equals( name ) )
            return thread;
    return null;
  }

  private ThreadGroup getRootThreadGroup( ) {
      if ( rootThreadGroup != null )
          return rootThreadGroup;
      ThreadGroup tg = Thread.currentThread( ).getThreadGroup( );
      ThreadGroup ptg;
      while ( (ptg = tg.getParent( )) != null )
          tg = ptg;
      return tg;
  } 

  private Thread[] getAllThreads( ) {
    final ThreadGroup root = getRootThreadGroup( );
    final ThreadMXBean thbean = ManagementFactory.getThreadMXBean( );
    int nAlloc = thbean.getThreadCount( );
    int n = 0;
    Thread[] threads;
    do {
        nAlloc *= 2;
        threads = new Thread[ nAlloc ];
        n = root.enumerate( threads, true );
    } while ( n == nAlloc );
    return java.util.Arrays.copyOf( threads, n );
  }

}//-- MedicalDeviceWorkManager




public class MedicalDeviceData implements Runnable{

  //-- Data Collected from medical device
  private final ConcurrentHashMap<String,BigDecimal> collectedData=new ConcurrentHashMap<String,BigDecimal>();

  //-- Set by Thread Manager to request a shutdown..after which it should interrupt the thread
  private AtomicBoolean shutdownRequested;

  //-- Simple data Counter
  private AtomicInteger dataCounter=new AtomicInteger(0);

  //-- Device Name
  private String thisDeviceName;

  public void run() {

    //-- Initialize I/O for the device
    ;

    while(!this.getShutdownRequested()){
      try{
        //-- just to compile the code
        Thread.sleep(0);

        //-- perform I/O operation to get data from medical device
        ;

        //-- Add data into the ConcurrentHashMap...Both key and value are immutable.
        collectedData.put("DataKey", new BigDecimal("9999"));

        //-- data counter
        dataCounter.getAndIncrement();

      }
      catch(InterruptedException ie){
        if(this.getShutdownRequested()){
          return;
        }
        //throw new InterruptedException();
      }
    }

  }//-- run

  public MedicalDeviceData(String thisDeviceName){
    this.thisDeviceName=thisDeviceName;
  }

  /**
   * @return the shutdownRequested
   */
  public boolean getShutdownRequested() {
    return this.shutdownRequested.get();
  }


  /**
   * @param shutdownRequested the shutdownRequested to set
   */
  public void setShutdownRequested(boolean shutdownRequested) {
    this.shutdownRequested.set(shutdownRequested);
  }


  /**
   * Both key and value are immutable, so ok to publish reference.
   * 
   * @return the collectedData
   */
  public ConcurrentHashMap<String, BigDecimal> getCollectedData() {
    return collectedData;
  }


  /**
   * @return the dataCounter
   */
  public AtomicInteger getDataCounter() {
    return dataCounter;
  }

}

Ответы [ 4 ]

2 голосов
/ 09 июня 2011

Так что пул потоков может или не может быть лучшим использованием здесь, потому что нет реальной абстракции работы.Однако это может быть интересным вариантом использования для расширения Thread.Я бы справился с общением с помощью простого jucLock и jucCondition.Наконец, я бы делегировал остановку и запуск определенным классам делегирования, которые принимают тип Runnable в качестве отдельных постоянных единиц работы.

Единственное отличие, которое я хотел бы использовать, - это использовать остановку для выключения и паузу для приостановки, при которой запуск просто продолжит работу.Например, это может выглядеть примерно так:

public class MedicalDeviceWorkManager {

   private ConcurrentHashMap<String, DelegatingThread> devices = new ConcurrentHashMap<...>();

   public synchronized void registerDevice(String device, Runnable singleUnitOfWork){
         DelegatingThread worker = new DelegatingThread(singleUnitOfWork);
         devices.put(device,worker);

         worker.start();

   } 
   public void startDevice(String device){
        devices.get(device).startDevice();
   }    
   public void stopDevice(String device){
        devices.get(device).stopDevice();
   }    
   public void pauseDevice(String device){
        devices.get(device).pauseDevice();
   }    

   private static class DelegatingThread extends Thread{
      final Lock lock = new ReentrantLock();
      final Condition condition = lock.newCondition();
      final Runnable r;
      boolean paused = true;
      boolean stopped =false;
      private DelegatingThread (Runnable r){ this.r = r; }
      @Override
      public void run(){
          while(true){

             lock.lock(); 
             while(paused)
              condition.await(); 

             if(stopped) return;

             lock.unlock();

             r.run();
          }
      }
      private void startDevice(){
            lock.lock(); 
            paused = false;
            condition.signal(); 
            lock.unlock();
      }
      private void pauseDevice(){
            lock.lock(); 
            if(!stopped)
               paused = true;
            lock.unlock();
      }
      private void stopDevice(){
            lock.lock(); 
            stopped= true;
            paused=false;
            condition.signal();
            lock.unlock();
      }
   } 
}

Я понимаю, что это много кода для демонстрации, но здесь действуют те же принципы, что и в пуле потоков.Вы всегда будете создавать один поток для каждого устройства и использовать его при необходимости.Runnable будет единственной функцией, которую вы запускаете между циклом while.

Кроме того, я не включил семантику try / finally для краткости.

1 голос
/ 10 июня 2011

Комментарий Роба заставил меня задуматься о возможном альтернативном решении. Вы можете использовать ExecutorService, в которой запуск будет представлен указанной службе. Если исполняемый файл завершается, а пауза или остановка не были выбраны, то вызываемый объект снова отправляется в службу. Таким образом, теперь у вас фактически будет только заранее определенное количество потоков, работающих с последовательной согласованностью при выполнении работы.

Большая часть кода остается прежней, только базовое исполнение будет отличаться.

public class MedicalDeviceWorkManager {

   private ConcurrentHashMap<String, DelegatedWorker > devices = new ConcurrentHashMap<...>();

   private final ExecutorService worker = Executors.newFixedThreadPool(10);

   public void registerDevice(String device, Runnable singleUnitOfWork){
         devices.put(device,new DelegatedWorker (singleUnitOfWork));
   } 

   public void startDevice(String device){
        devices.get(device).startDevice();
   }    
   public void stopDevice(String device){
        devices.remove(device).stopDevice();
   }    
   public void pauseDevice(String device){
        devices.get(device).pauseDevice();
   }    
   private class DelegatedWorker { 
      private final Runnable r;
      private volatile Future<?> running;
      DelegatedWorker (Runnable r) {this.r =r ;}

       public void startDevice(){
            if(running != null){
               running = worker.submit(new Callable<Object>(){

                public Object call(){
                     if(Thread.interrupted())return null;
                        r.run();
                     if(Thread.interrupted())return null;

                     return worker.submit(this);
                   }
              });
             }           
       }    
       public void stopDevice(){
            pauseDevice();
       }    
       public void pauseDevice(){
          if(running!=null){ 
              running.cancel(true);
              running = null;
           }
       }    
     }
   }

Имейте в виду, что вместо изменчивого будущего вам может потребоваться тот, который вы заблокируете из-за составной функции, включенной в pauseDevice()

1 голос
/ 09 июня 2011

Это много кода, поэтому я должен признаться, что я его не прочитал, но из того, что я прочитал, я думаю, вы могли бы воспользоваться этим подходом.

a) Рассматривать каждое устройство как «актера», т. Е. Иметь один поток, который взаимодействует с реальными устройствами. Этот поток будет считывать данные с устройств, и всякий раз, когда происходит обновление, отправляйте его в "manager-service". Для этого вы можете найти AbstractService и ExecutorService.newSingleThreadScheduledExecutor () . (AbstractService дает вам удобный интерфейс для запуска / остановки устройства. Это будет отлично для ваших требований). Назовем их «девайс-сервис».

б) Иметь «менеджер-сервис», который хранит набор «сервис-устройств» в коллекции. "manager-service" должен быть снова актером, имеющим свои собственные потоки (один поток звучит нормально из того, что я прочитал). Служба executor в менеджере-сервисе будет извлекать из своей очереди обновления от устройств-сервисов и делать все необходимое (отправлять и т. Д.). Эта очередь заполняется сервисами устройства и опрашивается сервисом менеджера (классический шаблон производитель / потребитель).

Итак, вот так:

class Manager extends AbstractService {
    final ConcurrentMap<Device, DeviceService> deviceServices = // blah
    final ExecutorService exec = // single thread exec

    //called by API user
    newDevice(Device device){
        DeviceService service = new DeviceService(this,device);
        deviceServices.put(device,service);
        service.start();
    }

    //called only by device-services
    void notifyUpdate(final Device device, final Data data){
        Runnable whatever = new Runnable(){
            public void run(){
                sendStuff.(device.name(), data);
            }
        }
        exec.execute(whatever);
    }

    // blah blah blah blah....

    // called by API user
    public void stop(Device device){
        deviceServices.get(device).stop();
    }

и

class DeviceService extends AbstractService {
    final Manager mng;
    final Device device;
    final ScheduledExecutorService poller = // single thread
    DeviceService(Manager mng, Device device){
        //blah
    }

    // schedule a task that polls device and calls "notifyUpdate" of mng
    // Whenever there is an update.
    void start(){
        poller.scheduleWithFixedDelay(
           // poll device, and then call mng.notifyUpdate(device, data)

    // blah blah blah blah....
1 голос
/ 09 июня 2011

Я бы поставил под сомнение ваше заключение о том, что пакет для параллелизма не предназначен для ваших нужд.Мой опыт показывает, что то, что есть (спасибо Дугу Ли и др.), До абсурда завершено.

Вся идея пула потоков и интерфейса Future состоит в том, чтобы потоки выполняли задачи, которые потенциально являются небольшой частью большего целого .Эта идея лежит в основе NIO, а теперь и продолжений в Servlet 3.0.Подумайте о загрузке файла.При использовании сервлета 2.x поток блокируется для управления каждым байтом и не может ничего делать между пакетами.С продолжениями каждый пакет является частью работы, и потоки могут делать между ними другие вещи.

У меня была возможность использовать проект JSpider несколько лет назад.Он был написан со старым кодом, до JDK 5. Я боролся с ним, затем выбросил и сделал новый, используя 5 и threadpools и Future.Это небольшой объем кода, и он прекрасно работает.

Для меня очевидная адаптация заключается в том, что вы должны читать куски данных.Сделайте размер чанков настраиваемым.Вы можете подумать, что это звучит менее эффективно.Гарантировать вам это не будет.Даже скромный контроллер UART на модемах имел буфер.Заполните его, затем переместите содержимое, затем заполните его снова.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...