Не совсем понятно, о чем вы спрашиваете, поэтому я предполагаю, что вы имеете в виду, что вам нужен весенний интеграционный прокси между вашими клиентами и серверами. Что-то вроде:
iot-device -> spring server -> message-transformation -> spring client -> back-end-server
Если это так, вы можете реализовать ClientConnectionIdAware
фабрику клиентских соединений, которая включает стандартную фабрику.
В потоке интеграции свяжите входящий заголовок ip_connectionId
в сообщении с потоком (в ThreadLocal
).
Затем в фабрике клиентских соединений найдите соответствующее исходящее соединение на карте, используя значение ThreadLocal
; если не найден (или закрыт), создайте новый и сохраните его на карте для дальнейшего использования.
Реализация ApplictionListener
(или @EventListener
) для прослушивания TcpConnectionCloseEvent
s от фабрики соединений с сервером и close()
соответствующего исходящего соединения.
Это звучит как классное улучшение, так что подумайте над тем, чтобы добавить его обратно в фреймворк.
EDIT
Версия 5.0 добавила ThreadAffinityClientConnectionFactory
, которая работала бы из коробки с TcpNetServerConnectionFactory
, так как каждое соединение получает свой собственный поток.
С TcpNioServerConnectionFactory
вам потребуется дополнительная логика для динамического связывания соединения с потоком для каждого запроса.
EDIT2
@SpringBootApplication
public class So51200675Application {
public static void main(String[] args) {
SpringApplication.run(So51200675Application.class, args).close();
}
@Bean
public ApplicationRunner runner() {
return args -> {
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("foo\r\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println(reader.readLine());
socket.close();
};
}
@Bean
public Map<String, String> fromToConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public Map<String, String> toFromConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public IntegrationFlow proxyInboundFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
.transform(Transformers.objectToString())
.<String, String>transform(s -> s.toUpperCase())
.handle((p, h) -> {
mapConnectionIds(h);
return p;
})
.handle(Tcp.outboundAdapter(threadConnectionFactory()))
.get();
}
@Bean
public IntegrationFlow proxyOutboundFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
.transform(Transformers.objectToString())
.<String, String>transform(s -> s.toUpperCase())
.enrichHeaders(e -> e
.headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
+ IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
.handle(Tcp.outboundAdapter(serverFactory()))
.get();
}
private void mapConnectionIds(Map<String, Object> h) {
try {
TcpConnection connection = threadConnectionFactory().getConnection();
String mapping = toFromConnectionMappings().get(connection.getConnectionId());
String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
if (mapping == null || !(mapping.equals(incomingCID))) {
System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
}
}
catch (Exception e) {
e.printStackTrace();
}
}
@Bean
public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
return new ThreadAffinityClientConnectionFactory(clientFactory()) {
@Override
public boolean isSingleUse() {
return false;
}
};
}
@Bean
public AbstractServerConnectionFactory serverFactory() {
return Tcp.netServer(1234).get();
}
@Bean
public AbstractClientConnectionFactory clientFactory() {
AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
clientFactory.setSingleUse(true);
return clientFactory;
}
@Bean
public IntegrationFlow serverFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
.transform(Transformers.objectToString())
.<String, String>transform(p -> p + p)
.get();
}
@Bean
public ApplicationListener<TcpConnectionCloseEvent> closer() {
return e -> {
if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
String key = fromToConnectionMappings().remove(e.getConnectionId());
toFromConnectionMappings().remove(key);
System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
threadConnectionFactory().releaseConnection();
}
};
}
}
EDIT3
У меня отлично работает с MapJsonSerializer
.
@SpringBootApplication
public class So51200675Application {
public static void main(String[] args) {
SpringApplication.run(So51200675Application.class, args).close();
}
@Bean
public ApplicationRunner runner() {
return args -> {
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println(reader.readLine());
socket.close();
};
}
@Bean
public Map<String, String> fromToConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public Map<String, String> toFromConnectionMappings() {
return new ConcurrentHashMap<>();
}
@Bean
public MapJsonSerializer serializer() {
return new MapJsonSerializer();
}
@Bean
public IntegrationFlow proxyRequestFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
.<Map<String, String>, Map<String, String>>transform(m -> {
m.put("foo", m.get("foo").toUpperCase());
return m;
})
.handle((p, h) -> {
mapConnectionIds(h);
return p;
})
.handle(Tcp.outboundAdapter(threadConnectionFactory()))
.get();
}
@Bean
public IntegrationFlow proxyReplyFlow() {
return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
.<Map<String, String>, Map<String, String>>transform(m -> {
m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
return m;
})
.enrichHeaders(e -> e
.headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
+ IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
.handle(Tcp.outboundAdapter(serverFactory()))
.get();
}
private void mapConnectionIds(Map<String, Object> h) {
try {
TcpConnection connection = threadConnectionFactory().getConnection();
String mapping = toFromConnectionMappings().get(connection.getConnectionId());
String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
if (mapping == null || !(mapping.equals(incomingCID))) {
System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
}
}
catch (Exception e) {
e.printStackTrace();
}
}
@Bean
public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
return new ThreadAffinityClientConnectionFactory(clientFactory()) {
@Override
public boolean isSingleUse() {
return false;
}
};
}
@Bean
public AbstractServerConnectionFactory serverFactory() {
return Tcp.netServer(1234)
.serializer(serializer())
.deserializer(serializer())
.get();
}
@Bean
public AbstractClientConnectionFactory clientFactory() {
AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
.serializer(serializer())
.deserializer(serializer())
.get();
clientFactory.setSingleUse(true);
return clientFactory;
}
@Bean
public IntegrationFlow backEndEmulatorFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
.serializer(serializer())
.deserializer(serializer())))
.<Map<String, String>, Map<String, String>>transform(m -> {
m.put("foo", m.get("foo") + m.get("foo"));
return m;
})
.get();
}
@Bean
public ApplicationListener<TcpConnectionCloseEvent> closer() {
return e -> {
if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
String key = fromToConnectionMappings().remove(e.getConnectionId());
toFromConnectionMappings().remove(key);
System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
threadConnectionFactory().releaseConnection();
}
};
}
}
и
Добавление нового сопоставления localhost: 56998: 1234: 55c822a4-4252-45e6-9ef2-79263391f4be на localhost: 1235: 56999: 3d520ca9-2f3a-44c3-b05f-e59695b8c1b0
{ "Foo": "barbarBARBAR"}
Удалено отображение localhost: 56998: 1234: 55c822a4-4252-45e6-9ef2-79263391f4be на localhost: 1235: 56999: 3d520ca9-2f3a-44c3-b05f-e59695b8c1b0