Повторяющиеся сообщения, опубликованные на Kafka Topi c Использование Spring-boot - PullRequest
0 голосов
/ 17 марта 2020

Я очень новичок в Springboot и Kafka. Работая над школьным заданием, где при использовании приложения Springboot нам нужно опубликовать sh Json данные на топике Кафки c. Мой файл публикации. java выглядит следующим образом:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;

@RestController
@RequestMapping("kafka")
public class UserResource {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;
    private final static String TOPIC = "Kafka";

    @GetMapping("/publish/{name}")
    public String Post(@PathVariable("name") final String name) {

        kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L));
        return "Published successfully";
   }
}

Где пользователь - это обычный класс Java с конструктором, геттером и сеттером. Мой файл конфигурации выглядит следующим образом:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.HashMap;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> produceFactory() {

        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,  "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ,  StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ,  JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
   public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(produceFactory());
    }

}

После успешного запуска серверов zookeeper и Kafka я создаю новый topi c, используя следующую команду:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions    
1 --topic Kafka

I затем запустите потребителя с помощью следующей команды:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Kafka --from-  
beginning

Затем я могу успешно опубликовать sh Json сообщений / объектов в топи c, просто перейдя на localhost: 8081 / kafka / publish / Адам (в данном случае публикуется Json объект с именем Адам, отдел "Технология" и зарплата 12000)

Проблема / ошибка: каждый раз, когда я публикую sh новое имя, имя, которое я опубликовал ранее отображается во второй раз. Например, если я go на localhost: 8081 / kafka / publish / Jim Json, файл, содержащий Адама, публикуется снова вместе с Джимом. Кроме того, если я перезагружаю все серверы, публикация новых данных Json, по-видимому, вызывает публикации имен, которые были опубликованы ранее до перезапуска сервера.

Короче говоря, просматривая онлайн-видео, каждая публикация должна публиковаться только 1 имя за раз и ничего больше. В моем случае, однако, я получаю повторяющиеся значения, публикуемые с каждой новой публикацией. Может ли кто-нибудь, пожалуйста, указать мне в правильном направлении? Вся помощь будет принята с благодарностью. Заранее спасибо!

1 Ответ

0 голосов
/ 17 марта 2020

Вы добавляете в журнал. Каждый запрос создает новое сообщение, а не перезаписывает старые.

Тот факт, что у вас есть --from-beginning, означает, что вы всегда будете сканировать каждое сообщение с топиками c с самого начала, а не ждать новых поступающие сообщения.

Если вы действительно считаете, что дублирующиеся сообщения публикуются, вы можете использовать OffsetShell для отображения последних смещений topi c

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...