Разница между Kafka Consumer и Spark-Kafka-Consumer - PullRequest
1 голос
/ 23 апреля 2019

У меня есть тема kafka, по которой я отправляю данные через Kafka Producer. Теперь на стороне потребителя есть два варианта со мной.

1. Использование KafkaConsumer - ниже приведен код kafkaConsumer, который считывает данные из темы и работает нормально.

  @EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    private PolicyExecutor policyExecutor;

    public RawEventKafkaConsumer() {
         policyExecutor = new PolicyExecutor();
    }


    @Value("${spring.kafka.topic}")
    private String rawEventTopicName;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootStrapServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;



    @Bean
    public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    @Bean(name="kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
         logger.info("kafkaListenerContainerFactory called..");
        ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(rawEventConsumer());
        return factory;
    }

    @KafkaListener(topics = "rawEventTopic",  containerFactory = "kafkaListenerContainerFactory")
    public void listen(String baseDataModel) {

        ObjectMapper mapper = new ObjectMapper();
        BaseDataModel csvDataModel;
        try {
            csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);

            //saving the datamodel in elastic search.
            //dataModelServiceImpl.save(csvDataModel);
            System.out.println("Message received " + csvDataModel.toString());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

2. Использовать kafkaTopic данные с помощью Spark Stream - код ниже -

 @Service
    public class RawEventSparkStreamConsumer {

        private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);

        @Autowired
        private DataModelServiceImpl dataModelServiceImpl;


        @Autowired
        private JavaStreamingContext streamingContext;

        @Autowired
        private JavaInputDStream<ConsumerRecord<String, String>> messages;


        @PostConstruct
        private void sparkRawEventConsumer() {

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(()->{
                messages.foreachRDD((rdd) -> {
                    System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
                    rdd.foreach(record -> {
                        System.out.println("Data is comming...." + record);
                    });
                });

                streamingContext.start();

                try {
                    streamingContext.awaitTermination();
                } catch (InterruptedException e) { // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            });

        }
    }

И потребительский кафка, и потоковый Spark считывают данные из темы успешно. Теперь у меня вопрос: если оба делают одно и то же (читают данные из темы), тогда

  1. В чем разница между ними?
  2. Также я столкнулся с еще одной проблемой: оба класса, которые используют kafka и Spark, находятся в одной и той же базе кода, поэтому, если я использую оба, тогда код kafkaConsumer не работает.

Спасибо.

1 Ответ

2 голосов
/ 23 апреля 2019

Короткий ответ: вам требуется кластер Spark для распределенного запуска кода Spark по сравнению с тем, что Kafka Consumer просто запускается в одной JVM, и вы запускаете несколько экземпляров одного и того же приложения вручную для его масштабирования.

Другими словами, вы бы запустили их по-другому. spark-submit против java -jar. Я не верю, что с помощью весенних изменений

Другое отличие состоит в том, что «простой потребитель» имеет больше контроля над конфигурациями Kafka, и вы получаете одну запись за раз. В Spark RDD может быть много событий, и все они должны иметь одну и ту же «схему», если только вам не нужна сложная логика синтаксического анализа, которую труднее писать с объектами RDD, чем со значениями ConsumerRecord, которые извлекаются для вас.


В общем, я не думаю, что это хорошая идея объединять их.

И если они читают из одной и той же темы, то протокол Kafka Consumer может назначать только одного потребителя на раздел ... Не ясно, сколько разделов в вашей теме, но это может объяснить, почему один будет работать, но не другой

...