influenx-java: org.influxdb.InfluxDBIOException: java.net.SocketException: сброс соединения по пиру: ошибка записи сокета - PullRequest
0 голосов
/ 25 апреля 2019

Я тестирую InfluxDB для хранения временных рядов датчиков. Я использую клиентскую библиотеку influenxdb-java (версия 2.15) и локально запускаю InfluxDB 1.7.6 для тестирования.

Все мои очки хранятся в файлах .csv (по одному на датчик), которые сами хранятся в файлах .zip (по одному на набор данных). Мой код проходит через каждую строку каждого файла CSV. Очки записываются в пакетном режиме.

/**
 * Get the connection to the database
 */
InfluxDB influxDB = InfluxDBFactory.connect("http://192.168.51.51:8086");
influxDB.query(new Query("CREATE DATABASE theia_in_situ"));
influxDB.setDatabase("theia_in_situ");
influxDB.enableBatch();
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
/**
 * Create batch point to write each measure of the time serie more efficiently
 */
BatchPoints batchPoints = BatchPoints
        .database("theia_in_situ")
        .build();

Для каждого файла данных CSV выполняется следующий метод:

public static void createAndImportTimeSeriesDocuments(InputStream txtFileIn, String observationId, String producerId,
        InfluxDB influxDB, BatchPoints batchPoints) throws IOException, ParseException {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    /**
     * Store the variable name
     */
    String observedProperty = null;
    try (BufferedReader br = new BufferedReader(new InputStreamReader(txtFileIn));) {
        String line = null;
        /**
         * Read the headers
         */
        while ((line = br.readLine()).substring(0, 1).equals("#")) {
            if (line.substring(0, 15).equals("#Variable_name;")) {
                observedProperty = line.split(";")[1];
            }
        }
        /**
         * Read the data
         */
        while ((line = br.readLine()) != null) {
            String[] lineSplitted = line.split(";", -1);
            Point point = Point.measurement(observedProperty)
                    .tag("producerId", producerId)
                    .tag("observationId", observationId)
                    .time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
                    .addField("value", lineSplitted[5])
                    .addField("flag", lineSplitted[6])
                    .build();
            batchPoints.point(point);
        }
        influxDB.write(batchPoints);
    }
}

Я могу написать одно или несколько измерений, но довольно скоро я получаю следующее исключение:

Исключение в потоке "main" org.influxdb.InfluxDBIOException: java.net.SocketException: сброс соединения по пиру: ошибка записи сокета в org.influxdb.impl.InfluxDBImpl.execute (InfluxDBImpl.java:812) atg.influxdb.impl.InfluxDBImpl.write (InfluxDBImpl.java:463)

Я уже отключил max-concurrent-write-limit, max-enqueued-write-limit, enqueued-write-timeout (устанавливая каждое значение на 0 в /etc/influxdb/influxdb.conf), как упоминалось здесь . Несмотря на то, что эта проблема упоминается как часто задаваемые вопросы на странице Github, я не могу найти ни одной проблемы, воспроизводящей мою проблему.

Любая помощь будет оценена.

1 Ответ

0 голосов
/ 25 апреля 2019

Это исключение возникает при попытке записи BatchPoint в пакетном режиме.

Клиент influenx-java хранит ваши записи во внутреннем буфер и асинхронно сбрасывает их в InfluxDB при фиксированной очистке интервал для достижения хорошей производительности на стороне клиента и сервера.

Вот обновленный фрагмент кода.

/**
 * Read the data
 */
while ((line = br.readLine()) != null) {
    String[] lineSplitted = line.split(";", -1);
    Point point = Point.measurement(observedProperty)
            .tag("producerId", producerId)
            .tag("observationId", observationId)
            .time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
            .addField("value", lineSplitted[5])
            .addField("flag", lineSplitted[6])
            .build();
    influxDB.write(point);
  //  batchPoints.point(point);
}
//influxDB.write(batchPoints);
...