Как загрузить типы данных Collection с помощью коннектора spark-cassandra в пакетном режиме - PullRequest
0 голосов
/ 22 января 2019

Я пытаюсь загрузить искровой фрейм данных, имеющий два атрибута с типами данных коллекции, в таблицу Cassandra.

Во входящем файле фида эти атрибуты являются text / String. Я использовал приведенный ниже код для преобразования типа String в типы List и Map соответственно:

    spark.udf.register("getLst", (input: String) => input.split(",").toList)

    spark.udf.register("getMap", (input:String) => parse(input).values.asInstanceOf[Map[String, String]])

    val ofr_data_final=spark.sql("""select  
            ...
            getLst(acct_nb_ls) as acct_nb_ls, 
            getMap(brw_eci_and_sts_mp) as brw_eci_and_sts_mp, 
            .....""")

Схема печати кадра данных искры показывает эти два атрибута, как показано ниже:

    |-- acct_nb_ls: array (nullable = true)
    |    |-- element: string (containsNull = true)
    |-- brw_eci_and_sts_mp: map (nullable = true)
    |    |-- key: string
    |    |-- value: string (valueContainsNull = true)

В Кассандре эти два атрибута определены, как показано ниже:

    acct_nb_ls FROZEN<LIST<text>>,
    brw_eci_and_sts_mp FROZEN<MAP<text, text>>,

Вот мой оператор загрузки:

    ofr_data_final.rdd.saveToCassandra(Config.keySpace,offerTable, writeConf = WriteConf(ttl = TTLOption.perRow("ttl")))

Однако загрузка завершается с ошибкой ниже:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 140 in stage 24.0 failed 4 times, most recent failure: Lost task 140.3 in stage 24.0 (TID 1741, bdtcstr70n12.svr.us.jpmchase.net, executor 9): java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
    at com.jpmc.mars.LoadOfferData$.delayedEndpoint$com$jpmc$mars$LoadOfferData$1(LoadOfferData.scala:246)
    at com.jpmc.mars.LoadOfferData$delayedInit$body.apply(LoadOfferData.scala:22)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.jpmc.mars.LoadOfferData$.main(LoadOfferData.scala:22)
    at com.jpmc.mars.LoadOfferData.main(LoadOfferData.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Я подозреваю, что проблема может быть в том, что атрибут acct_nb_lst выводится как «массив», а не как «список», но я не уверен, как заставить spark выводить его как «список» вместо «массив». В моем UDF я определил упомянутый

    input.split(",").toList

но все равно он выводится как массив.

1 Ответ

0 голосов
/ 04 февраля 2019

Загрузка типов данных коллекции с использованием коннектора spark-cassandra в пакетном режиме работала, как и ожидалось, с опцией ttl на уровне записи с использованием rdd.saveToCassandra. Проблема была с данными. Данные были старыми и имели истекшие даты истечения, которые генерировали отрицательные значения ttl и, следовательно, загрузка не удалась.

Сообщение об ошибке Spark должно быть улучшено, чтобы подразумевать это.

...