ActiveMQ Duplicate Message Detection (Wildfly, Java EE) - PullRequest
0 голосов
/ 26 сентября 2018

Во-первых, простите за мой английский!У меня есть проект с JMS и ActiveMQ

@Stateless
@LocalBean
public class Producer {

public void produceMessage(List<String> entityIds) {
  try {
    InitialContext initialContext = new InitialContext();
    ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("java:/JmsXA");
    Destination destination = (Destination) initialContext.lookup("jms/queue/cachedAttrs");

    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    entityIds.forEach(entityId -> {                 
                TextMessage message = session.createTextMessage(entityId);
                message.setStringProperty("_AMQ_DUPL_ID", entityId);
                producer.send(message);                     
        }
    );

    connection.close();
    session.close();
    initialContext.close();
  } catch (Exception e) {
    throw new IllegalStateException("..." + e.getMessage());
  }
 }
}

@MessageDriven(
  name = "Consumer",
  activationConfig = {
    @ActivationConfigProperty(propertyName = "destination", propertyValue = 
"jms/queue/cachedAttrs"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = 
  "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "useDLQ", propertyValue = "false")
  }
)
public class Consumer implements MessageListener {  

@PersistenceContext
private EntityManager entityManager;

@Override
public void onMessage(Message message) {
  TextMessage id = (TextMessage) message;
}
}

Producer.produceMessage () может вызывать во многих местах проекта и в одно и то же время.Мне нужно проверить дубликат ID и не вызывать Consumer for ID, если этот идентификатор содержится в очереди.

Я читаю https://activemq.apache.org/artemis/docs/1.0.0/duplicate-detection.html и делаю это

message.setStringProperty("_AMQ_DUPL_ID", entityId);

Вызывающий источник:

producer.produceMessage(Arrays.asList("1", "2", "1", "3", "2"));

И получение исключения:

Причина: ActiveMQDuplicateIdException [errorType = DUPLICATE_ID_REJECTED message = Обнаружено повторяющееся сообщение - сообщение не будет направлено.Информация о сообщении: ServerMessage [messageID = 266288062339, длительный = истина, userID = 0c03aadc-c07a-11e8-9fb7-775c9c2bdfeb, приоритет = 4, bodySize = 225, отметка времени = вторник, 25 сентября 11:18:25 GMT + 07: 00 2018,expiration = 0, durable = true, address = jms.queue.deviceCachedAttrs, properties = TypedProperties [__ AMQ_CID = f627a880-c079-11e8-9fb7-775c9c2bdfeb, _AMQ_DUPL_ID = 1]] @ 1895765804] atm..protocol.core.impl.ChannelImpl.sendBlocking (ChannelImpl.java:406) в org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking (ChannelImpl.java:304) в org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext.xaPrepare (ActiveMQSessionContext.java:457) в org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.prepare (ClientSessionImpl.java:1018 *

Что я делаю не так?И могу ли я @ Inject моего производителя в сервис или мне нужно создать новый объект?Thnx!

1 Ответ

0 голосов
/ 26 сентября 2018

Из того, что я могу сказать, ты не сделал ничего плохого.Исходя из того, что вы сказали, брокер должен отклонять дубликаты сообщений.Вы установили дубликат ID в сообщении, и посредник обнаружил дубликат сообщения, который он отклонил.Обратите внимание, что исключение говорит: «Обнаружено повторяющееся сообщение - сообщение не будет перенаправлено».

...