Apache Beam работает на Flink для отправки Avro в HDFS - PullRequest
0 голосов
/ 25 октября 2019

Я пытаюсь отправить потоковые данные, прочитанные из Kafka, принять и преобразовать, а затем записать их в HDFS в Avro. В настоящее время он работает, если я задаю выходной путь для своего локального каталога для AvroIO, но не распознает каталог hdfs.

В настоящее время я запускаю Flink отдельно от Hadoop, но они оба запускаются локально. Hadoop находится в режиме псевдораспределения, поэтому единственный путь, которым он может быть, - это hdfs: // localhost: 9000 / (если я не ошибаюсь в этом). Flink запускается просто с помощью start-cluster.sh.

Я видел предложения людей об использовании FileSystems.matchNewResource (), но, похоже, это устаревшее решение, поскольку AvroIO сам проверяет, запускает ли эту строку самостоятельно из метода .to (), и при этом происходит сбой. точная строка (где выполняется .to ()) со словами «java.lang.IllegalArgumentException: не найдена файловая система для схемы hdfs».

Я попытался прочитать исходный код для AvroIO, и он говорит, что эта ошибкапроисходит, когда указан неверный путь к FileSystems.matchNewResourc () (который запускается методом .to () из AvroIO). Однако, когда я перечисляю файлы из hdfs: // localhost: 9000 /, он дает мне каталог, который я проверяю для выходного каталога, с которым работает Beam.

Здесь показано, что перечислено в разделе hdfs. при доступе через команду hdfs dfs:

$ bin/hdfs dfs -ls hdfs://localhost:9000/cereal/
Found 1 items
drwxr-xr-x   - root supergroup          0 2019-10-24 11:06 hdfs://localhost:9000/cereal/data

Мой выходной каталог - "hdfs: // localhost: 9000 / cereal / data".

Вот сообщение об ошибке при запуске конвейера:

[WARNING] 
java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal (FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource (FileSystems.java:533)
    at jj.flinkbeam.SensorData.main (SensorData.java:216)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  12.511 s
[INFO] Finished at: 2019-10-24T17:00:13-04:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project jj-flinkbeam: An exception occured while executing the Java class. No filesystem found for scheme hdfs -> [Help 1]

А вот (SensorData.java:216):

ResourceId outdir = FileSystems.matchNewResource(options.getOutput(), true);

Я изолировал его от AvroIO.write (), чтобы убедиться, что это FileSystems.matchNewResource (), который выбрасываетОшибка. Кроме того, даже если я записываю строку каталога в AvroIO.write (). To () напрямую, это выдает мне ту же ошибку. Если я изменю выходной каталог на «/ tmp /», я смогу без проблем найти файлы Avro, генерируемые из Beam.

Пример, на который я ссылался, можно найти здесь: https://github.com/gbif/beam-perf/blob/master/avro-to-avro/src/main/java/org/gbif/beam/perf/avrotoavro/BeamAvroIOTest.java

Кажется, этот пример не требует ничего сумасшедшего.

Вот также исходный код для FileSystems.matchNewResource (): https://github.com/apache/beam/blob/f67e3d8acef65dea6130e21b95c5991d4f23faaa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L529

Буду очень признателен, если кто-нибудь сможет помочь. Заранее спасибо.

...