Я пытаюсь настроить сокет TCP, который получает данные в формате name,value
в отдельных сообщениях. Эти сообщения приходят в среднем каждую секунду, иногда быстрее или медленнее.
Мне удалось настроить рабочую конфигурацию, но мне не хватает базового c понимания того, что на самом деле происходит в Spring Integration.
Мой файл конфигурации выглядит следующим образом:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
И CSVProcessingService
выглядит так (сокращенно):
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: \n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
Мои цели для этой конфигурации следующие:
- получать сообщения на настроенном порт
- выдерживает более высокую нагрузку без потери сообщений
- десериализует сообщения
- помещает их в канал очереди
- (в идеале также регистрирует ошибки)
- канал очереди опрашивается каждые 50 мс, и сообщение из канала очереди передается в
ObjectToStringTransformer
- после преобразователя, преобразованное сообщение передается в
CSVProcessingService
для дальнейшей обработки
Правильно ли я достиг всех этих целей или ошибся, потому что неправильно понял Spring Integration? Можно ли как-то объединить Poller
и @ServiceActivator
?
Кроме того, у меня возникла проблема с визуализацией того, как мой сконфигурированный IntegrationFlow на самом деле «течет», возможно, кто-то может помочь мне лучше понять это.
РЕДАКТИРОВАТЬ:
Я переделал свою конфигурацию после комментария Артемс. Теперь это выглядит так:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
Я также удалил аннотацию @ServiceActivator
из метода CSVProcessingService#process
.