Я хочу прочитать потоковые данные из тем Кафки и записать в S3 в формате avro или parquet. Поток данных выглядит как строка json, но я не могу преобразовать и записать в S3 в формате avro или parquet.
Я нашел несколько фрагментов кода и попробовал
val sink = StreamingFileSink
.forBulkFormat (новый путь (outputS3Path), ParquetAvroWriters.forReflectRecord (classOf [myClass]))
.build ()
Но я получил «Несоответствие типов, ожидаемая SinkFunction [String], фактическая: StreamingFileSink [TextOut]» в addSink
val stream = env
.addSource (myConsumerSource)
.addSink (раковина)
Пожалуйста, помогите, спасибо!