Как мне сделать JMS-транспорт для CXF-репоста с ошибочными сообщениями QPid? С другой стороны, есть ли лучшее решение, которое я пропускаю? - PullRequest
3 голосов
/ 08 декабря 2011

Я пишу веб-сервис электронной почты для моей компании.Одним из основных требований является гарантированная доставка, поэтому у нас есть тонкий HTTP-слой по JMS-транспорту, использующий постоянную очередь QPid.

Одна из проблем, с которыми я сталкиваюсь, - это обработка ошибок во время обработки.Если я просто откатываю транзакцию при появлении ошибки, сообщение отправляется в начало очереди.Если ошибка является достаточно распространенной, это может заблокировать всю очередь до тех пор, пока кто-нибудь не вмешается вручную, и мы хотели бы избежать этого, отправив сообщение обратно в голову, чтобы сообщения могли быть обработаны за это время.

Однако,в этом мои проблемы.Во-первых, в то время как AMQP имеет механизм «отклонять и запрашивать» сообщение атомарно вместо его подтверждения, JMS, похоже, не имеет никакого аналога для этой функции, поэтому единственный способ получить к нему доступ - это приведение, которое связывает меня сконкретная реализация JMS.Кроме того, JMS-транспорт для CXF, похоже, не имеет каких-либо средств переопределения или внедрения поведения на транспортном уровне, что означает, что я застрял либо в написании агента байт-кода, либо в изменении кода и перекомпиляции просто для получения нужного мне поведения.

Чтобы обойти эту проблему, мне понравилась идея реализовать обработчик ошибок в CXF, который просто реконструирует сообщение JMS из сообщения CXF и помещает его в очередь.Но тогда я не могу использовать транзакционный сеанс, потому что ошибка вызывает откат, который я не могу переопределить, и тогда я получу копию сообщения на голове (от отката) и на хвосте (из очереди).И я не могу использовать CLIENT_ACKNOWLEDGE, потому что JMS-транспорт подтверждает сообщение перед тем, как отправит его на обработку, что означает, что если сервер выйдет из строя в неподходящее время, я могу потерять сообщение.

* 1010В общем, пока я застрял, соглашаясь с поведением JMS-транспорта по умолчанию, кажется, что невозможно получить желаемое поведение (пересылка сообщений с ошибками) без ущерба для целостности данных.

Сотрудник предложилполностью отказаться от JMS-транспорта и напрямую вызвать очередь.Реализация сервиса будет тогда каркасным классом, который существует исключительно для помещения сообщений в очередь, а другой процесс будет реализовывать прослушиватель сообщений.Для меня это решение является неоптимальным, потому что я теряю элегантность независимого веб-сервиса и теряю некоторую масштабируемость, связывая свою реализацию с базовой технологией.

Я также рассмотрел просто написание транспорта CXFдля AMQP с использованием клиентской библиотеки RabbitMQ.Это займет больше времени, но это будет то, что наша компания могла бы продолжать использовать в будущем, и, возможно, что-то, что можно было бы внести обратно в проект CXF.Тем не менее, я не в восторге от этой идеи из-за количества времени, затрачиваемого на написание, запуск и тестирование кода.

Вот мой beans.xml для CXF:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:jms="http://cxf.apache.org/transports/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/util      http://www.springframework.org/schema/util/spring-util.xsd
    http://www.springframework.org/schema/context   http://www.springframework.org/schema/context/spring-context.xsd
    http://cxf.apache.org/jaxrs                     http://cxf.apache.org/schemas/jaxrs.xsd
    http://cxf.apache.org/transports/jms            http://cxf.apache.org/schemas/configuration/jms.xsd">

    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />

    <context:component-scan base-package="com.edo" />

    <bean id="jmsConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
        <constructor-arg name="broker" value="tcp://localhost:5672"/>
        <constructor-arg name="username" value="guest"/>
        <constructor-arg name="password" value="guest"/>
        <constructor-arg name="clientName" value=""/>
        <constructor-arg name="virtualHost" value=""/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:explicitQosEnabled="true" p:deliveryMode="1" p:timeToLive="5000" p:connectionFactory-ref="jmsConnectionFactory" p:sessionTransacted="false" p:sessionAcknowledgeModeName="CLIENT_ACKNOWLEDGE" />
    <bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration" p:connectionFactory-ref="jmsConnectionFactory" p:wrapInSingleConnectionFactory="false" p:jmsTemplate-ref="jmsTemplate" p:timeToLive="500000" p:sessionTransacted="false" p:concurrentConsumers="1" p:maxSuspendedContinuations="0" p:maxConcurrentConsumers="1" />

    <jms:destination id="jms-destination-bean" name="{http://test.jms.jaxrs.edo.com/}HelloWorldImpl.jms-destination">
        <jms:address jndiConnectionFactoryName="ConnectionFactory" jmsDestinationName="TestQueue">
            <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
            <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:5672"/>
        </jms:address>
        <jms:jmsConfig-ref>jmsConfig</jms:jmsConfig-ref>
    </jms:destination>

    <jaxrs:server id="helloWorld" address="/HelloWorld" transportId="http://cxf.apache.org/transports/jms">
      <jaxrs:serviceBeans>
        <ref bean="helloWorldBean"/>
      </jaxrs:serviceBeans>
      <jaxrs:inInterceptors>
        <bean class="com.edo.jaxrs.jms.test.FlowControlInInterceptor" p:periodMs="1000" p:permitsPerPeriod="18" />
      </jaxrs:inInterceptors>
      <jaxrs:providers>
        <bean class="org.apache.cxf.jaxrs.provider.JSONProvider">
          <property name="produceMediaTypes" ref="jsonTypes"/>
          <property name="consumeMediaTypes" ref="jsonTypes"/>
        </bean>
      </jaxrs:providers>
    </jaxrs:server>

    <bean id="http-jms-config" class="com.edo.jaxrs.jms.test.HttpOverJmsConfig" 
        p:jmsFactory-ref="jmsConnectionFactory" 
        p:jmsDestinationName="TestQueue" />

    <util:list id="jsonTypes">
      <value>application/json</value>
      <value>application/jettison</value>
    </util:list>

</beans>

Есть что-то простое, что мне не хватает?Или есть лучший способ обойти это, чтобы обойти проблему?

1 Ответ

0 голосов
/ 13 декабря 2011

Итак - я беру совет моего коллеги и не использую транспорт JMS для веб-службы.Вместо этого мы собираемся создать тонкий слой веб-службы поверх Spring Integration.Это должно позволить нам детализировать контроль, в котором мы нуждаемся, без ненужной демонстрации уровня обмена сообщениями.

...