У меня есть записи, которые обрабатываются с помощью Kafka Streams (с помощью Processor API).Допустим, запись имеет city_id
и некоторые другие поля.
В приложении Kafka Streams я хочу добавить текущую температуру в целевом городе к записи.
Temperature<->City
пары хранятся, например, в.Postgres.
В приложении Java я могу подключиться к Postgres с помощью JDBC и собрать new HashMap<CityId, Temperature>
, поэтому я могу искать температуру на основе city_id
.Что-то вроде tempHM.get(record.city_id)
.
Есть несколько вопросов, как лучше всего к нему подойти:
Где инициировать данные контекста?
Первоначально я делал это в AbstractProcessor::init()
но это кажется неправильным, поскольку он инициализируется для каждого потока, а также повторно инициализируется при перебалансировке.
Поэтому я переместил его до того, как построитель топологии потоков и процессоры будут построены с ним.Данные извлекаются только один раз независимо для всех экземпляров процессора.
Это правильный и допустимый подход.Это работает, но ...
HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;
// Connect to DB and initialize tempHM here
Topology topology = new Topology();
topology
.addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")
.addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)
.addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;
Как обновить данные контекста?
Я хотел бы, например, обновлять данные о температуре каждые 15 минут.Я думал об использовании контейнера Hashmap вместо Hashmap, который бы справился с этим:
abstract class ContextContainer<T> {
T context;
Date lastRefreshAt;
ContextContainer(Date now) {
refresh(now);
}
abstract void refresh(Date now);
abstract Duration getRefreshInterval();
T get() {
return context;
}
boolean isDueToRefresh(Date now) {
return lastRefreshAt == null
|| lastRefreshAt.getTime() + getRefreshInterval().toMillis() < now.getTime();
}
}
final class CityTemperatureContextContainer extends ContextContainer<HashMap> {
CityTemperatureContextContainer(Date now) {
super(now);
}
void refresh(Date now) {
if (!isDueToRefresh(now)) {
return;
}
HashMap context = new HashMap();
// Connect to DB and get data and fill hashmap
lastRefreshAt = now;
this.context = context;
}
Duration getRefreshInterval() {
return Duration.ofMinutes(15);
}
}
это краткое понятие, написанное на SO textarea, может содержать некоторые синтаксические ошибки, но суть ясна, я надеюсь
затем передать его в процессор как .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(cityTemperatureContextContainer), SOURCE)
А в процессоре сделать
public void init(final ProcessorContext context) {
context.schedule(
Duration.ofMinutes(1),
PunctuationType.STREAM_TIME,
(timestamp) -> {
cityTemperatureContextContainer.refresh(new Date(timestamp));
tempHm = cityTemperatureContextContainer.get();
}
);
super.init(context);
}
Есть ли лучший способ?Главный вопрос в том, чтобы найти правильную концепцию, я могу ее реализовать.По этой теме не так много ресурсов.