Использование neo4j для обогащения данных потока kafka - PullRequest
0 голосов
/ 16 октября 2019

У меня есть шина kafka, которая получает данные с датчиков iot. Количество сообщений, получаемых в минуту, близко к 1000. Мне нужно обогатить эти необработанные данные метаданными датчика, которые хранятся в neo4j. Пока что я делаю это с разъемом kafka-streams neo4j.

KStream<String, String> enrichedData = rawSensorStreamByUuid
                .leftJoin(masterSensorTable, (sensorData, masterSensorData) -> {
                    try {
                        Object sensorObj = jsonParser.parse(sensorData);
                        JSONObject sensorJsonObj = (JSONObject) sensorObj;
                        if (masterSensorData == null || masterSensorData.isEmpty()) {
                            masterSensorData = "{\"companyCode\":\"EMPTY\"}";
                        }
                        Object masterObj = jsonParser.parse(masterSensorData);
                        JSONObject masterJsonObj = (JSONObject) masterObj;
                        sensorJsonObj.put("masterSensorData", masterJsonObj);
                        return sensorJsonObj.toString();
                    } catch (ParseException e) {
                        logger.error("error parsing while joining: ", e);
                        throw new RuntimeException(e);
                    }
                })

Могу ли я подключиться к neo4j напрямую для такого обогащения для всех полученных данных? Может ли Neo4j справиться с такой нагрузкой?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...