Я пытался заставить входящий SubscribeableChannel и исходящий MessageChannel работать в моем приложении для весенней загрузки.
Я успешно настроил канал kafka и успешно его протестировал.
Кроме того, я создал базовое приложение для весенней загрузки, которое проверяет добавление и получение данных из канала.
Проблема, с которой я сталкиваюсь, заключается в том, что когда я помещаю эквивалентный код в приложение, к которому он принадлежит, кажется, что сообщения никогда не отправляются и не принимаются. Отладкой трудно определить, что происходит, но единственное, что мне кажется другим, это название канала. В рабочем значении имя канала похоже на application.channel в нерабочем приложении, его localhost: 8080 / channel.
Мне было интересно, есть ли какая-то конфигурация весенней загрузки, блокирующая или изменяющая создание каналов в другой источник канала?
У кого-нибудь были подобные проблемы?
application.yml
spring:
datasource:
url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
platform: h2
username: hello
password:
driverClassName: org.h2.Driver
jpa:
properties:
hibernate:
show_sql: true
use_sql_comments: true
format_sql: true
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
email-in:
destination: email
contentType: application/json
email-out:
destination: email
contentType: application/json
E-mail
public class Email {
private long timestamp;
private String message;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
Binding Config
@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {
}
Интерфейс
public interface EmailQueues {
String INPUT = "email-in";
String OUTPUT = "email-out";
@Input(INPUT)
SubscribableChannel inboundEmails();
@Output(OUTPUT)
MessageChannel outboundEmails();
}
Контроллер
@RestController
@RequestMapping("/queue")
public class EmailQueueController {
private EmailQueues emailQueues;
@Autowired
public EmailQueueController(EmailQueues emailQueues) {
this.emailQueues = emailQueues;
}
@RequestMapping(value = "sendEmail", method = POST)
@ResponseStatus(ACCEPTED)
public void sendToQueue() {
MessageChannel messageChannel = emailQueues.outboundEmails();
Email email = new Email();
email.setMessage("hello world: " + System.currentTimeMillis());
email.setTimestamp(System.currentTimeMillis());
messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
}
@StreamListener(EmailQueues.INPUT)
public void handleEmail(@Payload Email email) {
System.out.println("received: " + email.getMessage());
}
}
Я не уверен, что один из унаследованных проектов конфигурации, использующих Spring-Cloud, Spring-Cloud-Sleuth может препятствовать его работе, но даже когда я удаляю его, он все еще не работает. Но в отличие от моего приложения, которое работает с приведенным выше кодом, я никогда не вижу настроенного ConsumeConfig, например:
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
(Эта конфигурация - то, что я вижу в моем основном приложении Spring Boot при запуске приведенного выше кода, и код работает для записи и чтения из канала kafka) ....
Я предполагаю, что в одной из библиотек, которые я использую для создания канала другого типа, имеется некоторая конфигурация загрузки, которую я просто не могу найти, что это за конфигурация.