Я пытаюсь следовать руководству по StreamingFileSink из Stream Processing с Apache Flink.
В книгах упоминается, что я могу указать свой объект AvroPOJO и сохранить свой поток в файл паркета со схемой avro на моем локальном сервере.
Это выглядит так:
val sink: StreamingFileSink[String] = StreamingFileSink
.forBulkFormat(
new Path("/base/path"),
ParquetAvroWriters.forSpecificRecord(classOf[AvroPojo]))
.build()
После того, как я добавил (протестировал и 1.9, и 1.8, и в 1.9, я использовал flink-parquet_2.11)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
Затем я могу успешно использовать ParquetAvroWriter, но скомпилировать с отсутствующей ошибкой ParquetAvroWriter.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/formats/parquet/avro/ParquetAvroWriters
Итак, я должен добавить parquet-avro зависимость вручную.
Тогда мне просто дают:
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
Обновление:
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.compat}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.compat}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.7.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.7.0</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.compat}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-common -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-common</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.6.1</version>
</dependency>
Вопрос 1. Как мне скомпилировать этот кода без конфига hadoop? Или любой другой способ сохранить данные в моем локальном фс с паркетом и авро?