Spring Kafka Stream не пишется - PullRequest
1 голос
/ 03 мая 2019

Я пишу приложение Spring Boot (2.1.4), пытающееся использовать Spring Cloud Streams для Kafka.

Я пытаюсь вести список датчиков по одной теме ("датчики"«).OTOH, у меня есть входящие данные по другой теме («данные»).Я пытаюсь добиться того, чтобы, когда я получаю данные для датчика, которого у меня еще нет, я хочу добавить его в список датчиков.

Для этого я создаю KTable<String, Sensor> изтема датчиков, сопоставьте тему температуры с чистыми данными датчика (в данном случае его именем) и выполните внешнее соединение с ValueJoiner, который сохраняет датчик, если он присутствует, в противном случае используйте датчик чтения.Затем я записываю результат обратно в тему датчиков.

KTable<String, Sensor> sensorTable = ...;
KStream<String, SensorData> sensorDataStream = ...;

// get sensors providing measurements
KTable<String, Sensor> sensorsFromData =
        sensorDataStream.groupByKey()
                .aggregate(
                        Sensor::new,
                        (k, v, s) -> {
                            s.setName(k);
                            return s;
                        },
                        Materialized.with(Serdes.String(), SensorSerde.SERDE));

// join both sensor tables, preferring the existing ones
KTable<String, Sensor> joinedSensorTable =
        sensorTable.outerJoin(
                sensorsFromData,
                // only use sensors from measurements if sensor not already present
                (ex, ft) -> (ex != null) ? ex : ft,
                Materialized.<String, Sensor, KeyValueStore<Bytes, byte[]>>as(SENSORS_TABLE)
                        .withKeySerde(Serdes.String()).withValueSerde(SensorSerde.SERDE));

// write to new topic for downstream services
joinedSensorTable.toStream();

Это прекрасно работает, если я создаю это с использованием StreamBuilder - т.е. если sensorTable и sensorDataStream происходят из чего-то вроде builder.table("sensors", Consumed.with(Serdes.String(), SensorSerde.SERDE)).

Однако я пытаюсь использовать Spring Stream Binding для этого, т.е. приведенный выше код обернут в

@Configuration
@EnableBinding(SensorTableBinding.class)
class StreamConfiguration {
    static final String SENSORS_TABLE = "sensors-table";

    @StreamListener
    @SendTo("sensorsOut")
    private KStream<String, Sensor> getDataFromData
            (@Input("sensors") KTable<String, Sensor> sensorTable,
                    @Input("data") KStream<String, SensorData> sensorDataStream) {
        // ...
        return joinedSensorTable.toStream();
    }
}

с

interface SensorTableBinding {
    @Input("sensors")
    KTable<String, Sensor> sensorStream();

    @Output("sensorsOut")
    KStream<String, Sensor> sensorOutput();

    @Input("data")
    KStream<String, SensorData> sensorDataStream();
}

Вот раздел весеннего потока application.properties:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

spring.cloud.stream.kafka.binder.brokers: ${spring.kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset: latest

spring.cloud.stream.kafka.binder.bindings.sensors.group: sensor-service
spring.cloud.stream.kafka.binder.bindings.sensors.destination: sensors
spring.cloud.stream.kafka.binder.bindings.sensorsOut.destination: sensors

spring.cloud.stream.kafka.binder.data.group: sensor-service
spring.cloud.stream.kafka.binder.data.destination: data

Поток отлично инициализируется, и соединение выполняется (хранилище ключей-значений заполнено правильно), однако результирующий поток имеет видникогда не писал в тему "датчики".

Почему?Я что-то упустил?

Кроме того: я уверен, что есть лучший способ де / сериализации моих объектов из / в JSON с использованием существующего Serde, вместо того, чтобы объявлять свои собственные классы для добавления в обработку(SensorSerde / SensorDataSerde - это тонкая оболочка делегирования для ObjectMapper)?

1 Ответ

0 голосов
/ 04 мая 2019

Оказывается, что данные были написаны в конце концов, но не в ту тему, а именно sensorOut.

Причиной была конфигурация. Вместо

spring.cloud.stream.kafka.binder.bindings.sensors.destination: sensors
spring.cloud.stream.kafka.binder.bindings.sensorsOut.destination: sensors

темы настроены так:

spring.cloud.stream.bindings.sensors.destination: sensors
spring.cloud.stream.bindings.sensorsOut.destination: sensors

Для темы датчиков и данных это не имело значения, потому что имя привязки было таким же, как тема; но так как Spring не смог найти правильное место назначения для вывода, он использовал имя привязки sensorOut и записал туда данные.

Как примечание, вся настройка конфигурации вокруг них очень запутанная. Отдельные элементы документированы, но трудно сказать для каждого, к какому префиксу конфигурации они принадлежат. Просмотр исходного кода также не помогает, потому что на этом уровне передаются Map s с ключом, удаленным из префикса во время выполнения, поэтому очень трудно сказать, откуда поступают данные и что они будут содержать .

IMO, это действительно помогло бы передавать действительные @ConfigurationProperties -подобные классы данных, которые бы упростили понимание.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...