Flink задание с AMQSource не генерирует вывод - PullRequest
0 голосов
/ 28 февраля 2019

Я использовал соединитель AMQSource Apache Bahir, который слушает ActiveMQ, но когда я запускаю задание Flink для получения данных из ActiveMQ, не генерируется никакого вывода.

Например, соединитель прослушивает ActiveMQ, который содержит 4сообщений, но когда я запускаю задание Flink, данные не расходуются.

val brokerURL = "tcp://localhost:61616"
val destinationName = "TEST.FOO"
val filePath = "C:\\output" + System.currentTimeMillis + ".csv"

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new MemoryStateBackend(1000, false))


val config = new AMQSourceConfig.AMQSourceConfigBuilder[String]()
  .setConnectionFactory(new ActiveMQConnectionFactory(brokerURL))
  .setDestinationName(destinationName)
  .setDeserializationSchema(new SimpleStringSchema)
  .setDestinationType(DestinationType.QUEUE)
  .setRunningChecker(new RunningChecker).build
val amqSource = new AMQSource[String](config)

val stream = env.addSource(amqSource)

stream.map(/*Some MapFunction*/)

stream.writeAsText(filePath)

stream.print

env.execute

1 Ответ

0 голосов
/ 04 марта 2019

AMQSource ожидает сообщение в виде байтов, см. Код из метода run в AMQSource.class:

Message message = this.consumer.receive(1000L);
if (!(**message instanceof BytesMessage**)) {
LOG.warn("Active MQ source received non bytes message: {}", message);
return;
}

При создании данных в ActiveMQ вместо текстового сообщения:

val message = session.createTextMessage(text)

Использовать сообщение байтов:

val message = session.createBytesMessage()
message.writeBytes(text.getBytes)
...