Читать RabbitMQ из Beam / DataFlow - PullRequest
0 голосов
/ 04 января 2019

Я пытаюсь запустить очередь RabbitMQ из луча / потока данных в потоковом режиме (чтобы он продолжал работать бесконечно.)

Минимальный пример кода, который я пытаюсь запустить:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class RabbitMqTest {

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        final String serverUri = "amqp://guest:guest@localhost:5672";

        pipeline
                .apply("Read RabbitMQ message", RabbitMqIO.read().withUri(serverUri).withQueue("my_queue"))
                .apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String message = new String(c.element().getBody());
                        System.out.println();
                        c.output(message);
                    }
                }));
        pipeline.run().waitUntilFinish();
    }
}

Однако происходит сбой:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:169)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Если я не передам withMaxReadTime() RabbitMqIO. Если я передаю withMaxReadTime(), он блокируется на X секунд, затем обрабатывает все сообщения, поступившие в течение этого времени, а затем завершает работу.

Как настроить потоковый поток, который продолжает работать из RabbitMQ в течение неопределенного времени?

Ответы [ 2 ]

0 голосов
/ 15 марта 2019

Это ошибка в Io, недавно исправленная .

0 голосов
/ 10 января 2019

У меня была похожая проблема с конвейером потока данных. При попытке запустить его в потоке данных я получил:

java.lang.NullPointerException
            org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
            org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
            org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
            org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
            org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            java.lang.Thread.run(Thread.java:745)

Они проблема была RabbitMqIO использует метку времени из сообщений, поступающих из RabbitMq, например, для водяных знаков. Оказалось, что в сообщениях от RabbitMq в моем случае не была установлена ​​временная метка (она не установлена ​​по умолчанию в RabbitMq), и она была нулевой. Я исправил это, подготовив патч для занятий в Apache Beam. Я сделал изменение в конструкторе RabbitMqMessage. Теперь это выглядит так:

public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
    this.routingKey = routingKey;
    body = delivery.getBody();
    contentType = delivery.getProperties().getContentType();
    contentEncoding = delivery.getProperties().getContentEncoding();
    headers = delivery.getProperties().getHeaders();
    deliveryMode = delivery.getProperties().getDeliveryMode();
    priority = delivery.getProperties().getPriority();
    correlationId = delivery.getProperties().getCorrelationId();
    replyTo = delivery.getProperties().getReplyTo();
    expiration = delivery.getProperties().getExpiration();
    messageId = delivery.getProperties().getMessageId();
    /*
      *** IMPORTANT ***
      Sometimes timestamp in RabbitMq message properties is 'null'. `RabbitMqIO` uses that value as
      watermark, when it is `null` it causes exceptions, 'null' has to be replaced with some value in this case current time
     */
    // timestamp = delivery.getProperties().getTimestamp();
    timestamp = delivery.getProperties().getTimestamp() == null ? new Date() : delivery.getProperties().getTimestamp();
    type = delivery.getProperties().getType();
    userId = delivery.getProperties().getUserId();
    appId = delivery.getProperties().getAppId();
    clusterId = delivery.getProperties().getClusterId();
}

и мне пришлось изменить метод advance() в RabbitMqIO, чтобы не использовать свойство timestamp, которое может быть нулевым:

@Override
    public boolean advance() throws IOException {
        try {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
            if (delivery == null) {
                return false;
            }
            if (source.spec.useCorrelationId()) {
                String correlationId = delivery.getProperties().getCorrelationId();
                if (correlationId == null) {
                    throw new IOException(
                            "RabbitMqIO.Read uses message correlation ID, but received "
                                    + "message has a null correlation ID");
                }
                currentRecordId = correlationId.getBytes(StandardCharsets.UTF_8);
            }
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            checkpointMark.sessionIds.add(deliveryTag);

            current = new RabbitMqMessage(source.spec.routingKey(), delivery);
            /*
              *** IMPORTANT ***
              Sometimes timestamp in RabbitMq messages is 'null' stream in Dataflow fails because
              watermark is based on that value, 'null' has to be replaced with some value. `RabbitMqMessage` was changed
              to use `new Date()` in this situation and now timestamp can be taken from it
             */
            //currentTimestamp = new Instant(delivery.getProperties().getTimestamp());
            currentTimestamp = new Instant(current.getTimestamp());
            if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
                checkpointMark.oldestTimestamp = currentTimestamp;
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
        return true;
    }

После запуска моего конвейера я снова получил это исключение в другом месте. На этот раз это было вызвано не установленным значением по умолчанию для свойства oldestTimestamp в RabbitMQCheckpointMark. Я сделал следующее изменение, и теперь RabbitMQCheckpointMark выглядит так:

private static class RabbitMQCheckpointMark
        implements UnboundedSource.CheckpointMark, Serializable {
    transient Channel channel;
    /*
      *** IMPORTANT *** it should be initialized with some value because without it runner (e.g Dataflow) fails with 'NullPointerException'
      Example error:
        java.lang.NullPointerException
            org.apache.beam.runners.dataflow.worker.WindmillTimeUtils.harnessToWindmillTimestamp(WindmillTimeUtils.java:58)
            org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:400)
            org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1230)
            org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
            org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            java.lang.Thread.run(Thread.java:745)
     */
    Instant oldestTimestamp = new Instant(Long.MIN_VALUE);
    final List<Long> sessionIds = new ArrayList<>();

    @Override
    public void finalizeCheckpoint() throws IOException {
        for (Long sessionId : sessionIds) {
            channel.basicAck(sessionId, false);
        }
        channel.txCommit();
        oldestTimestamp = Instant.now();
        sessionIds.clear();
    }
}

Все эти изменения исправили мой конвейер, и теперь он работает как положено. Я надеюсь, что вы найдете это полезным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...