Редактировать
Этот вопрос уже прошел несколько итераций, поэтому не стесняйтесь просматривать ревизии, чтобы увидеть некоторую справочную информацию об истории и попытках.
Я использую CompletionService вместе с ExecutorService и Callable, чтобы одновременно вызывать несколько функций на нескольких различных веб-сервисах через код, сгенерированный CXF. Все эти сервисы предоставляют различную информацию для единого набора информации, которую я ' использую для моего проекта. Однако службы могут не отвечать в течение длительного периода времени без исключения, что продлевает время ожидания для объединенного набора информации.
Чтобы противостоять этому, я запускаю все сервисные вызовы одновременно, и через несколько минут хотел бы завершить все вызовы, которые еще не завершены, и, предпочтительно, записать, какие из них еще не были выполнены, либо из вызываемого объекта. или с помощью подробного исключения.
Вот очень упрощенный код, иллюстрирующий то, что я уже делаю:
private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port,
final String accessionCode) {
return new Callable<List<Feature>>() {
@Override
public List<Feature> call() throws Exception {
List<Feature> features = new ArrayList<Feature>();
//getXXXFeatures are methods of the WS Proxy
//that can take anywhere from second to never to return
for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
Feature ft = convertFeature(raw);
features.add(ft);
}
if (Thread.currentThread().isInterrupted())
log.error("XXX was interrupted");
return features;
}
};
}
И код, который одновременно запускает WS, вызывает:
WiwsPortType port = new Wiws().getWiws();
List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
//Counting wrapper around CompletionService,
//so I could implement ccs.hasRemaining()
CountingCompletionService<List<Feature>> ccs =
new CountingCompletionService<List<Feature>>(threadpool);
ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));
List<Feature> allFeatures = new ArrayList<Feature>();
while (ccs.hasRemaining()) {
//Low for testing, eventually a little more lenient
Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
if (polled != null)
allFeatures.addAll(polled.get());
else {
//Still jobs remaining, but unresponsive: Cancel them all
int jobsCanceled = 0;
for (Future<List<Feature>> job : ftList)
if (job.cancel(true))
jobsCanceled++;
log.error("Canceled {} feature jobs because they took too long",
jobsCanceled);
break;
}
}
Проблема с этим кодом заключается в том, что Callables фактически не отменяются при ожидании возврата port.getXXXFeatures (...), но каким-то образом продолжают работать. Как видно из операторов if (Thread.currentThread().isInterrupted()) log.error("XXX was interrupted");
, флаг прерывания равен , установленному после возврата port.getFeatures, он доступен только после нормального завершения вызова Webservice, а не прерывался, когда я вызвал Cancel.
Может кто-нибудь сказать мне, что я делаю неправильно, и как я могу остановить текущий вызов CXF Webservice после определенного периода времени и зарегистрировать эту информацию в моем приложении?
С уважением, Тим