Apache Beam: возможно ли объединить сообщения RabbitMQ с ключом обмена и маршрутизации - PullRequest
0 голосов
/ 26 декабря 2018

Я определил конвейер в Apache Beam для приема сообщений данной очереди в брокере сообщений RabbitMQ.

Я определил ключ обмена и маршрутизации в RabbitMQ.

Я использовал AmqpIO.read () в Beam (версия 2.9.0), но я не нашел API для установки обмена иключ маршрутизации.

(После этого документа: https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/amqp/AmqpIO.html)

Есть ли возможность сделать это? Даже с любым другим плагином.

С уважением, Али

1 Ответ

0 голосов
/ 05 февраля 2019

Существует новый (экспериментальный) IO-разъем для RabbitMQ, поставляемый с последней версией Apache Beam v2.9.0 .Разъем AMQP не будет работать для RabbitMQ.

Если вы используете Maven, добавьте в POM следующую зависимость

<!-- Beam MongoDB I/O -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-mongodb</artifactId>
    <version>2.9.0</version>
</dependency>

, и вы можете использовать ее в конвейере, например

public class RabbitMQPipeline {

final static Logger log = LoggerFactory.getLogger(RabbitMQPipeline.class);

/**
 * Mongo Pipeline options.
 */
public interface RabbitMQPipelineOptions extends PipelineOptions {

    @Description("Path of the file to read from")
    @Default.String("amqp://localhost")
    @Required
    String getUri();

    void setUri(String uri);

}

/**
 * @param args
 */
public static void main(String[] args) {

    RabbitMQPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(RabbitMQPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<RabbitMqMessage> messages = pipeline
            .apply(RabbitMqIO2.read().withUri(options.getUri()).withQueue("test"));

    messages.apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
        @ProcessElement
        public void process(@Element RabbitMqMessage msg) {
            System.out.println(msg.toString());
        }
    }));

    pipeline.run().waitUntilFinish();
}

}

RabbitMqIO Javadoc содержит примеры использования читателя и писателя.

Слово предостережения

Существует известная ошибка , которая была исправлена, но запланирована к выпуску в v2.11.0, которая блокирует работу коннектора даже в самых простых сценариях.Исправление действительно простое (см. Проблему JIRA), но вам нужно будет пересобрать новую версию класса.Если вы хотите попробовать, убедитесь, что вы добавили следующую зависимость Maven

<dependency>
    <groupId>com.google.auto.value</groupId>
    <artifactId>auto-value</artifactId>
    <version>1.5.2</version>
    <scope>provided</scope>
</dependency>

и добавили следующую конфигурацию в плагин Maven Compiler

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <annotationProcessors>
        <annotationProcessor>com.google.auto.value.processor.AutoValueProcessor</annotationProcessor>
            </annotationProcessors>
        </configuration>
    </plugin>

Если вы используете Eclipse, сделайтеОбязательно установите плагин Maven m2-apt .Удачи!

...