Я использую Spring Boot 2.0.2 и Spring Kafka.Также я использую Kafka Docker image 1.1.0 из следующего репозитория: https://hub.docker.com/r/wurstmeister/kafka/tags/
Это мои конфиги Kafka:
@Configuration
@EnableKafka
public class KafkaConfig {
}
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Post> postConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postConsumerFactory());
return factory;
}
}
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
return props;
}
@Bean
public ProducerFactory<String, Post> postProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Post> postKafkaTemplate() {
return new KafkaTemplate<>(postProducerFactory());
}
}
это Kafka application.properties
:
#Kafka
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
kafka.topic.posts.create=posts.create
Это мой прослушиватель сообщений:
@Component
public class PostConsumer {
static final Logger logger = LoggerFactory.getLogger(PostConsumer.class);
@KafkaListener(topics = "${kafka.topic.posts.create}", containerFactory = "postKafkaListenerContainerFactory")
public void createPost(ConsumerRecord<String, Post> consumerRecord) {
Post post = consumerRecord.value();
logger.info("Received message for post creation: {}", post);
}
}
Я также реализовал PostService
, который должен отправить Post
в тему Кафки:
@Service
public class PostServiceImpl implements PostService {
static final Logger logger = LoggerFactory.getLogger(PostServiceImpl.class);
@Value("${kafka.topic.posts.create}")
private String kafkaTopicPostsCreate;
@Autowired
private KafkaTemplate<String, Post> postKafkaTemplate;
@Override
public void sendPost(Post post) {
postKafkaTemplate.send(kafkaTopicPostsCreate, post);
logger.info("Message sent to the post creation queue: {}", post);
}
}
У меня естьтакже реализован тест SpringBoot:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { TestApplication.class })
public class PostServiceIT {
@Autowired
private PostService postService;
@Autowired
private MessageRepository messageRepository;
@Before
public void setUp() {
messageRepository.deleteAll();
}
@Test
public void testCreatePost() throws InterruptedException {
assertEquals(0, messageRepository.findAll().size());
Post post = new Post();
...
postService.sendPost(post);
await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);
}
}
Это журнал:
2018-06-09 16:12:37.547 INFO 17824 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler 'schedulerFactoryBean' initialized from an externally provided properties instance.
2018-06-09 16:12:37.547 INFO 17824 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler version: 2.3.0
2018-06-09 16:12:37.548 INFO 17824 --- [ main] org.quartz.core.QuartzScheduler : JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@7a3e5cd3
2018-06-09 16:12:38.967 INFO 17824 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483547
2018-06-09 16:12:38.997 INFO 17824 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9093]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = postfenix
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2018-06-09 16:12:39.095 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.1.0
2018-06-09 16:12:39.095 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fdcf75ea326b8e07
2018-06-09 16:12:39.100 INFO 17824 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2018-06-09 16:12:39.104 INFO 17824 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-06-09 16:12:39.104 INFO 17824 --- [ main] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now
2018-06-09 16:12:39.104 INFO 17824 --- [ main] org.quartz.core.QuartzScheduler : Scheduler schedulerFactoryBean_$_NON_CLUSTERED started.
2018-06-09 16:12:39.111 INFO 17824 --- [SchedulerThread] c.n.quartz.mongodb.dao.TriggerDao : Found 0 triggers which are eligible to be run.
2018-06-09 16:12:39.119 INFO 17824 --- [ main] com.postfenix.domain.post.PostServiceIT : Started PostServiceIT in 5.094 seconds (JVM running for 5.74)
2018-06-09 16:12:39.121 INFO 17824 --- [ main] c.p.d.configuration.TestApplication : Initializing application...
2018-06-09 16:12:39.258 INFO 17824 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27018
2018-06-09 16:12:39.338 WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 2 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.339 INFO 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
2018-06-09 16:12:39.392 INFO 17824 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9093]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 15000000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2018-06-09 16:12:39.419 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.1.0
2018-06-09 16:12:39.419 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fdcf75ea326b8e07
2018-06-09 16:12:39.437 WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.437 INFO 17824 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
2018-06-09 16:12:39.454 WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 4 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.565 WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.590 WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 6 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.704 INFO 17824 --- [ main] c.p.domain.service.post.PostServiceImpl : Message sent to the post creation queue: Post [chatId=@name, parseMode=HTML]
2018-06-09 16:12:40.229 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Discovered group coordinator 10.0.75.1:9093 (id: 2147482646 rack: null)
2018-06-09 16:12:40.232 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Revoking previously assigned partitions []
2018-06-09 16:12:40.233 INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2018-06-09 16:12:40.233 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] (Re-)joining group
2018-06-09 16:12:40.295 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Successfully joined group with generation 1
2018-06-09 16:12:40.297 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Setting newly assigned partitions [posts.create-0]
2018-06-09 16:12:40.313 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=postfenix] Resetting offset for partition posts.create-0 to offset 1.
2018-06-09 16:12:40.315 INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [posts.create-0]
В данный момент мой тест не проходит по следующей строке:
await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);
, потому что послепри первом тестовом запуске сообщение по какой-то причине не доставляется методу PostConsumer.createPost
.Но если я проведу этот же тест второй раз на том же экземпляре док-станции Kafka, сообщение с предыдущего теста будет успешно доставлено в PostConsumer.createPost
.Что я делаю не так и почему сообщение не доставляется после первого запуска теста и как это исправить?
ОБНОВЛЕНО
Это мое обновленное KafkaConsumerConfig
:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Post> postConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postConsumerFactory());
return factory;
}
}
Сейчас у меня есть 2 ошибки компиляции в kafkaListenerContainerFactory
и postConsumerFactory
методах, потому что метод consumerConfigs()
отсутствует, а метод consumerFactory
в kafkaListenerContainerFactory
требует KafkaProperties
.