У меня есть шина kafka, которая получает данные с датчиков iot. Количество сообщений, получаемых в минуту, близко к 1000. Мне нужно обогатить эти необработанные данные метаданными датчика, которые хранятся в neo4j. Пока что я делаю это с разъемом kafka-streams neo4j.
KStream<String, String> enrichedData = rawSensorStreamByUuid
.leftJoin(masterSensorTable, (sensorData, masterSensorData) -> {
try {
Object sensorObj = jsonParser.parse(sensorData);
JSONObject sensorJsonObj = (JSONObject) sensorObj;
if (masterSensorData == null || masterSensorData.isEmpty()) {
masterSensorData = "{\"companyCode\":\"EMPTY\"}";
}
Object masterObj = jsonParser.parse(masterSensorData);
JSONObject masterJsonObj = (JSONObject) masterObj;
sensorJsonObj.put("masterSensorData", masterJsonObj);
return sensorJsonObj.toString();
} catch (ParseException e) {
logger.error("error parsing while joining: ", e);
throw new RuntimeException(e);
}
})
Могу ли я подключиться к neo4j напрямую для такого обогащения для всех полученных данных? Может ли Neo4j справиться с такой нагрузкой?