AvroInputFormat возвращает набор адресов объектов вместо значений - PullRequest
0 голосов
/ 23 февраля 2020

Я записываю некоторые данные, используя flink AvroOutputFormat,

val source: DataSet[Row] = environment.createInput(inputBuilder.finish)
val tableEnv: BatchTableEnvironment = new BatchTableEnvironment(environment, TableConfig.DEFAULT)
val table: Table = source.toTable(tableEnv)
val avroOutputFormat = new AvroOutputFormat[Row](classOf[Row])
avroOutputFormat.setCodec(AvroOutputFormat.Codec.NULL)
source.write(avroOutputFormat, "/Users/x/Documents/test_1.avro").setParallelism(1)
environment.execute()

Это записывает данные в файл с именем test_1.avro. Когда я пытался прочитать файл как,

val users = new AvroInputFormat[Row](new Path("/Users/x/Documents/test_1.avro"), classOf[Row])
val usersDS = environment.createInput(users)
usersDS.print()

Это печатает строку как,

java.lang.Object@4462efe1,java.lang.Object@7c3e4b1a,java.lang.Object@2db4ad1,java.lang.Object@765d55d5,java.lang.Object@2513a118,java.lang.Object@2bfb583b,java.lang.Object@73ae0257,java.lang.Object@6fc1020a,java.lang.Object@5762658b

Есть ли способ напечатать значения этих данных вместо адресов объектов.

1 Ответ

1 голос
/ 24 февраля 2020

Вы смешиваете Table API и Datastream API странным образом. Было бы лучше придерживаться одного API или использовать надлежащие методы преобразования .

Как вы в принципе не позволяете Flink знать ожидаемую схему ввода / вывода. classOf[Row] - это все и ничего.

Чтобы записать таблицу в файл Avro, используйте соединитель таблицы . Редактирование Basi c sketch

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Avro().avroSchema("...")) // <- Adjust
    .withSchema(schema)
    .createTemporaryTable("AvroSinkTable")
table.insertInto("AvroSinkTable")

: на данный момент коннектор файловой системы, к сожалению, не поддерживает Avro.

Поэтому нет другого выбора, кроме как использовать API набора данных. Я рекомендую использовать avrohugger для создания соответствующего scala класса для вашей схемы avro.

// convert to your scala class
val dsTuple: DataSet[User] = tableEnv.toDataSet[User](table)
// write out
val avroOutputFormat = new AvroOutputFormat<>(User.class)
avroOutputFormat.setCodec(Codec.SNAPPY)
avroOutputFormat.setSchema(User.SCHEMA$)
specificUser.write(avroOutputFormat, outputPath1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...