Я тестирую InfluxDB для хранения временных рядов датчиков. Я использую клиентскую библиотеку influenxdb-java (версия 2.15) и локально запускаю InfluxDB 1.7.6 для тестирования.
Все мои очки хранятся в файлах .csv (по одному на датчик), которые сами хранятся в файлах .zip (по одному на набор данных). Мой код проходит через каждую строку каждого файла CSV. Очки записываются в пакетном режиме.
/**
* Get the connection to the database
*/
InfluxDB influxDB = InfluxDBFactory.connect("http://192.168.51.51:8086");
influxDB.query(new Query("CREATE DATABASE theia_in_situ"));
influxDB.setDatabase("theia_in_situ");
influxDB.enableBatch();
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
/**
* Create batch point to write each measure of the time serie more efficiently
*/
BatchPoints batchPoints = BatchPoints
.database("theia_in_situ")
.build();
Для каждого файла данных CSV выполняется следующий метод:
public static void createAndImportTimeSeriesDocuments(InputStream txtFileIn, String observationId, String producerId,
InfluxDB influxDB, BatchPoints batchPoints) throws IOException, ParseException {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
/**
* Store the variable name
*/
String observedProperty = null;
try (BufferedReader br = new BufferedReader(new InputStreamReader(txtFileIn));) {
String line = null;
/**
* Read the headers
*/
while ((line = br.readLine()).substring(0, 1).equals("#")) {
if (line.substring(0, 15).equals("#Variable_name;")) {
observedProperty = line.split(";")[1];
}
}
/**
* Read the data
*/
while ((line = br.readLine()) != null) {
String[] lineSplitted = line.split(";", -1);
Point point = Point.measurement(observedProperty)
.tag("producerId", producerId)
.tag("observationId", observationId)
.time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
.addField("value", lineSplitted[5])
.addField("flag", lineSplitted[6])
.build();
batchPoints.point(point);
}
influxDB.write(batchPoints);
}
}
Я могу написать одно или несколько измерений, но довольно скоро я получаю следующее исключение:
Исключение в потоке "main" org.influxdb.InfluxDBIOException: java.net.SocketException: сброс соединения по пиру: ошибка записи сокета
в org.influxdb.impl.InfluxDBImpl.execute (InfluxDBImpl.java:812)
atg.influxdb.impl.InfluxDBImpl.write (InfluxDBImpl.java:463)
Я уже отключил max-concurrent-write-limit, max-enqueued-write-limit, enqueued-write-timeout (устанавливая каждое значение на 0 в /etc/influxdb/influxdb.conf
), как упоминалось здесь .
Несмотря на то, что эта проблема упоминается как часто задаваемые вопросы на странице Github, я не могу найти ни одной проблемы, воспроизводящей мою проблему.
Любая помощь будет оценена.