Хранить данные кафки в формате hdfs в формате паркета, используя flink? - PullRequest
2 голосов
/ 02 марта 2020

Хранить данные Кафки в формате hdf как паркетный формат, используя flink, я пытаюсь с документацией fink, которая не работает.

Я не нахожу никаких надлежащих документов для сохранения их в виде файла паркета

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100);

        final List<Datum> data = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));

        DataStream<Datum> stream = env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));


        stream.addSink(
                    StreamingFileSink.forBulkFormat(
                            Path.fromLocalFile(new File("path")),
                            ParquetAvroWriters.forReflectRecord(String.class))
                            .build());

            env.execute();

Я создал сериализуемый класс

public static class Datum implements Serializable {

        public String a;
        public int b;

        public Datum() {
        }

        public Datum(String a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            Datum datum = (Datum) o;
            return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
        }

        @Override
        public int hashCode() {
            int result = a != null ? a.hashCode() : 0;
            result = 31 * result + b;
            return result;
        }
    }

. Код выше не записывает данные в файл, просто продолжайте создавать много файлов

Если кто-то может помочь с правильной документацией или кодом

1 Ответ

2 голосов
/ 02 марта 2020

Как написано на documentation of StreamingFileSink:

ВАЖНО: При использовании StreamingFileSink необходимо включить контрольную точку. Файлы деталей могут быть завершены только на успешных контрольных точках. Если контрольная точка отключена, файлы деталей навсегда останутся в состоянии in-progress или pending и не могут быть безопасно прочитаны последующими системами.

Чтобы включить, просто используйте

env.enableCheckpointing(1000);

У вас есть несколько вариантов настройки it.


Вот полный пример

    final List<Address> data = Arrays.asList(
            new Address(1, "a", "b", "c", "12345"),
            new Address(2, "p", "q", "r", "12345"),
            new Address(3, "x", "y", "z", "12345")
    );

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(100);

    DataStream<Address> stream = env.addSource(
            new FiniteTestSource<>(data), TypeInformation.of(Address.class));

    stream.addSink(
            StreamingFileSink.forBulkFormat(
                    Path.fromLocalFile(folder),
                    ParquetAvroWriters.forSpecificRecord(Address.class))
            .build());

    env.execute();
...