Я играл с Beam SQL DSL и не могу использовать выходные данные запроса, не предоставив кодер, который знает схему вывода вручную. Могу ли я вывести схему вывода вместо ее жесткого кодирования?
Ни в пошаговом руководстве , ни в примерах фактически не используются выходные данные запроса. Я использую Scio вместо простого Java API, чтобы сделать код относительно читабельным и лаконичным, я не думаю, что это имеет значение для этого вопроса.
Вот пример того, что я имею в виду.
С учетом входной схемы inSchema
и некоторого источника данных, который отображается на Row
следующим образом: (в этом примере на основе Avro, но, опять же, я не думаю, что это имеет значение):
sc.avroFile[Foo](args("input"))
.map(fooToRow)
.setCoder(inSchema.getRowCoder)
.applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
.saveAsTextFile(args("output"))
Запуск этого конвейера приводит к KryoException
следующим образом:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
fieldIndices (org.apache.beam.sdk.schemas.Schema)
schema (org.apache.beam.sdk.values.RowWithStorage)
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Тем не менее, вставка RowCoder
, совпадающая с выводом SQL, в данном случае один столбец с числом int:
...snip...
.applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
.setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
.saveAsTextFile(args("output"))
Теперь конвейер работает нормально.
Необходимость вручную указывать конвейеру, как кодировать выходные данные SQL, кажется ненужной, учитывая, что мы указываем входную схему / кодер (ы) и запрос. Мне кажется, что мы должны быть в состоянии вывести выходную схему из этого - но я не могу понять, как, кроме, возможно, прямого использования Calcite?
Прежде чем поднять билет на Beam Jira, я решил проверить, не упустил ли я что-нибудь очевидное!