Какой должен быть лучший способ отфильтровать сообщение кафки - PullRequest
0 голосов
/ 27 марта 2020

Я использую данные из kafka topi c, который включает код города. Я должен фильтровать данные только для определенных кодов города. Может ли кто-нибудь предложить лучший подход для решения этой проблемы.

Вот мой код слушателя выглядит так. Рекомендуется ли анализировать данные в объект (как я сопоставил полезную нагрузку с объектом TEST) и фильтровать данные на основе значения, которое мне нужно отфильтровать, или Кафка предоставляет любые другие библиотеки, которые я могу использовать в процессе фильтрации ,

Метод прослушивания Kafka

@Service
public class Listener{

    @KafkaListener(topics = "#{@topicName}")
        public void listen(String payload) throws IOException {

            LOGGER.info("received payload from topic='{}'", payload);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            TEST test = objectMapper.readValue(payload,TEST.class);

        }
}

My Kafka Класс конфигурации:

@Configuration
public class Config {


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, applicationConfiguration.getKafkaBootStrap());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaValueDeserializer());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, applicationConfiguration.getKafkaGroupId());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, applicationConfiguration.getKafkaAutoOffsetReset());
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
    @Bean
    public Listener receiver() {
        return new Listener();
    }

}

Ответы [ 2 ]

1 голос
/ 27 марта 2020

См. Фильтрация сообщений .

Проект Spring для Apache Kafka также предоставляет некоторую помощь с помощью класса FilteringMessageListenerAdapter, который может обернуть ваш MessageListener. Этот класс принимает реализацию RecordFilterStrategy, в которой вы реализуете метод фильтра, чтобы сигнализировать, что сообщение является дубликатом и должно быть отброшено. У этого есть дополнительное свойство, называемое ackDiscarded, которое указывает, должен ли адаптер подтверждать удаленную запись. По умолчанию это значение false.

При использовании @KafkaListener установите RecordFilterStrategy (и, возможно, ackDiscarded) на фабрике контейнеров, чтобы слушатель был обернут в соответствующий адаптер фильтрации.

/**
 * Set the record filter strategy.
 * @param recordFilterStrategy the strategy.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
    this.recordFilterStrategy = recordFilterStrategy;
}
/**
 * Implementations of this interface can signal that a record about
 * to be delivered to a message listener should be discarded instead
 * of being delivered.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 *
 */
public interface RecordFilterStrategy<K, V> {

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);

}
1 голос
/ 27 марта 2020

То, что вы сделали, в порядке.
Если ваша полезная нагрузка содержит много данных, кроме кода области, и вы беспокоитесь о долгом разборе, вы можете отфильтровать сообщения перед выполнением всего анализа в объекте TEST, добавив код области как заголовок

В более поздних версиях Kafka (после 0.11) предлагаются пользовательские заголовки ( KIP-82 )

Если вы хотите реализовать это самостоятельно (или если вы используете более старую версию Kafka), вы можете добавить заголовок к полезной нагрузке вашего сообщения, скажем, в качестве первых 4 байтов сообщения, они будут представлять код города и могут быть извлечены очень быстро до процесса синтаксического анализа.
Полезная нагрузка нового сообщения:

([header-4-bytes],[original-payload-n-bytes])

Так что сделайте свой фильтр на основе заголовка, и если вы обнаружите, что это код города, который вам нужен, создайте свой объект TEST на основе остальной части сообщения (обрежьте первые 4 байта для удаления заголовка).

Кафка не предоставляет каких-либо опций фильтрации, которые могли бы вам помочь, хотя он может отправлять сообщения с ключами в вашем источнике, поэтому, если ваш ключ является кодом города, Кафка гарантирует, что все сообщения с одинаковыми кодами города отправляются к тому же разделу, возможно, может помочь для вашей производительности, если используется правильно.
Производитель также может отправлять сообщения в определенные c разделы, поэтому, если вы знали, что у вас есть фиксированные номера кодов городов, вы также можете определить topi c с номером раздела, равным количеству уникальных кодов зоны, и отправьте каждый код зоны в другой раздел, затем используйте своего Потребителя для доступа только к тем разделам с кодами зоны, которые вы ищете, но в большинстве случаев может быть излишним.

...