У меня есть Java-процесс, который реализует Runnable и используется для подписки / извлечения из / для издателя / толкача zeromq, сбора данных на некоторое время, и как только определенное количество или записи достигаются, вставляет их в базу данных. Все работает нормально, но когда издатель / пушер перестает работать, обработка процесса и сбор данных немедленно прекращаются. После перезапуска издателя все работает нормально, но записи, собранные в памяти, не вставляются, потому что процесс немедленно останавливается. Это приводит к потере данных при перезапуске издателя, что является моей проблемой.
Я уже пробовал проверять прерывание потока, окружая код различными операторами try / catch и т. Д. Я действительно заблудился в отношении того, какие другие решения я могу попытаться реализовать, какие-либо предложения?
Вот соответствующая часть кода:
String address = "tcp://" + meta.getExporterAddress() + ":" + meta.getExporterPort();
ZMQ.Context context = ZMQ.context(10);
ZMQ.Socket socket;
if (exporterMode.toUpperCase().equals("STREAMER")) {
socket = context.socket(ZMQ.PULL);
socket.connect(address);
this.log.info("connected ZMQJob as PULL: " + exporterMode);
} else {
// BROKER
socket = context.socket(ZMQ.SUB);
socket.connect(address);
socket.subscribe(ZMQ.SUBSCRIPTION_ALL);
this.log.info("connected ZMQJob as SUB: " + exporterMode);
}
this.log.info("Started ZMQJob for " + meta.getTargetTableName());
long startTime = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted()) {
jsonString = Snappy.uncompress(socket.recv(0));
BufferedReader bufReader = new BufferedReader(new StringReader(new String(jsonString)));
while ((line = bufReader.readLine()) != null) {
jsonLines.add(line);
}
if (jsonLines.size() >= threshold || System.currentTimeMillis() > startTime + timeout * 1000) {
//db related code used for inserting here
}
}
Я думаю, что, возможно, использование другого условия для цикла while может быть решением, но я не уверен, какое условие.