Хранить данные Кафки в формате hdf как паркетный формат, используя flink, я пытаюсь с документацией fink, которая не работает.
Я не нахожу никаких надлежащих документов для сохранения их в виде файла паркета
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
final List<Datum> data = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
DataStream<Datum> stream = env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
stream.addSink(
StreamingFileSink.forBulkFormat(
Path.fromLocalFile(new File("path")),
ParquetAvroWriters.forReflectRecord(String.class))
.build());
env.execute();
Я создал сериализуемый класс
public static class Datum implements Serializable {
public String a;
public int b;
public Datum() {
}
public Datum(String a, int b) {
this.a = a;
this.b = b;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Datum datum = (Datum) o;
return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
}
@Override
public int hashCode() {
int result = a != null ? a.hashCode() : 0;
result = 31 * result + b;
return result;
}
}
. Код выше не записывает данные в файл, просто продолжайте создавать много файлов
Если кто-то может помочь с правильной документацией или кодом