Как динамически обрабатывать потоки в кафке и отправлять в другую тему - PullRequest
2 голосов
/ 15 января 2020

Я создаю приложение для обработки потоков. Это должно создать соединение потока kafka. Когда приходит сообщение, я должен выполнить следующие обязательные действия:

  • проверить тип сообщения
  • обработать сообщение, вызвав службу specificTypeProcessing
  • в конце укажите c темы, которые определяются на основе типа сообщения

    public java.util.function.Consumer<KStream<String, String>> process() {
        String topic;
        return input ->
                input.map((key, value) -> {
                    //check type and ask object from factory
                    try {
                        JSONObject msg = Util.getObjectMapper().readValue(value, JSONObject.class);
                        String type = msg.get("type").toString();
                        if(type.equalsIgnoreCase("test")){
                            //processing started
                            msgTypeHandlerFactory
                                    .createInstance(type)
                                    .enrichAndRelay(msg);
                            System.out.println("IN");
                        }
                        else{
                            input.to("notStream");
                            System.out.println("OUT");
                        }
    
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return KeyValue.pair(key, value);
    
                })
                .to("output_topic");
    
    }
    

    Проблема с приведенным выше кодом заключается в том, что я использую функцию карты, которая дает мне возможность использовать. Функция to () для отправки потока. Что я хочу, чтобы проверить каждое сообщение на тип и затем обработать, отправить в другой поток соответственно. Для этого я должен использовать функцию foreach, которая не дает мне функцию .to (), поэтому я должен создать другого Kafka Producer для выполнения этой работы.

Требование:

  1. Каждое сообщение должно быть обработано и отправлено перед обработкой следующего сообщения с помощью функции потока и без использования другого производителя кафки
  2. Если требование выполнено, я смогу отправить сообщение темы, которые будут решаться динамически в соответствии с типом.

Ответы [ 2 ]

2 голосов
/ 15 января 2020
  1. Каждое сообщение должно быть обработано и отправлено перед обработкой следующего сообщения с помощью функции потока и без использования другого производителя кафки

Это произойдет в любом случае по умолчанию.

Если требование одного будет выполнено, тогда я смогу отправлять сообщения по темам, которые будут определяться динамически в соответствии с типом.

Во-первых, чтобы упростить этап обработки. события в зависимости от их типа, посмотрите на branch(). Функция branch() позволяет предоставлять фиксированное количество предикатов для маршрутизации сообщений в различные подпотоки. Затем вы можете независимо обрабатывать эти подпотоки, например, с помощью функции map(). Наконец, вы можете затем отправить каждый подпоток в отдельный topi c с помощью to().

KStream<String, Event>[] branches = events.branch(
    (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
    (id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);

Вы также можете принимать действительно динамические c решения о маршрутизации в to() на основе что бы ни было в полезной нагрузке события. Здесь имя выходных данных topi c получено из данных события.

myStream.to(
  (eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);

Кроме того, если для решения о маршрутизации dynamici c требуется информация, которая недоступна непосредственно в событии, вы можете воспользоваться одним из вариантов. Нужно динамически обогатить исходное событие информацией, связанной с маршрутизацией (например, путем объединения потока исходных событий с таблицей с информацией, связанной с маршрутизацией), а затем выполнить динамическую c маршрутизацию через to(). Подробнее см. https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/.

1 голос
/ 15 января 2020

Если вы хотите проверить типы, вы, по сути, filter указываете события, соответствующие этим типам.

Следовательно, вам не нужна карта или foreach, вам повезет больше с filter(...).to(topic}

    final ObjectMapper mapper = Util.getObjectMapper();
    KStream notTestEvents = input.filter((key, value) -> {
        //check type and ask object from factory
        try {
            JSONObject msg = mapper.readValue(value, JSONObject.class); // You should probably use JSONDeserializer instead, which does this for you
            String type = msg.get("type").toString();
            System.out.println("OUT");
            return !type.equalsIgnoreCase("test");     
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    );
    notTestEvents.to("notStream");

Другой вариант - ветвление

KStream<String, String>[] branches = events.branch(
    (k, v) -> { 
       return !mapper
          .readValue(value, JSONObject.class)
          .get("type").toString();
          .equalsIgnoreCase("test")
    },
    (k, v) -> true
);
branches[0].map(...).to("notStream");
branches[1].map(...).to("output_topic");
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...