как читать сообщения от кафки топи c на основе ответа из другой темы - PullRequest
2 голосов
/ 06 февраля 2020

У меня есть две темы в Кафке: метаданные и мастер-данные. И я читаю данные в режиме реального времени, используя Kafka Listners. Есть 2 сценария ios:

  1. Я должен сначала прочитать метаданные topi c, а затем MasterData topi c.
  2. Возможно также, что нет новых Сообщение приходит в MetaData topi c, но сообщение добавляется в MasterData topi c. В таком случае потребление от MasterData topi c должно go впереди.

Ant подсказывает, как этого добиться ??

1 Ответ

1 голос
/ 06 февраля 2020

После разъяснений в комментариях я понимаю, чего вы хотите достичь:

У вас есть система, похожая на базу данных, и метаданные из kafka topi c - это используется для настройки структуры таблицы. Затем фактические данные поступают в мастер kafka topi c, который вы затем хотите вставить в таблицу. Другими словами: вы хотите определить структуру ваших данных с помощью kafka.

Я не уверен, если это то, что я бы назвал обычным вариантом использования для Kafka, так как он предназначен для интерфейс для обмена событиями, иначе данные. То, что вы пытаетесь сделать, это использовать систему как средство определения структуры данных. Но это только мое мнение.

Как говорилось ранее в комментариях, практически невозможно сказать, что в одной теме не будет сообщения c. Можно только сказать: сообщения «пока» нет. Вероятно, вы могли бы сделать следующее: когда приходит сообщение в master topi c, вы сначала проверяете meta topi c, если оно тоже есть. Если они есть, вы применяете изменения в структуре данных, а затем импортируете сообщение из главной страницы c.

Другой вариант - использовать потоки Kafka вместо необработанных потребителей для объединения двух тем: с join например.

В любом случае, для определения структуры данных обычно используется что-то вроде Avro, которое дает вам эволюцию схемы и многое другое. Затем напишите свое приложение, чтобы быть в курсе изменений схемы, и соответственно примените их к таблицам вашей базы данных.

ОБНОВЛЕНИЕ: Как решить с помощью Kafka Streams

Как упоминалось выше, использование Kafka Streams было бы возможным решением для вашего случая. Позвольте мне объяснить, что я имею в виду в некоторых потоках псевдо-кафки:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

import java.nio.charset.StandardCharsets;

/**
 * Just an incomplete Kafka Streams Code Demo to show how the problem could
 * be solved with this framework instead of using the consumers directly.
 * Kafka Streams Boilerplate code not included.
 */
public class Example {
    private static final String META_TOPIC = "meta";
    private static final String MASTER_TOPIC = "master";

    // You have to make sure, the meta data is stored under this
    // specific key in the meta topic.
    private static final byte[] META_KEY = MetaEvent.class.getName().getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        new Example().createTopology();
    }

    public void createTopology() {

        final StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<byte[], MetaEvent> metaTable = builder.globalTable(META_TOPIC);

        builder.<byte[], MasterEvent>stream(MASTER_TOPIC)
                .leftJoin(
                        metaTable,
                        (k, v) -> META_KEY,
                        MasterWithMeta::new)
                .foreach(this::handleEvent);
    }

    private void handleEvent(byte[] key, MasterWithMeta masterWithMeta) {
        // 1) check if meta has changed, if so, apply changes to database
        // 2) import master data to database
    }
}

class MasterWithMeta {
    private final MasterEvent master;
    private final MetaEvent meta;

    public MasterEvent getMaster() {
        return master;
    }

    public MetaEvent getMeta() {
        return meta;
    }

    public MasterWithMeta(MasterEvent master, MetaEvent meta) {
        this.master = master;
        this.meta = meta;
    }

    public static MasterWithMeta create(MasterEvent master, MetaEvent meta) {
        return new MasterWithMeta(master, meta);
    }
}

class MetaEvent {
    // ...
}

class MasterEvent {
    // ...
}

...