Kafka-Streams Присоединяйтесь к 2 темам со значениями JSON |механизм противодавления? - PullRequest
0 голосов
/ 26 мая 2018

Я изучаю Kafka Streams и пытаюсь достичь следующего:

Создано 2 темы Kafka (скажем topic1, topic2) с нулевым ключом и JSONString в качестве значения.Данные из theme1 (без дубликатов) содержат несколько совпадающих записей в topic2.Т.е. topic1 имеет некоторые основные данные для генерации нового множественного потока данных при объединении с topic2.

Пример:

topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}

Ожидаемый результат: {"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}

Хотелось бысохранить / сохранить поток данных из topic1 для будущих ссылок, в то время как поток данных из topic2 просто используется для достижения вышеупомянутого варианта использования и не требует никакого сохранения / удержания.

У меня есть несколько вопросов: 1) Следует ли хранить / хранить поток данных topic1 в течение нескольких дней (возможно?), Чтобы можно было объединить входящий поток данных из topic2.Является ли это возможным?2) Что я должен использовать для достижения этого, KStream или KTable?3) Это называется механизмом противодавления?

Поддерживает ли Kafka Stream этот сценарий использования или мне нужно искать что-то еще?Пожалуйста, предложите.

Я пробовал часть кода с KStream с окном 5 минут, но похоже, что я не могу держать данные topic1 в потоке.

Пожалуйста, помогите мне с правильным выбором и присоединяйтесь.Я использую Kafka из экземпляра Confluent with Docker.

public void run() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

        // Hold data from this topic to 30 days
        KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
        cs.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        // Data is involved in one time process.
        KStream<String, JsonNode> css = builder.stream("topic2", consumed);
        css.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        KStream<String, JsonNode> resultStream = cs.leftJoin(css,
                valueJoiner,
                JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                Joined.with(
                        Serdes.String(), /* key */
                        jsonSerde,       /* left value */
                        jsonSerde)       /* right value */
        );

        resultStream.foreach((k, v) -> {
            System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
    }

1 Ответ

0 голосов
/ 26 мая 2018

Объединения в Кафке всегда основаны на ключах.Таким образом, чтобы любое соединение работало, вам нужно извлечь поля, к которым вы хотите присоединиться, в ключ перед тем, как вы выполните фактическое соединение (единственное частичное исключение - это KStream-GlobalKTable join).В вашем примере кода вы не получите никаких результатов, потому что все записи имеют ключ null и по этой причине не могут быть объединены.

Для самого объединения кажется, что соединение KStream-KTable будетправильный выбор для вашего варианта использования.Чтобы это работало, вам нужно:

  1. правильно установить ключ соединения для topic1 и записать данные в дополнительную тему (назовем ее topic1Keyed)
  2. читать topic1Keyed как таблицу
  3. установить ключ соединения правильно для topic2
  4. соединение topic2 с KTable

Для получения полной информации оприсоединяйтесь к семантике, ознакомьтесь с этой записью в блоге: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...