Я пишу приложение Spring Boot (2.1.4), пытающееся использовать Spring Cloud Streams для Kafka.
Я пытаюсь вести список датчиков по одной теме ("датчики"«).OTOH, у меня есть входящие данные по другой теме («данные»).Я пытаюсь добиться того, чтобы, когда я получаю данные для датчика, которого у меня еще нет, я хочу добавить его в список датчиков.
Для этого я создаю KTable<String, Sensor>
изтема датчиков, сопоставьте тему температуры с чистыми данными датчика (в данном случае его именем) и выполните внешнее соединение с ValueJoiner
, который сохраняет датчик, если он присутствует, в противном случае используйте датчик чтения.Затем я записываю результат обратно в тему датчиков.
KTable<String, Sensor> sensorTable = ...;
KStream<String, SensorData> sensorDataStream = ...;
// get sensors providing measurements
KTable<String, Sensor> sensorsFromData =
sensorDataStream.groupByKey()
.aggregate(
Sensor::new,
(k, v, s) -> {
s.setName(k);
return s;
},
Materialized.with(Serdes.String(), SensorSerde.SERDE));
// join both sensor tables, preferring the existing ones
KTable<String, Sensor> joinedSensorTable =
sensorTable.outerJoin(
sensorsFromData,
// only use sensors from measurements if sensor not already present
(ex, ft) -> (ex != null) ? ex : ft,
Materialized.<String, Sensor, KeyValueStore<Bytes, byte[]>>as(SENSORS_TABLE)
.withKeySerde(Serdes.String()).withValueSerde(SensorSerde.SERDE));
// write to new topic for downstream services
joinedSensorTable.toStream();
Это прекрасно работает, если я создаю это с использованием StreamBuilder
- т.е. если sensorTable
и sensorDataStream
происходят из чего-то вроде builder.table("sensors", Consumed.with(Serdes.String(), SensorSerde.SERDE))
.
Однако я пытаюсь использовать Spring Stream Binding для этого, т.е. приведенный выше код обернут в
@Configuration
@EnableBinding(SensorTableBinding.class)
class StreamConfiguration {
static final String SENSORS_TABLE = "sensors-table";
@StreamListener
@SendTo("sensorsOut")
private KStream<String, Sensor> getDataFromData
(@Input("sensors") KTable<String, Sensor> sensorTable,
@Input("data") KStream<String, SensorData> sensorDataStream) {
// ...
return joinedSensorTable.toStream();
}
}
с
interface SensorTableBinding {
@Input("sensors")
KTable<String, Sensor> sensorStream();
@Output("sensorsOut")
KStream<String, Sensor> sensorOutput();
@Input("data")
KStream<String, SensorData> sensorDataStream();
}
Вот раздел весеннего потока application.properties:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.brokers: ${spring.kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset: latest
spring.cloud.stream.kafka.binder.bindings.sensors.group: sensor-service
spring.cloud.stream.kafka.binder.bindings.sensors.destination: sensors
spring.cloud.stream.kafka.binder.bindings.sensorsOut.destination: sensors
spring.cloud.stream.kafka.binder.data.group: sensor-service
spring.cloud.stream.kafka.binder.data.destination: data
Поток отлично инициализируется, и соединение выполняется (хранилище ключей-значений заполнено правильно), однако результирующий поток имеет видникогда не писал в тему "датчики".
Почему?Я что-то упустил?
Кроме того: я уверен, что есть лучший способ де / сериализации моих объектов из / в JSON с использованием существующего Serde, вместо того, чтобы объявлять свои собственные классы для добавления в обработку(SensorSerde
/ SensorDataSerde
- это тонкая оболочка делегирования для ObjectMapper
)?