Я разрабатываю параллельное Java-приложение, которое считывает данные с различных медицинских устройств, доступных в интрасети больницы.
Я прочитал «Параллелизм Java на практике - Брайан Гетц ...», чтобы понять, какделать вещи, но я думаю, что я все еще что-то упускаю.
Вот быстрая простая диаграмма того, что я пытаюсь сделать, и ниже приведен фрагмент кода ..
Рабочие потоки (экземпляры MedicalDeviceData) непрерывно считывают данные с медицинских устройств и делают их доступными для MedicalDeviceWorkManager, который, в свою очередь, передает их конечному пользователю.
Рабочие потоки продолжают считывать данные бесконечно (в идеале), и "нет" работызавершена »ситуация по моему сценарию.Более того, пользователь может выбрать «Запустить все устройства» или запустить определенное устройство или остановить устройство, как и когда пожелает.
Ниже приведен фрагмент кода (компилируется, но не тестируется) того, как я его реализую.
MedicalDeviceWorkManager - порождает рабочие потоки и управляет ими.
MedicalDeviceData - рабочий поток бесконечно получает данные от медицинских устройств и обновляет этот экземпляр этого класса.
В основном смотрите на startDevice, stopDevice и методы запуска.
Вы, очевидно, заметите, что я не использую ThreadPoolExecutor и Future и что я просто развернул здесь свою собственную реализацию.
Поскольку блок future.get блокируется до завершения работы, в моем случае это не имеет смысла, поскольку мой рабочий поток никогда не «завершает» задачу ... это просто бесконечно продолжающаяся задача ...
ВОПРОС: Как изменить показанную ниже реализацию на более стандартизированную, чтобы я мог лучше использовать пакет java.util.concurrent (ThreadPoolExecutor / Future).
Какой-нибудь другой лучший шаблон дизайна, на который мне стоит посмотреть?
public class MedicalDeviceWorkManager {
private ThreadGroup rootThreadGroup = null;
Hashtable<String, MedicalDeviceData> deviceObjs = new Hashtable<String, MedicalDeviceData>();
public void manageMedicalDevices() throws InterruptedException {
String[] allDevices={"Device1","Device2","Device3","Device4"};
//-- Start all threads to collect data
for(String deviceToStart:allDevices){
this.startDevice(deviceToStart);
}
//-- Stop all threads
for(String deviceToStop:allDevices){
this.stopDevice(deviceToStop);
}
//-- Start on request from user
String deviceToStart="Device1";
this.startDevice(deviceToStart);
//-- Stop on request from user.
String deviceToStop="Device1";
this.stopDevice(deviceToStop);
/*
* Get Data and give it to client
* This is happening via a separate TCP port
* */
while(true){
for(String deviceName:allDevices){
if(deviceObjs.get(deviceName)!=null){
ConcurrentHashMap<String,BigDecimal> devData=deviceObjs.get(deviceName).getCollectedData();
//--Loop and send data to client on TCP stream
;
}
}//-- loop the devices
}//-- infinite
}
//-- Start the device to start acquiring data using a worker thread
private void startDevice(String deviceName){
//-- Get Device instance
MedicalDeviceData thisDevice=deviceObjs.get(deviceName);
if(thisDevice==null){
thisDevice=new MedicalDeviceData(deviceName);
deviceObjs.put(deviceName, thisDevice);
}
//-- Create thread to start data acquisition
//-- Start if not being processed already (Handle what if thread hung scenario later)
if(this.getThread(deviceName)==null){
Thread t=new Thread(thisDevice);
t.setName(deviceName);
t.start();
}
}
//-- Stop the worker thread thats collecting the data.
private void stopDevice(String deviceName) throws InterruptedException {
deviceObjs.get(deviceName).setShutdownRequested(true);
Thread t=this.getThread(deviceName);
t.interrupt();
t.join(1000);
}
private Thread getThread( final String name ) {
if ( name == null )
throw new NullPointerException( "Null name" );
final Thread[] threads = getAllThreads( );
for ( Thread thread : threads )
if ( thread.getName( ).equals( name ) )
return thread;
return null;
}
private ThreadGroup getRootThreadGroup( ) {
if ( rootThreadGroup != null )
return rootThreadGroup;
ThreadGroup tg = Thread.currentThread( ).getThreadGroup( );
ThreadGroup ptg;
while ( (ptg = tg.getParent( )) != null )
tg = ptg;
return tg;
}
private Thread[] getAllThreads( ) {
final ThreadGroup root = getRootThreadGroup( );
final ThreadMXBean thbean = ManagementFactory.getThreadMXBean( );
int nAlloc = thbean.getThreadCount( );
int n = 0;
Thread[] threads;
do {
nAlloc *= 2;
threads = new Thread[ nAlloc ];
n = root.enumerate( threads, true );
} while ( n == nAlloc );
return java.util.Arrays.copyOf( threads, n );
}
}//-- MedicalDeviceWorkManager
public class MedicalDeviceData implements Runnable{
//-- Data Collected from medical device
private final ConcurrentHashMap<String,BigDecimal> collectedData=new ConcurrentHashMap<String,BigDecimal>();
//-- Set by Thread Manager to request a shutdown..after which it should interrupt the thread
private AtomicBoolean shutdownRequested;
//-- Simple data Counter
private AtomicInteger dataCounter=new AtomicInteger(0);
//-- Device Name
private String thisDeviceName;
public void run() {
//-- Initialize I/O for the device
;
while(!this.getShutdownRequested()){
try{
//-- just to compile the code
Thread.sleep(0);
//-- perform I/O operation to get data from medical device
;
//-- Add data into the ConcurrentHashMap...Both key and value are immutable.
collectedData.put("DataKey", new BigDecimal("9999"));
//-- data counter
dataCounter.getAndIncrement();
}
catch(InterruptedException ie){
if(this.getShutdownRequested()){
return;
}
//throw new InterruptedException();
}
}
}//-- run
public MedicalDeviceData(String thisDeviceName){
this.thisDeviceName=thisDeviceName;
}
/**
* @return the shutdownRequested
*/
public boolean getShutdownRequested() {
return this.shutdownRequested.get();
}
/**
* @param shutdownRequested the shutdownRequested to set
*/
public void setShutdownRequested(boolean shutdownRequested) {
this.shutdownRequested.set(shutdownRequested);
}
/**
* Both key and value are immutable, so ok to publish reference.
*
* @return the collectedData
*/
public ConcurrentHashMap<String, BigDecimal> getCollectedData() {
return collectedData;
}
/**
* @return the dataCounter
*/
public AtomicInteger getDataCounter() {
return dataCounter;
}
}