Верблюжий CXF от URI до конечной точки JDBC - org.apache.cxf.binding.soap.SoapFault - PullRequest
0 голосов
/ 13 октября 2018

Я хочу получить строки данных из разных баз данных и вставить их в другую базу данных агрегации.Этот простой тест в Java-коде Camel работает нормально:

from("timer://Timer?period=60000")
        .setBody(constant("select * from cosdata"))
        .to("jdbc:vedasDataSource")
        .split(body())
        .setBody(simple("insert into acogsdata values('${body[id]}','${body[code]}','${body[value]}')"))
        .to("jdbc:quantumDataSource");

Что мне нравится в этом, так это возможность поддерживать код SQL в файлах свойств, чтобы отдельные знания схемы данных были отделены от транспорта (Camel).код (я не показываю это в этом примере, но сейчас я ожидаю, что это достижимо).Я хочу подключиться к различным источникам данных, которые будут иметь разные имена для полей и таблиц.

Я хотел бы расширить приведенный выше пример, чтобы данные направлялись в целевую базу данных через веб-сервис, который должентакже не требуется предварительное знание конкретной схемы, относящейся к источнику данных, т.е. я хотел бы избежать представления данных POJO.Поэтому после долгих исследований и испытаний я теперь задаюсь вопросом, возможно ли вообще это понятие (отделение структуры данных от транспортного уровня)?

Приведенный ниже код показывает общую идею, но дает исключение.Исключение подразумевает, что определение данных отсутствует.Это связано с маршалингом / демаршалингом через веб-сервис.Концептуально было бы неплохо, если бы маршруты Camel могли сохранить поведение первого примера, не нарушая добавление конечной точки веб-службы.

Следующий код в моей конфигурации построителя маршрутов показывает, чего я хотел бы достичь:

 final String quantumURI = "jdbc:postgresql://localhost:5432/quantum";
 DataSource quantumdataSource1 = setupQuantumDataSource(quantumURI);
 SimpleRegistry reg = new SimpleRegistry() ;
 reg.put("quantumDataSource",quantumdataSource1);
 CamelContext context = getContext();
    ((org.apache.camel.impl.DefaultCamelContext) context).setRegistry(reg);

 from(uri)
        .to("log:input")
        // send the request to the route to handle the operation
        .recipientList(simple("direct:${header.operationName}"));

    // upload
    from("direct:upload")
    .split().body()
    .setBody(simple("insert into acogsdata values('${body[id]}','${body[code]}','${body[value]}')"))
.to("jdbc:quantumDataSource")
.to("log:output");

Я хочу вставить каждую из «строк» ​​в базу данных.Тело в виде ArrayList выглядит следующим образом:

[{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}]

Эти данные поступают из оператора выбора источника данных, который затем отправляется клиенту в веб-службу:

Я получаю следующее исключение:

org.apache.cxf.binding.soap.SoapFault: Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]

Полная трассировка стека:

    org.apache.cxf.binding.soap.SoapFault: Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]
    at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.unmarshalFault(Soap11FaultInInterceptor.java:87)
    at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:53)
    at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:42)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:112)
    at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:70)
    at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:35)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:833)
    at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:1695)
    at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponse(HTTPConduit.java:1572)
    at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.close(HTTPConduit.java:1373)
    at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:56)
    at org.apache.cxf.transport.http.HTTPConduit.close(HTTPConduit.java:673)
    at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:63)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:537)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:446)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:361)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:319)
    at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:96)
    at org.apache.cxf.frontend.ClientProxy.invoke(ClientProxy.java:81)
    at com.sun.proxy.$Proxy34.upload(Unknown Source)
    at test1.VisionToWSRouteBuilder$1.process(VisionToWSRouteBuilder.java:31)
    at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
[Camel (camel-1) thread #1 - timer://Timer] WARN org.apache.camel.component.timer.TimerConsumer - Error processing exchange. Exchange[ID-Nelson-1539460289761-0-1]. Caused by: [org.apache.cxf.binding.soap.SoapFault - Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]]
org.apache.cxf.binding.soap.SoapFault: Failed to invoke method: [id] on java.lang.String due to: java.lang.IndexOutOfBoundsException: Key: id not found in bean: [{id=1, code=Meds, value=12.34}, {id=2, code=Meds, value=12376.39}, {id=3, code=Samples, value=8002.54}, {id=4, code=Misc, value=124.34}, {id=5, code=Fees, value=125.34}] of type: java.lang.String using OGNL path [[id]]
    at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.unmarshalFault(Soap11FaultInInterceptor.java:87)
    at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:53)
    at org.apache.cxf.binding.soap.interceptor.Soap11FaultInInterceptor.handleMessage(Soap11FaultInInterceptor.java:42)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:112)
    at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:70)
    at org.apache.cxf.binding.soap.interceptor.CheckFaultInterceptor.handleMessage(CheckFaultInterceptor.java:35)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:833)
    at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:1695)
    at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponse(HTTPConduit.java:1572)
    at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.close(HTTPConduit.java:1373)
    at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:56)
    at org.apache.cxf.transport.http.HTTPConduit.close(HTTPConduit.java:673)
    at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:63)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:537)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:446)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:361)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:319)
    at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:96)
    at org.apache.cxf.frontend.ClientProxy.invoke(ClientProxy.java:81)
    at com.sun.proxy.$Proxy34.upload(Unknown Source)
    at test1.VisionToWSRouteBuilder$1.process(VisionToWSRouteBuilder.java:31)
    at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

Я ожидал, что разделение предоставит каждую строку для setBody (оператор вставки) - хорошо, так что еще многое предстоит изучить.

Вызов к WS осуществляется через интерфейс Java:

public interface Ingester {
  String verify(String input);
  String upload(Object data);//tried various permutations
}

Настройка клиента:

from("timer://Timer?period=60000")
.setBody(constant("select * from cosdata"))
.to("jdbc:vedasDataSource") 

вызывается так внутри процессора:

ClientProxyFactoryBean factory = new ClientProxyFactoryBean();
factory.setServiceClass(Ingester.class);
factory.setAddress(URL);
Ingester client = factory.create();
String out = client.upload(exchange.getIn().getBody(String.class));

Я ожидалразделение обеспечит каждую строку для setBody (оператор вставки) - хорошо, так что еще многое предстоит узнать.Я дал кому-то свою книгу ...

Может кто-нибудь помочь с этим?Возможно, некоторые советы о том, как добиться разделения между структурой данных и транспортом.

Заранее спасибо.

...