Как правильно настроить входящий адаптер TCP с QueueChannel и ServiceActivator - PullRequest
2 голосов
/ 17 марта 2020

Я пытаюсь настроить сокет TCP, который получает данные в формате name,value в отдельных сообщениях. Эти сообщения приходят в среднем каждую секунду, иногда быстрее или медленнее.

Мне удалось настроить рабочую конфигурацию, но мне не хватает базового c понимания того, что на самом деле происходит в Spring Integration.

Мой файл конфигурации выглядит следующим образом:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService,
        @Value("${tcp.socket.server.port}") final int port
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                Tcp.nioServer(port)
                   .deserializer(serializer())
                   .leaveOpen(true)
            )
               .autoStartup(true)
               .outputChannel(queueChannel())
        ).transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller()
    {
        return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public MessageChannel queueChannel()
    {
        return MessageChannels.queue("queue", 50).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

И CSVProcessingService выглядит так (сокращенно):

@Slf4j
@Service
public class CSVProcessingService
{
    @ServiceActivator
    public void process(final String message)
    {
        log.debug("DATA RECEIVED: \n" + message);
        final CsvMapper csvMapper = new CsvMapper();
        final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);

        if (StringUtils.contains(message, StringUtils.LF))
        {
            processMultiLineInput(message, csvMapper, csvSchema);
        }
        else
        {
            processSingleLineInput(message, csvMapper, csvSchema);
        }
    }
}

Мои цели для этой конфигурации следующие:

  • получать сообщения на настроенном порт
  • выдерживает более высокую нагрузку без потери сообщений
  • десериализует сообщения
  • помещает их в канал очереди
  • (в идеале также регистрирует ошибки)
  • канал очереди опрашивается каждые 50 мс, и сообщение из канала очереди передается в ObjectToStringTransformer
  • после преобразователя, преобразованное сообщение передается в CSVProcessingService для дальнейшей обработки

Правильно ли я достиг всех этих целей или ошибся, потому что неправильно понял Spring Integration? Можно ли как-то объединить Poller и @ServiceActivator?

Кроме того, у меня возникла проблема с визуализацией того, как мой сконфигурированный IntegrationFlow на самом деле «течет», возможно, кто-то может помочь мне лучше понять это.

РЕДАКТИРОВАТЬ:

Я переделал свою конфигурацию после комментария Артемс. Теперь это выглядит так:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Value("${tcp.socket.server.port}") int port;

    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                tcpNioServer()
            )
               .autoStartup(true)
               .errorChannel(errorChannel())
        )
         .transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean
    public AbstractServerConnectionFactory tcpNioServer()
    {
        return Tcp.nioServer(port)
                  .deserializer(serializer())
                  .leaveOpen(true)
                  .taskExecutor(
                      new ThreadPoolExecutor(0, 20,
                                             30L, TimeUnit.SECONDS,
                                             new SynchronousQueue<>(),
                                             new DefaultThreadFactory("TCP-POOL"))
                  ).get();
    }

    @Bean
    public MessageChannel errorChannel()
    {
        return MessageChannels.direct("errors").get();
    }

    @Bean
    public IntegrationFlow errorHandling()
    {
        return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

Я также удалил аннотацию @ServiceActivator из метода CSVProcessingService#process.

1 Ответ

3 голосов
/ 17 марта 2020

Не уверен, что вас смущает, но ваша конфигурация и логика c выглядят хорошо.

Вы можете упустить тот факт, что вам не нужен QueueChannel между, так как AbstractConnectionFactory.processNioSelections() уже многопоточный, и он планирует задачу для чтения сообщения из сокета. Итак, вам нужно только настроить соответствующий Executor для Tcp.nioServer(). Хотя это все равно Executors.newCachedThreadPool() по умолчанию.

С другой стороны, в оперативной памяти QueueChannel вы действительно можете потерять сообщения, потому что они уже прочитаны из сети.

Когда вы делаете Java DSL, вы должны рассмотреть возможность использования poller() опция на конечной точке. @Poller будет работать с @ServiceActivator, если у вас есть атрибут inputChannel, но использование его в handle() переопределит inputChannel, поэтому ваш @Poller не будет применен. Не путайте себя с микшированием Java DSL и конфигурацией аннотаций!

Все остальное хорошо в вашей конфигурации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...