Я хочу получить строки данных из разных баз данных и вставить их в другую базу данных агрегации.Этот простой тест в Java-коде Camel работает нормально:
from("timer://Timer?period=60000")
.setBody(constant("select * from cosdata"))
.to("jdbc:vedasDataSource")
.split(body())
.setBody(simple("insert into acogsdata values('${body[id]}','${body[code]}','${body[value]}')"))
.to("jdbc:quantumDataSource");
Что мне нравится в этом, так это возможность поддерживать код SQL в файлах свойств, чтобы отдельные знания схемы данных были отделены от транспорта (Camel).код (я не показываю это в этом примере, но сейчас я ожидаю, что это достижимо).Я хочу подключиться к различным источникам данных, которые будут иметь разные имена для полей и таблиц.
Я хотел бы расширить приведенный выше пример, чтобы данные направлялись в целевую базу данных через веб-сервис, который должентакже не требуется предварительное знание конкретной схемы, относящейся к источнику данных, т.е. я хотел бы избежать представления данных POJO.Поэтому после долгих исследований и испытаний я теперь задаюсь вопросом, возможно ли вообще это понятие (отделение структуры данных от транспортного уровня)?
Приведенный ниже код показывает общую идею, но дает исключение.Исключение подразумевает, что определение данных отсутствует.Это связано с маршалингом / демаршалингом через веб-сервис.Концептуально было бы неплохо, если бы маршруты Camel могли сохранить поведение первого примера, не нарушая добавление конечной точки веб-службы.
Следующий код в моей конфигурации построителя маршрутов показывает, чего я хотел бы достичь:
final String quantumURI = "jdbc:postgresql://localhost:5432/quantum";
DataSource quantumdataSource1 = setupQuantumDataSource(quantumURI);
SimpleRegistry reg = new SimpleRegistry() ;
reg.put("quantumDataSource",quantumdataSource1);
CamelContext context = getContext();
((org.apache.camel.impl.DefaultCamelContext) context).setRegistry(reg);
from(uri)
.to("log:input")
// send the request to the route to handle the operation
.recipientList(simple("direct:${header.operationName}"));
// upload
from("direct:upload")
.split().body()
.setBody(simple("insert into acogsdata values('${body[id]}','${body[code]}','${body[value]}')"))
.to("jdbc:quantumDataSource")
.to("log:output");
Я хочу вставить каждую из «строк» в базу данных.Тело в виде ArrayList выглядит следующим образом:
[{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}]
Эти данные поступают из оператора выбора источника данных, который затем отправляется клиенту в веб-службу:
Я получаю следующее исключение:
org.apache.cxf.binding.soap.SoapFault: Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]
Полная трассировка стека:
org.apache.cxf.binding.soap.SoapFault: Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]
at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.unmarshalFault(Soap11FaultInInterceptor.java:87)
at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:53)
at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:42)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:112)
at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:70)
at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:35)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:833)
at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:1695)
at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponse(HTTPConduit.java:1572)
at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.close(HTTPConduit.java:1373)
at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:56)
at org.apache.cxf.transport.http.HTTPConduit.close(HTTPConduit.java:673)
at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:63)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:537)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:446)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:361)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:319)
at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:96)
at org.apache.cxf.frontend.ClientProxy.invoke(ClientProxy.java:81)
at com.sun.proxy.$Proxy34.upload(Unknown Source)
at test1.VisionToWSRouteBuilder$1.process(VisionToWSRouteBuilder.java:31)
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197)
at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
[Camel (camel-1) thread #1 - timer://Timer] WARN org.apache.camel.component.timer.TimerConsumer - Error processing exchange. Exchange[ID-Nelson-1539460289761-0-1]. Caused by: [org.apache.cxf.binding.soap.SoapFault - Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]]
org.apache.cxf.binding.soap.SoapFault: Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]
at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.unmarshalFault(Soap11FaultInInterceptor.java:87)
at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:53)
at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:42)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:112)
at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:70)
at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:35)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:833)
at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:1695)
at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponse(HTTPConduit.java:1572)
at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.close(HTTPConduit.java:1373)
at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:56)
at org.apache.cxf.transport.http.HTTPConduit.close(HTTPConduit.java:673)
at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:63)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:537)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:446)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:361)
at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:319)
at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:96)
at org.apache.cxf.frontend.ClientProxy.invoke(ClientProxy.java:81)
at com.sun.proxy.$Proxy34.upload(Unknown Source)
at test1.VisionToWSRouteBuilder$1.process(VisionToWSRouteBuilder.java:31)
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197)
at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
Я ожидал, что разделение предоставит каждую строку для setBody (оператор вставки) - хорошо, так что еще многое предстоит изучить.
Вызов к WS осуществляется через интерфейс Java:
public interface Ingester {
String verify(String input);
String upload(Object data);//tried various permutations
}
Настройка клиента:
from("timer://Timer?period=60000")
.setBody(constant("select * from cosdata"))
.to("jdbc:vedasDataSource")
вызывается так внутри процессора:
ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
factory.setServiceClass(Ingester.class);
factory.setAddress(URL);
Ingester client = factory.create();
String out = client.upload(exchange.getIn().getBody(String.class));
Я ожидалразделение обеспечит каждую строку для setBody (оператор вставки) - хорошо, так что еще многое предстоит узнать.Я дал кому-то свою книгу ...
Может кто-нибудь помочь с этим?Возможно, некоторые советы о том, как добиться разделения между структурой данных и транспортом.
Заранее спасибо.