Я добавил Flink Hadoop Compatibility в проект, который читает файл последовательности из пути hdfs,
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.5.6</version>
</dependency>
Вот фрагмент кода Java,
DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(HadoopInputs.readHadoopFile(
new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable, BytesWritable>(),
NullWritable.class, BytesWritable.class, path));
Это работает довольно хорошо, когдаЯ запускаю его в Eclipse, но когда я отправляю его через командную строку 'flink run ...', он жалуется,
The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
OK, поэтому я обновляю свой код, добавляя информацию о типе,
DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(HadoopInputs.readHadoopFile(
new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable, BytesWritable>(),
NullWritable.class, BytesWritable.class, path),
TypeInformation.of(new TypeHint<Tuple2<NullWritable, BytesWritable>>() {}));
Теперь он жалуется,
Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
Некоторые люди предлагают скопировать flink-hadoop-compatibility_2.11-1.5.6.jar в FLINK_HOME / lib, но это не помогает, но все жета же ошибка.
У кого-нибудь есть подсказки?
My Flink - это автономная установка, версия 1.5.6.
ОБНОВЛЕНИЕ:
Извините, я скопировал flink-hadoop-compatibility_2.11-1.5.6.jar в неправильное место, после исправления этого он работает.
Теперь мой вопрос, есть ли другой способ пойти?Потому что копирование этого jar-файла в FLINK_HOME / lib определенно не очень хорошая идея для меня, особенно когда речь идет о большом кластере flink.