Как обрабатывать многострочные строковые сообщения с помощью @ServiceActivator в Spring-Boot - PullRequest
0 голосов
/ 15 февраля 2019

Я создаю приложение с весенней загрузкой для обработки сообщений от устройств регистрации, отправляемых через TCP-соединение.Я нашел элегантное решение для обработки однострочных сообщений, однако я пытаюсь заставить его работать с 3-строчными сообщениями, отправляемыми устройством.

Я пытался использовать @Aggregator, @ReleaseStrategy и @AggregationStrategyдля накопления сообщений, приходящих с того же ip, но @ServiceActivator по-прежнему получает отдельные строки.

@Aggregator
public List<String> aggregatingMethod(Message<?> message) {
    List<String> result = new ArrayList<>();
    String line = message.getPayload().toString();
    result.add(line);
    return result;
}

@ReleaseStrategy
public boolean releaseChecker(List<String> messages) {
  return messages.size() == 3;
}

@CorrelationStrategy
public String correlateBy(Message<?> message) {
  return message.getHeaders().get("ip_address").toString();
}

@ServiceActivator(inputChannel = "serviceChannel")
public void service(List<String> in) {
    System.out.println("Message");
    for(String line: in){           
        System.out.println(line.toString());
    }       
    String clientData = in.get(0) + in.get(1);
if (clientData != null && !clientData.isEmpty() && clientData.length() 
        > 30 && clientData.charAt(0) == '#') {
    // send the message to RabbitMQ
    log.info("Received <" + clientData + ">");
    messageService.sendMessage(clientData);
    log.info("Added to the queue " + 
        SpringBootRabbitMQApplication.ELD_MESSAGE_QUEUE);
} else {
    log.error("Message is null or in wrong format format: " + 
        clientData);
    // reader.close();
}
}

Это пример, который я нашел для однострочного пакета сообщений com.example;

import java.net.Socket;

import javax.net.SocketFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.transformer.ObjectToStringTransformer;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class So39290834Application {

public static void main(String[] args) throws Exception {
    ConfigurableApplicationContext context = SpringApplication.run(So39290834Application.class, args);
    Socket socket = SocketFactory.getDefault().createSocket("localhost", 9999);
    socket.getOutputStream().write("foo\r\n".getBytes());
    socket.close();
    Thread.sleep(1000);
    context.close();
}

@Bean
public TcpNetServerConnectionFactory cf() {
    return new TcpNetServerConnectionFactory(9999);
}

@Bean
public TcpReceivingChannelAdapter inbound(AbstractServerConnectionFactory cf) {
    TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    adapter.setConnectionFactory(cf);
    adapter.setOutputChannel(tcpIn());
    return adapter;
}

@Bean
public MessageChannel tcpIn() {
    return new DirectChannel();
}

@Transformer(inputChannel = "tcpIn", outputChannel = "serviceChannel")
@Bean
public ObjectToStringTransformer transformer() {
    return new ObjectToStringTransformer();
}

@ServiceActivator(inputChannel = "serviceChannel")
public void service(String in) {
    System.out.println(in);
}

}

Сейчас я использую ThreadPoolExecutor и обрабатываю tcp-соединение старомодным способом:

public static void main(String[] args) throws IOException {
    SpringApplication.run(SpringBootRabbitMQApplication.class, args);
    // create the socket server object
    server = new ServerSocket(port);

    while (true) {
        try {
            // socket object to receive incoming client requests
            final Socket s = server.accept();
            taskExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    boolean messageIsValid = true;
                    while (messageIsValid) {
                        try {
                            // obtaining input and out streams
                            BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));

                            String clientData = "";
                            // read message string
                            clientData = reader.readLine();
                            clientData += reader.readLine();
                            reader.readLine();
                            log.info("Read message from port: " + clientData);

                            if (clientData != null && !clientData.isEmpty() && clientData.length() > 30
                                    && clientData.charAt(0) == '#') {
                                // send the message to RabbitMQ
                                messageService.sendMessage(clientData);
                            } else {
                                log.error("Message is null or in wrong format format: " + clientData + "; Client address: " + s.getInetAddress());
                                messageIsValid = false;
                            }
                        } catch (IOException e) {
                            log.error("Couldn't read the message from socket");
                            e.printStackTrace();
                            messageIsValid = false;
                        }
                    }                       
                }                   
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

И он работает, как и ожидалось, но я хотел бы заставить его работать, используя@ ServiceActivator.

...