Java runnable умирает при отключении издателя zeromq - PullRequest
0 голосов
/ 17 января 2019

У меня есть Java-процесс, который реализует Runnable и используется для подписки / извлечения из / для издателя / толкача zeromq, сбора данных на некоторое время, и как только определенное количество или записи достигаются, вставляет их в базу данных. Все работает нормально, но когда издатель / пушер перестает работать, обработка процесса и сбор данных немедленно прекращаются. После перезапуска издателя все работает нормально, но записи, собранные в памяти, не вставляются, потому что процесс немедленно останавливается. Это приводит к потере данных при перезапуске издателя, что является моей проблемой.

Я уже пробовал проверять прерывание потока, окружая код различными операторами try / catch и т. Д. Я действительно заблудился в отношении того, какие другие решения я могу попытаться реализовать, какие-либо предложения?

Вот соответствующая часть кода:

    String address = "tcp://" + meta.getExporterAddress() + ":" + meta.getExporterPort();
    ZMQ.Context context = ZMQ.context(10);
    ZMQ.Socket socket;

    if (exporterMode.toUpperCase().equals("STREAMER")) {
        socket = context.socket(ZMQ.PULL);
        socket.connect(address);
        this.log.info("connected ZMQJob as PULL: " + exporterMode);
    } else {
        // BROKER
        socket = context.socket(ZMQ.SUB);
        socket.connect(address);
        socket.subscribe(ZMQ.SUBSCRIPTION_ALL);
        this.log.info("connected ZMQJob as SUB: " + exporterMode);
    }

    this.log.info("Started ZMQJob for " + meta.getTargetTableName());

    long startTime = System.currentTimeMillis();
    while (!Thread.currentThread().isInterrupted()) {
        jsonString = Snappy.uncompress(socket.recv(0));
        BufferedReader bufReader = new BufferedReader(new StringReader(new String(jsonString)));

        while ((line = bufReader.readLine()) != null) {
            jsonLines.add(line);
        }

        if (jsonLines.size() >= threshold || System.currentTimeMillis() > startTime + timeout * 1000) {
            //db related code used for inserting here
        }
        }

Я думаю, что, возможно, использование другого условия для цикла while может быть решением, но я не уверен, какое условие.

...