Я очень новичок в Springboot и Kafka. Работая над школьным заданием, где при использовании приложения Springboot нам нужно опубликовать sh Json данные на топике Кафки c. Мой файл публикации. java выглядит следующим образом:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;
@RestController
@RequestMapping("kafka")
public class UserResource {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
private final static String TOPIC = "Kafka";
@GetMapping("/publish/{name}")
public String Post(@PathVariable("name") final String name) {
kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L));
return "Published successfully";
}
}
Где пользователь - это обычный класс Java с конструктором, геттером и сеттером. Мой файл конфигурации выглядит следующим образом:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.CMPE172.kafka.springbootkafkaproducerexample.model.User;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.HashMap;
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory<String, User> produceFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(produceFactory());
}
}
После успешного запуска серверов zookeeper и Kafka я создаю новый topi c, используя следующую команду:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions
1 --topic Kafka
I затем запустите потребителя с помощью следующей команды:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Kafka --from-
beginning
Затем я могу успешно опубликовать sh Json сообщений / объектов в топи c, просто перейдя на localhost: 8081 / kafka / publish / Адам (в данном случае публикуется Json объект с именем Адам, отдел "Технология" и зарплата 12000)
Проблема / ошибка: каждый раз, когда я публикую sh новое имя, имя, которое я опубликовал ранее отображается во второй раз. Например, если я go на localhost: 8081 / kafka / publish / Jim Json, файл, содержащий Адама, публикуется снова вместе с Джимом. Кроме того, если я перезагружаю все серверы, публикация новых данных Json, по-видимому, вызывает публикации имен, которые были опубликованы ранее до перезапуска сервера.
Короче говоря, просматривая онлайн-видео, каждая публикация должна публиковаться только 1 имя за раз и ничего больше. В моем случае, однако, я получаю повторяющиеся значения, публикуемые с каждой новой публикацией. Может ли кто-нибудь, пожалуйста, указать мне в правильном направлении? Вся помощь будет принята с благодарностью. Заранее спасибо!