SQL-запрос Spark на DSE Cassandra в Scala - PullRequest
0 голосов
/ 22 мая 2018

Я хочу протестировать запрос Spark-SQL для таблицы DSE Cassandra в Scala IDE.Запрос выполняется без ошибок, когда файл JAR выполняется в dse spark-submit.Но выдает ошибку при запуске в Scala IDE.Ошибка:

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Таблица или представление не найдены: killr_video. videos;строка 1, позиция 14;

Я думаю, что это ошибка конфигурации мастера свечей, поскольку я запускаю мастер в локальном режиме.

Вот сеанс искры, который я инициировал.

val spark = SparkSession
          .builder()
          .appName("CassandraSpark")
          .config("spark.cassandra.connection.host", "127.0.0.1")
          .config("spark.cassandra.connection.port", "9042")
          .enableHiveSupport()
          .master("local")
          .getOrCreate();

Но я не знаю, какой адрес установить в качестве главного.Я попытался установить главный адрес как «spark: //127.0.0.1: 7077», который я нашел в веб-интерфейсе (localhost: 7080) при запуске Cassandra.Но, тем не менее, он выдал ошибку следующим образом:

ОШИБКА MapOutputTrackerMaster: Ошибка связи с MapOutputTracker java.lang.InterruptedException в java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanavaS6: at1scala.concurrent.impl.Promise $ DefaultPromise.tryAwait (Promise.scala: 212) в scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala: 222) в scala.concurrent.impl.Promise $ DefaultPromise.result (Promise.scala: 227) в scala.concurrent.Await $$ anonfun $ result $ 1.apply (package.scala: 190) в scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn (BlockContext.scala: 53) в scala.concurrent.Ожидайте $ .result (package.scala: 190) в org.apache.spark.rpc.RpcTimeout.awaitResult (RpcTimeout.scala: 81) в org.apache.spark.rpc.RpcEndpointRef.askWithRetry (RpcEndpointRef.scala: 102)org.apache.spark.rpc.RpcEndpointRef.askWithRetry (RpcEndpointRef.scala: 78) в org.apache.spark.MapOutputTracker.askTracker (MapOutputTracker.scala: 100) в org.apache.spark.MapOutputTracker.sendTracker (MapOutputTracker.scala: 110) в org.apache.spark.MapOutputTrackerMaster.stop (MapOutputTracker.scala: 580) в org.apache.spark.SparkEnv.stop (SparkEnv.stop (SparkEnv.stop).scala: 84) в org.apache.spark.SparkContext $$ anonfun $ stop $ 11.apply $ mcV $ sp (SparkContext.scala: 1797) в org.apache.spark.util.Utils $ .tryLogNonFatalError (Utils.scala:1290) по адресу org.apache.spark.SparkContext.stop (SparkContext.scala: 1796) по адресу org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead (StandaloneSchedulerBackend.scala: 142) в org.apentsp..StandaloneAppClient $ ClientEndpoint.markDead (StandaloneAppClient.scala: 254) в org.apache.spark.deploy.client.StandaloneAppClient $ ClientEndpoint $$ anon $ 2.run (StandaloneAppClient.scala: 131Execur..call (Executors.java:511) на java.util.concurrent.FutureTask.run (FutureTask.java:266) на java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201 (ScheduledThreadPool)Executor.java:180) в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) в java.util.concurrent.ThreadPoolExecutor.runWorker.read_exec.tutor$ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748) 18/05/22 11:46:44 ОШИБКИ: невыполненное исключение в потоке appclient-registration-retry-threadorg.apache.spark.SparkException: Ошибка связи с MapOutputTracker в org.apache.spark.MapOutputTracker.askTracker (MapOutputTracker.scala: 104) в org.apache.spark.MapOutputTracker.sendTracker (MapOutputTracker.scalaap 110 или at).spark.MapOutputTrackerMaster.stop (MapOutputTracker.scala: 580) в org.apache.spark.SparkEnv.stop (SparkEnv.scala: 84) в org.apache.spark.SparkContext $$ anonfun $ stop $ 11.apply $ mcV $ sp(SparkContext.scala: 1797)в org.apache.spark.util.Utils $ .tryLogNonFatalError (Utils.scala: 1290) в org.apache.spark.SparkContext.stop (SparkContext.scala: 1796) в org.apache.spark.scheduler.cackter.StandaloneScheler.dead (StandaloneSchedulerBackend.scala: 142) в org.apache.spark.deploy.client.StandaloneAppClient $ ClientEndpoint.markDead (StandaloneAppClient.scala: 254) в org.apache.spark.deploy.client.Endli $выполнить (StandaloneAppClient.scala: 131) в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) в java.util.concurrent.FutureTask.run (FutureTask.java:266) в java.util.concrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201 (ScheduledThreadPoolExecutor.java:180) в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run (ScheduledThreadPoolExececutor.java :o.в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) вjava.lang.Thread.run (Thread.java:748) Вызывается: java.lang.InterruptedException в java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos (AbstractQueuedSynchronizer.java:1326) на уровне scala.DefaultPromise.tryAwait (Promise.scala: 212) в scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala: 222) в scala.concurrent.impl.Promise $ DefaultPromise.result (Promise.scala: 227) в scala.concurrent.Await $$ anonfun $ result $ 1.apply (package.scala: 190) в scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn (BlockContext.scala: 53) в scala.concurrent.Await $ .result (package.scala: 190) в org.apache.spark.rpc.RpcTimeout.awaitResult (RpcTimeout.scala: 81) в org.apache.spark.rpc.RpcEndpointRef.askWithRetry (RpcEndpointRef.scala: 102) в org.apache.spark.RpcEndpointRef.askWithRetry (RpcEndpointRef.scala: 78) в org.apache.spark.MapOutputTracker.askTracker (MapOutputTracker.scala: 100) ... еще 16 18/05/22 11:46:44 ОШИБКА SparkContext: Ошибка инициализации SparkContext.java.lang.NullPointerException в org.apache.spark.SparkContext. (SparkContext.scala: 546) в org.apache.spark.SparkContext $ .getOrCreate (SparkContext.scala: 2258) в org.apache.spark.sql $ SparkSq.Builder $$ anonfun $ 8.apply (SparkSession.scala: 831) в org.apache.spark.sql.SparkSession $ Builder $$ anonfun $ 8.apply (SparkSession.scala: 823) в scala.Option.getOrElse (Option.scala: Option.scala:121) at org.apache.spark.sql.SparkSession $ Builder.getOrCreate (SparkSession.scala: 823) 18/05/22 11:46:44 ИНФОРМАЦИЯ SparkContext: SparkContext уже остановлен.Исключение в потоке "main" java.lang.NullPointerException в org.apache.spark.SparkContext. (SparkContext.scala: 546) в org.apache.spark.SparkContext $ .getOrCreate (SparkContext.scala: 2258) в org.apache.spark.sql.SparkSession $ Builder $$ anonfun $ 8.apply (SparkSession.scala: 831) в org.apache.spark.sql.SparkSession $ Builder $$ anonfun $ 8.apply (SparkSession.scala: 823) в scala.Option.getOrElse (Option.scala: 121) в org.apache.spark.sql.SparkSession $ Builder.getOrCreate (SparkSession.scala: 823)

Что я могу сделать, чтобы этот код работал?

1 Ответ

0 голосов
/ 22 мая 2018

Вам не нужно жестко кодировать IP-адреса Cassandra или мастера - просто создайте объект SparkSession, и он будет работать.Вот рабочий код (на Java):

SparkSession spark = SparkSession
  .builder()
  .appName("CassandraSpark")
  .getOrCreate();

Dataset<Row> sqlDF = spark.sql("select * from test.t1 limit 1000");
sqlDF.printSchema();
sqlDF.show();

В DSE, если вы отправляете в распределенный кластер, вы можете указать master как dse://?, и DSE найдет текущего master автоматически.Все возможные варианты описаны в документации .

...