Можно ли повторно получить GlobalKTable? - PullRequest
4 голосов
/ 17 апреля 2019

Я хочу перезаписать GlobalKTable (возможно, при его инициализации, поскольку я считаю, что они читаются только после создания).

Возможно ли это?

У меня есть две темы, с которыми я работаю в приложении Spring / Java Kafka Streams. Первое не уплотняется, второе есть. Оба используют Avro для своих ключей и значений.

Приложение выполняет потоковую передачу записей из первой (не уплотненной) темы и присоединяет дополнительные данные из уплотненной темы через KStream#leftJoin. Уплотненная тема была перенесена в приложение в виде GlobalKTable, созданной с помощью StreamsBuilder#globalTable() и должна оставаться такой (мне нужны все записи из всех разделов темы, доступных в каждом экземпляре приложения).

Я знаю, что говорят о поддержке соединений не первичного ключа (https://issues.apache.org/jira/browse/KAFKA-3705),, но, насколько мне известно, я пока не могу этого сделать ...

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

  @Autowired
  private MyCustomSerdes serdes;

  @Bean
  public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {

    GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
      serdes.getAvroKeyOne()
      serdes.getAvroValueOne()
    ));

    KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
      serdes.getAvroKeyTwo(),
      serdes.getAvroValueOne()
    ));

    kStream.join(
      globalTable,
       /**
        * the KeyValueMapper. I need to rekey the Global table as well to the
        * corresponding String (which it's data will have) if I want this join
        * to return results
        */
      (streamKey, streamValue) -> {return streamKey.getNewStringKey()},
      (/**ValueJoiner Deal**/)
    );
  }

}

1 Ответ

2 голосов
/ 17 апреля 2019

Я хочу перезаписать GlobalKTable (возможно, при его инициализации, так как я считаю, что они читаются только после создания).

Возможно ли это?

Прямой поддержки этому сегодня нет. Вы уже упоминали о предстоящей работе, такой как добавление поддержки в глобальные таблицы для соединений без первичного ключа , но это пока недоступно.

Что вы могли бы сделать сегодня: вы можете переназначить (переразбить) исходную тему Kafka в новую тему, а затем прочитать переназначенную тему в свой глобальный KTable. Может быть, это вариант для вас.

...