Я пытаюсь отправить потоковые данные, прочитанные из Kafka, принять и преобразовать, а затем записать их в HDFS в Avro. В настоящее время он работает, если я задаю выходной путь для своего локального каталога для AvroIO, но не распознает каталог hdfs.
В настоящее время я запускаю Flink отдельно от Hadoop, но они оба запускаются локально. Hadoop находится в режиме псевдораспределения, поэтому единственный путь, которым он может быть, - это hdfs: // localhost: 9000 / (если я не ошибаюсь в этом). Flink запускается просто с помощью start-cluster.sh.
Я видел предложения людей об использовании FileSystems.matchNewResource (), но, похоже, это устаревшее решение, поскольку AvroIO сам проверяет, запускает ли эту строку самостоятельно из метода .to (), и при этом происходит сбой. точная строка (где выполняется .to ()) со словами «java.lang.IllegalArgumentException: не найдена файловая система для схемы hdfs».
Я попытался прочитать исходный код для AvroIO, и он говорит, что эта ошибкапроисходит, когда указан неверный путь к FileSystems.matchNewResourc () (который запускается методом .to () из AvroIO). Однако, когда я перечисляю файлы из hdfs: // localhost: 9000 /, он дает мне каталог, который я проверяю для выходного каталога, с которым работает Beam.
Здесь показано, что перечислено в разделе hdfs. при доступе через команду hdfs dfs:
$ bin/hdfs dfs -ls hdfs://localhost:9000/cereal/
Found 1 items
drwxr-xr-x - root supergroup 0 2019-10-24 11:06 hdfs://localhost:9000/cereal/data
Мой выходной каталог - "hdfs: // localhost: 9000 / cereal / data".
Вот сообщение об ошибке при запуске конвейера:
[WARNING]
java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal (FileSystems.java:463)
at org.apache.beam.sdk.io.FileSystems.matchNewResource (FileSystems.java:533)
at jj.flinkbeam.SensorData.main (SensorData.java:216)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.511 s
[INFO] Finished at: 2019-10-24T17:00:13-04:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project jj-flinkbeam: An exception occured while executing the Java class. No filesystem found for scheme hdfs -> [Help 1]
А вот (SensorData.java:216):
ResourceId outdir = FileSystems.matchNewResource(options.getOutput(), true);
Я изолировал его от AvroIO.write (), чтобы убедиться, что это FileSystems.matchNewResource (), который выбрасываетОшибка. Кроме того, даже если я записываю строку каталога в AvroIO.write (). To () напрямую, это выдает мне ту же ошибку. Если я изменю выходной каталог на «/ tmp /», я смогу без проблем найти файлы Avro, генерируемые из Beam.
Пример, на который я ссылался, можно найти здесь: https://github.com/gbif/beam-perf/blob/master/avro-to-avro/src/main/java/org/gbif/beam/perf/avrotoavro/BeamAvroIOTest.java
Кажется, этот пример не требует ничего сумасшедшего.
Вот также исходный код для FileSystems.matchNewResource (): https://github.com/apache/beam/blob/f67e3d8acef65dea6130e21b95c5991d4f23faaa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L529
Буду очень признателен, если кто-нибудь сможет помочь. Заранее спасибо.