Кирпичи данных Создание таблицы с использованием MongoDB в качестве источника - PullRequest
0 голосов
/ 19 февраля 2019

Databricks / Spark SQL команда создания таблицы поддерживает использование источника данных.Как мне указать MongoDB в качестве источника данных?В конечном итоге я намереваюсь создать представление Databricks / Spark на основе этой таблицы.

Из документации по Databricks:

CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name1 col_type1 [COMMENT col_comment1], ...)]
USING datasource
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
[AS select_statement]

USING <data source>
The file format to use for the table. One of TEXT, CSV, JSON, JDBC, PARQUET, 
ORC, HIVE, DELTA, and LIBSVM, or a fully-qualified class name of a custom 
implementation of org.apache.spark.sql.sources.DataSourceRegister.

Я могу успешно создать фрейм данных из коллекции MongoDB с помощью PySparkи искровой соединитель MongoDB с этим базовым синтаксисом:

df = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", uri)
.option("spark.mongodb.input.database", databaseName)
.option("spark.mongodb.input.collection", collectionName)
.load()

Я пытался использовать подобный синтаксис в SQL (ниже):

CREATE TABLE my_table
USING com.mongodb.spark.sql.DefaultSource
OPTIONS (
  uri "mongodb://...",
  spark.mongodb.input.database "myDatabaseName",
  spark.mongodb.input.collection "myCollectionName",
)

Но получаю следующую ошибку NoSuchMethodError:

Error in SQL statement: SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 19, 192.168.4.4, executor 0): java.lang.NoSuchMethodError: com.mongodb.internal.operation.SyncOperations.aggregate(Ljava/util/List;Ljava/lang/Class;JJLjava/lang/Integer;Lcom/mongodb/client/model/Collation;Lorg/bson/conversions/Bson;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Boolean;Lcom/mongodb/client/model/AggregationLevel;)Lcom/mongodb/operation/ReadOperation;
at com.mongodb.client.internal.AggregateIterableImpl.asReadOperation(AggregateIterableImpl.java:166)
at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:132)
at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:86)
at com.mongodb.spark.rdd.MongoRDD.getCursor(MongoRDD.scala:189)
at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...