как отправить json поток данных на несколько тем в kafka на основе полей ввода - PullRequest
1 голос
/ 02 марта 2020

Я должен потреблять json данных, поступающих в поток kafka и отправлять в разные темы (отличное сочетание идентификатора приложения и сущности) для дальнейшего использования.
topi c names:

    app1.entity1
    app1.entity2
    app2.entity1
    app2.entity2

Json Данные

    [
        {
            "appId": "app1",
            "entity": "entity1",
            "extractType": "txn",
            "status": "success",
            "fileId": "21151235"
        },
        {
            "appId": "app1",
            "entity": "entity2",
            "extractType": "txn",
            "status": "fail",
            "fileId": "2134234123"
        },
        {
            "appId": "app2",
            "entity": "entity3",
            "extractType": "payment",
            "status": "success",
            "fileId": "2312de23e"
        },
        {
            "appId": "app2",
            "entity": "entity3",
            "extractType": "txn",
            "status": "fail",
            "fileId": "asxs3434"
        }
    ]

TestInput. java

        private String appId;           
        private String entity ;             
        private String extractType;         
        private String status;          
        private String fileId; 

        setter/gtter

SpringBootConfig. java

      @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
      public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
          Map<String, Object> config = new HashMap<>();
          config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
          config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
          config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
          config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JsonSerde<>(TestInput.class).getClass());
          config.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
          config.put(JsonDeserializer.DEFAULT_VALUE_TYPE, TestInput.class);
          return new KafkaStreamsConfiguration(config);
      }

      @Bean
      public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
          KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);
                 // how to form key , group records and send to different topics
          return stream;
      }

Я много искал, но ничего не нашел рядом с которой публикует данные по темам динамически. Пожалуйста, помогите экспертам

1 Ответ

2 голосов
/ 02 марта 2020

Использование stream.branch()

См. https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/

Далее давайте изменим требование. Вместо обработки всех событий в потоке каждый микросервис должен предпринимать действия только для подмножества соответствующих событий. Один из способов справиться с этим требованием - иметь микросервис, который подписывается на исходный поток со всеми событиями, проверяет каждую запись, а затем выполняет действия только с теми событиями, о которых он заботится, отбрасывая остальные. Однако в зависимости от приложения это может быть нежелательным или ресурсоемким.

Более чистым способом является предоставление службы отдельным потоком, который содержит только соответствующее подмножество событий, о которых заботится микросервис. Для достижения этого потоковое приложение может разветвлять исходный поток событий на разные подпотоки, используя метод KStream # branch (). Это приводит к новым темам Kafka, поэтому микросервис может напрямую подписаться на один из разветвленных потоков.

...

...