Потоки Kafka: чтение из ВСЕХ разделов в каждом экземпляре приложения - PullRequest
0 голосов
/ 11 декабря 2018

При использовании KTable потоки Kafka не позволяют экземплярам читать из нескольких разделов определенной темы, когда количество экземпляров / потребителей равно количеству разделов.Я пытался добиться этого с помощью GlobalKTable, проблема в том, что данные будут перезаписаны, к ним также нельзя применить агрегирование.

Давайте предположим, что у меня есть тема с именем «data_in» с 3 разделами (P1, P2,Р3).Когда я запускаю 3 экземпляра (I1, I2, I3) потокового приложения Kafka, я хочу, чтобы каждый экземпляр считывал данные со всех разделов «data_in».Я имею в виду, что I1 может читать из P1, P2 и P3, I2 может читать из P1, P2 и P3, I2 и так далее.

РЕДАКТИРОВАТЬ: Имейте в виду, что производитель может опубликовать двапохожие идентификаторы на два разных раздела в data_in.Поэтому при запуске двух разных экземпляров GlobalKtable будет перезаписан.

Пожалуйста, как этого добиться?Это часть моего кода

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

1 Ответ

0 голосов
/ 11 декабря 2018

Либо измените количество разделов входной темы «data_in» на 1 раздел, либо используйте GlobalKtable для получения данных со всех разделов в теме, а затем вы можете присоединиться к нему в своем потоке.При этом экземпляры ваших приложений больше не должны находиться в другой группе потребителей.

Код будет выглядеть следующим образом:

private GlobalKTable<String, theDataList> globalStream() {

   // KStream of records from data-in topic using String and theDataSerde deserializers
  KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

  thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic

  KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

  // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
  KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

  Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
  materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

// Return a KTable
  KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
      if (!value.getValideData())
          aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
      else
        aggregate.getList().add(value);
      return aggregate;
  }, materialized)
  .to("agg_data_in");

  return getBuilder().globalTable("agg_data_in");
}

РЕДАКТИРОВАТЬ: я отредактировал код выше, чтобы принудительно перераспределить натема под названием "new_data_in".

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...