Как добавить облако Google Pubsub в качестве источника в оболочке Beam SQL? - PullRequest
1 голос
/ 16 мая 2019

Я пробую BeamSQL в shell и хочу проверить, как работают неограниченные источники с точки зрения удобства использования и производительности. Читая документацию по здесь , я создал внешнюю таблицу следующим образом -

CREATE EXTERNAL TABLE pubsub_table (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/test-project/topics/test-topic';

Теперь, когда я пытаюсь запросить эту таблицу следующим образом -

SELECT * FROM pubsub_table LIMIT 1;

Я получаю следующую ошибку-

java.lang.NoClassDefFoundError: org/apache/beam/sdk/io/gcp/pubsub/PubsubIO$Read
    at org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider.buildBeamSqlTable(PubsubJsonTableProvider.java:61)
    at org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore.buildBeamSqlTable(InMemoryMetaStore.java:79)
    at org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema.getTable(BeamCalciteSchema.java:107)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:286)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:994)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:954)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3087)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3069)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3339)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:994)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:954)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:929)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:633)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:558)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:265)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:231)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepare2_(CalcitePrepareImpl.java:767)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepare_(CalcitePrepareImpl.java:631)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepareSql(CalcitePrepareImpl.java:601)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteConnectionImpl.parseQuery(CalciteConnectionImpl.java:229)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteMetaImpl.prepareAndExecute(CalciteMetaImpl.java:550)
    at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaConnection.prepareAndExecuteInternal(AvaticaConnection.java:675)
    at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:156)
    at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaStatement.execute(AvaticaStatement.java:217)
    at sqlline.Commands.execute(Commands.java:823)
    at sqlline.Commands.sql(Commands.java:733)
    at sqlline.SqlLine.dispatch(SqlLine.java:795)
    at sqlline.SqlLine.begin(SqlLine.java:668)
    at org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLine.runSqlLine(BeamSqlLine.java:75)
    at org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLine.main(BeamSqlLine.java:39)

Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 40 more

Любая помощь будет оценена.

1 Ответ

1 голос
/ 16 мая 2019

Похоже, у вас нет PubsubIO, доступного во время выполнения. Оболочка по умолчанию не содержит никаких дополнительных операций ввода-вывода (или бегунов), вы должны явно собрать и иметь все такие дополнительные вещи в classpath, чтобы иметь возможность использовать их. При сборке оболочки должно быть достаточно указать необходимые модули SDK в командной строке arg -Pbeam.sql.shell.bundled.

Например, эта команда создает и устанавливает оболочку в комплекте с Flink Runner, Kafka IO и Google Cloud IO:

./gradlew -p sdks/java/extensions/sql/shell \
    -Pbeam.sql.shell.bundled=':runners:flink:1.5,:sdks:java:io:kafka,:sdks:java:io:google-cloud-platform' \
    installDist

Затем запустите:

./sdks/java/extensions/sql/shell/build/install/beam-sdks-java-extensions-sql-shell/bin/beam-sdks-java-extensions-sql-shell

Некоторые подробности здесь: https://beam.apache.org/documentation/dsls/sql/shell/

...