Как я могу записать avro файлы в S3 во Flink? - PullRequest
0 голосов
/ 11 июля 2019

Я хочу прочитать потоковые данные из тем Кафки и записать в S3 в формате avro или parquet. Поток данных выглядит как строка json, но я не могу преобразовать и записать в S3 в формате avro или parquet.

Я нашел несколько фрагментов кода и попробовал

val sink = StreamingFileSink .forBulkFormat (новый путь (outputS3Path), ParquetAvroWriters.forReflectRecord (classOf [myClass])) .build ()

Но я получил «Несоответствие типов, ожидаемая SinkFunction [String], фактическая: StreamingFileSink [TextOut]» в addSink

val stream = env .addSource (myConsumerSource) .addSink (раковина)

Пожалуйста, помогите, спасибо!

1 Ответ

0 голосов
/ 15 июля 2019

Обходное решение: вы можете использовать AWS Kinesis Firehose после того, как ваш базовый этл конвертирует таблицу SQL Query Flink в String и пишет в Kinesis из AWS Console, а затем пишет в S3 как паркет.

https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala

https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala

Кафка Пример: - https://github.com/kali786516/FlinkStreamAndSql/tree/master/src/main/scala/com/aws/examples/kafka

enter image description here

...