Правильный подход, как добавить контекст из внешнего источника в записи в Kafka Streams - PullRequest
1 голос
/ 08 мая 2019

У меня есть записи, которые обрабатываются с помощью Kafka Streams (с помощью Processor API).Допустим, запись имеет city_id и некоторые другие поля.

В приложении Kafka Streams я хочу добавить текущую температуру в целевом городе к записи.
Temperature<->City пары хранятся, например, в.Postgres.

В приложении Java я могу подключиться к Postgres с помощью JDBC и собрать new HashMap<CityId, Temperature>, поэтому я могу искать температуру на основе city_id.Что-то вроде tempHM.get(record.city_id).

Есть несколько вопросов, как лучше всего к нему подойти:

Где инициировать данные контекста?

Первоначально я делал это в AbstractProcessor::init() но это кажется неправильным, поскольку он инициализируется для каждого потока, а также повторно инициализируется при перебалансировке.

Поэтому я переместил его до того, как построитель топологии потоков и процессоры будут построены с ним.Данные извлекаются только один раз независимо для всех экземпляров процессора.

Это правильный и допустимый подход.Это работает, но ...

HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;

// Connect to DB and initialize tempHM here

Topology topology = new Topology();

topology
    .addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")

    .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)

    .addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;

Как обновить данные контекста?

Я хотел бы, например, обновлять данные о температуре каждые 15 минут.Я думал об использовании контейнера Hashmap вместо Hashmap, который бы справился с этим:

abstract class ContextContainer<T> {

    T context;
    Date lastRefreshAt;

    ContextContainer(Date now) {
        refresh(now);
    }

    abstract void refresh(Date now);

    abstract Duration getRefreshInterval();

    T get() {
        return context;
    }

    boolean isDueToRefresh(Date now) {
        return lastRefreshAt == null
            || lastRefreshAt.getTime() + getRefreshInterval().toMillis() < now.getTime();
    }
}

final class CityTemperatureContextContainer extends ContextContainer<HashMap> {

    CityTemperatureContextContainer(Date now) {
        super(now);
    }

    void refresh(Date now) {
        if (!isDueToRefresh(now)) {
            return;
        }

        HashMap context = new HashMap();
        // Connect to DB and get data and fill hashmap

        lastRefreshAt = now;
        this.context = context;
    }

    Duration getRefreshInterval() {
        return Duration.ofMinutes(15);
    }
}

это краткое понятие, написанное на SO textarea, может содержать некоторые синтаксические ошибки, но суть ясна, я надеюсь

затем передать его в процессор как .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(cityTemperatureContextContainer), SOURCE)

А в процессоре сделать

    public void init(final ProcessorContext context) {
        context.schedule(
            Duration.ofMinutes(1),
            PunctuationType.STREAM_TIME,
            (timestamp) -> { 
                cityTemperatureContextContainer.refresh(new Date(timestamp));
                tempHm = cityTemperatureContextContainer.get();
            }    
        );

        super.init(context);
    }

Есть ли лучший способ?Главный вопрос в том, чтобы найти правильную концепцию, я могу ее реализовать.По этой теме не так много ресурсов.

1 Ответ

4 голосов
/ 08 мая 2019

В приложении Kafka Streams я хочу добавить текущую температуру в целевом городе к записи. Temperature<->City пары хранятся, например, в. Postgres.

В приложении Java я могу подключиться к Postgres, используя JDBC, и создать новый HashMap<CityId, Temperature>, поэтому я могу искать температуру на основе city_id. Что-то вроде tempHM.get(record.city_id).

Лучшей альтернативой было бы использование Kafka Connect для встраивания ваших данных из Postgres в тему Kafka, чтение этой темы в KTable в вашем приложении с помощью Kafka Streams, а затем объединение этой KTable с другим вашим потоком ( поток записей "с city_id и некоторыми другими полями"). То есть вы будете выполнять KStream -to- KTable соединение.

Подумайте:

### Architecture view

DB (here: Postgres) --Kafka Connect--> Kafka --> Kafka Streams Application


### Data view

Postgres Table ----------------------> Topic --> KTable

Примеры разъемов для вашего варианта использования: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc и https://www.confluent.io/hub/debezium/debezium-connector-postgresql.

Одним из преимуществ описанной выше настройки на основе Kafka Connect является то, что вам больше не нужно общаться напрямую из вашего Java-приложения (которое использует Kafka Streams) с вашей базой данных Postgres.

Еще одним преимуществом является то, что вам не нужно выполнять «пакетное обновление» ваших контекстных данных (вы упоминали каждые 15 минут) из вашей БД в ваше Java-приложение, потому что приложение получит последние изменения БД в режиме реального времени. автоматически через поток DB-> KConnect-> Kafka-> KStreams-app.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...