Существует новый (экспериментальный) IO-разъем для RabbitMQ, поставляемый с последней версией Apache Beam v2.9.0 .Разъем AMQP не будет работать для RabbitMQ.
Если вы используете Maven, добавьте в POM следующую зависимость
<!-- Beam MongoDB I/O -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-mongodb</artifactId>
<version>2.9.0</version>
</dependency>
, и вы можете использовать ее в конвейере, например
public class RabbitMQPipeline {
final static Logger log = LoggerFactory.getLogger(RabbitMQPipeline.class);
/**
* Mongo Pipeline options.
*/
public interface RabbitMQPipelineOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("amqp://localhost")
@Required
String getUri();
void setUri(String uri);
}
/**
* @param args
*/
public static void main(String[] args) {
RabbitMQPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(RabbitMQPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<RabbitMqMessage> messages = pipeline
.apply(RabbitMqIO2.read().withUri(options.getUri()).withQueue("test"));
messages.apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
@ProcessElement
public void process(@Element RabbitMqMessage msg) {
System.out.println(msg.toString());
}
}));
pipeline.run().waitUntilFinish();
}
}
RabbitMqIO Javadoc содержит примеры использования читателя и писателя.
Слово предостережения
Существует известная ошибка , которая была исправлена, но запланирована к выпуску в v2.11.0, которая блокирует работу коннектора даже в самых простых сценариях.Исправление действительно простое (см. Проблему JIRA), но вам нужно будет пересобрать новую версию класса.Если вы хотите попробовать, убедитесь, что вы добавили следующую зависимость Maven
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.5.2</version>
<scope>provided</scope>
</dependency>
и добавили следующую конфигурацию в плагин Maven Compiler
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<annotationProcessors>
<annotationProcessor>com.google.auto.value.processor.AutoValueProcessor</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>
Если вы используете Eclipse, сделайтеОбязательно установите плагин Maven m2-apt .Удачи!