Преобразование файла CSV в Java объектов (POJO) и отправка его в очередь ActiveMQ - PullRequest
0 голосов
/ 26 марта 2020

Моя цель - прочитать файл csv, преобразовать его в Java Objects (POJO) и отправить Java Objects один за другим в очередь ActiveMQ. Ниже приведен код:

public void configure() throws Exception {
    from("file:src/main/resources?fileName=data.csv")               
    .unmarshal(bindy)
    .split(body())
    .to("file:src/main/resources/?fileName=equityfeeds.txt")
    .split().tokenize(",").streaming().to("jms:queue:javaobjects.upstream.queue");          
}

Проблемы: 1.При выполнении кода ни один файл (equityfeeds.txt) не создается, и никакие объекты не попадают в очередь. Что не так? Мне не нужно делать никакой обработки прямо сейчас. Мне просто нужно разархивировать csv в POJO и отправить Java Objects один за другим в очередь ActiveMQ.

EquityFeeds (POJO)

@CsvRecord(separator = ",",skipFirstLine = true)
public class EquityFeeds {

    @DataField(pos = 1) 
    private String externalTransactionId;

    @DataField(pos = 2)
    private String clientId;

    @DataField(pos = 3)
    private String securityId;

    @DataField(pos = 4)
    private String transactionType;

    @DataField(pos = 5, pattern = "dd/MM/YY")
    private Date transactionDate;

    @DataField(pos = 6)
    private float marketValue; 

    @DataField(pos = 7)
    private String priorityFlag;

Пожалуйста, помогите. Пожалуйста, скажите мне, где я иду не так.

@ pvpkiran: Ниже мой верблюжий код для производителя:

public void configure() throws Exception {
            from("file:src/main/resources?fileName=data.csv")               
                .unmarshal(bindy)
                .split(body())
                .streaming().to("jms:queue:javaobjects.upstream.queue");
}

Ниже мой потребительский код (с использованием JMS API):

@JmsListener(destination = "javaobjects.upstream.queue")
public void javaObjectsListener(final Message objectMessage) throws JMSException {
        Object messageData = null;
        if(objectMessage instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) objectMessage;
            messageData = objMessage.getObject();
        }
        System.out.println("Object: "+messageData.toString());
    }

Я не использую Верблюд за употребление JMSMessage. В потребителя я использую JMS API для потребления сообщения. Также я не тестирую код. Сообщения поступили в ActiveMQ, и я использую JMS API (как указано выше) для получения сообщения. В терминале в получаю NullPointerException. Кроме того, в ActiveMQ.DLQ было отправлено 2 сообщения с сообщением об ошибке, приведенным ниже:

java .lang.Throwable: Delivery [7] превышает лимит политики повторной доставки: RedeliveryPolicy {destination = null, collisionAvoidanceFactor = 0,15, MaximumRedeliveries = 6, MaximumRedeliveryDelay = -1, initialRedeliveryDelay = 1000, useCollisionAvoidance = false, useExponentialBackOff = false, backOffMultiplier = 5.0, redeliveryDelay = 1000, preDisll * true = 10}: 026} 1026

1 Ответ

0 голосов
/ 26 марта 2020

Попробуй это. Это должно работать

from("file:src/main/resources?fileName=equityfeeds.csv")
                    .unmarshal(new BindyCsvDataFormat(EquityFeeds.class))
                    .split(body())
                    .streaming().to("jms:queue:javaobjects.upstream.queue");
// This route is for Testing
from("jms:queue:javaobjects.upstream.queue").to("bean:camelBeanComponent?method=processRoute"); 

И написать компонент потребительского компонента

@Component
public class CamelBeanComponent {
    public void processRoute(Exchange exchange) {
        System.out.println(exchange.getIn().getBody());
    }
}

Это напечатано (вам нужно добавить toString(), если вам нужен вывод, подобный этому)

EquityFeeds(externalTransactionId=SAPEXTXN1, clientId=GS, securityId=ICICI, transactionType=BUY, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=101.9, priorityFlag=Y)
EquityFeeds(externalTransactionId=SAPEXTXN2, clientId=AS, securityId=REL, transactionType=SELL, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=121.9, priorityFlag=N)

Если вы используете .split().tokenize(","), то каждое поле в каждой строке (не полная строка) преобразуется в объект EquityFeeds (с другими полями, равными нулю), отправляется как сообщение в очередь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...