Ошибка Sparklyr: задание прервано из-за сбоя этапа 22.0 1 раз, недавний сбой: потерянное задание SparkException: невидимая метка - PullRequest
0 голосов
/ 25 января 2019

Я пытаюсь реализовать машинное обучение (kmeans) из моего источника искры.У меня есть одна таблица с 2 столбцами: обзор и ярлык (положительный или отрицательный) Все, кажется, работает хорошо.Но когда я запускаю прогноз, я получаю следующую ошибку:

SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 22.0 не выполнено 1 раз, последний сбой: потерянное задание 0.0 на этапе 22.0(TID 22, localhost): org.apache.spark.SparkException: невидимая метка

Вот код:

sc <- spark_connect(master = "local", version="2.0.0")

colnames(dfAllReviews3Cols) = c("ReviewText", "LabelReview")

#db_drop_table(sc, "dfallreviews3cols")
reviewsTbl <- copy_to(sc, dfAllReviews3Cols)

#List tables
src_tbls(sc)

#Select preview
reviews_preview <- dbGetQuery(sc, "SELECT * FROM dfallreviews3cols LIMIT 10")


##KMeans
partitions <- reviewsTbl %>%
  sdf_partition(training = 0.7, test = 0.3, seed = 999)

reviewsTbl_training <- partitions$training
reviewTbl_test <- partitions$test

kmeans_model <- reviewsTbl_training %>%
  ml_kmeans(ReviewText ~ .)

pred <- sdf_predict(reviewTbl_test, kmeans_model) %>% collect

Это ошибка, которую я получил:

pred <- sdf_predict (reviewTbl_test, kmeans_model)%>% collect
Ошибка: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 22.0 не выполнено 1 раз, самое последнееОшибка: потерянная задача 0.0 на этапе 22.0 (TID 22, localhost): org.apache.spark.SparkException: невидимая метка: в моей комнате не работал кондиционер Когда ремонтник пришел, чтобы починить его, он не смог, а затем сказал мне, чтоэто зима, и людям не нужен номер AC было неудобно жарко. Выезд был кошмаром. Мой таксист ждал, чтобы отвезти меня в аэропорт. Дважды на ресепшене мне говорили, что у меня должны быть деньги, но после этого это было не так.эй проверил их записи. У меня возникла та же проблема при регистрации. Белл мальчику потребовалось более 20 минут, чтобы вынести мои вещи из моей комнаты, и я не рекомендовал бы этот отель.

at org.apache.spark.ml.feature.StringIndexerModel $$ anonfun $ 4.apply (StringIndexer.scala: 169)
в org.apache.spark.ml.feature.StringIndexerModel $$ anonfun $ 4.apply (StringIndexer.scala: 165)
в org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext (неизвестный источник)
в org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java:43)
в org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala: 370) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 4.apply (SparkPlan.scala: 246).apache.spark.sql.execution.SparkPlan $$ anonfun $ 4.apply (SparkPlan.scala: 240) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ применить $ 24.apply (СДР.scala: 784) at org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ apply $ 24.apply (RDD.scala: 784) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD).scala: 319) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 283) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 70) в org.apache.spark.scheduler.Task.run (Task.scala: 85) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 274) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:615) в java.lang.Thread.run (Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2187)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2163)
    at sparklyr.Utils$.collect(utils.scala:200)
    at sparklyr.Utils.collect(utils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label:  AC wasn t working in my room When the repair man came to fix it he couldn t and then told me that it s winter and people don t need the AC Room was uncomfortably hot Check out was a nightmare My cab driver was waiting to take me to the airport Twice reception told me I had money to be owed however this was untrue after they checked their records I had the same problem at check in Bell boy took over 20 min to bring my bags down from my room Wouldn t recommend this hotel .
    at org.apache

Как исправитьэто?

Спасибо, заранее!

1 Ответ

0 голосов
/ 30 января 2019

Это действительно не тот путь.Основываясь на сообщении об ошибке, ReviewText явно является блоком неструктурированного текста.

Если вы передадите его непосредственно в ml_kmeans, он будет рассматриваться как категориальная переменная, пропущенная через StringIndexer (именно здесь происходит сбой - если вас интересуют подробности, вы можете проверить, spark.ml StringIndexer выбрасывает 'Unseen label' в fit () , но на практике это вряд ли уместно здесь),Тогда результат соберется в вектор длины, равный 1.

Как вы можете себе представить, это не очень полезная модель.

В общем случае вы должны как минимум (этого может не быть и, скорее всего, будет недостаточно для достижения хороших результатов на практике):

  • Токенизация необработанного текста.
  • Кодирование токенов.

Spark ML предоставляет небольшой набор базовых преобразователей текста, включая, но не ограничиваясь, Tokenizer, StopWordsRemover, NGram, TF-IDF и дополнительные, более продвинутые инструменты предоставляются сторонними библиотеками (прежде всего NLP John Snow Labs), а также Pipeline API , который можно использовать для их составления в многократно используемые модули .Я настоятельно рекомендую вам прочитать официальную документацию по каждому из этих инструментов, прежде чем продолжить.

Возвращаясь к вашей проблеме, вы можете начать с чего-то вроде этого

pipeline <- ml_pipeline(
  # Tokenize the input
  ft_tokenizer(sc, input_col = "ReviewText", output_col = "raw_tokens"),

  # Remove stopwords - https://en.wikipedia.org/wiki/Stop_words
  ft_stop_words_remover(sc, input_col = "raw_tokens", output_col = "tokens"),

  # Apply TF-IDF - https://en.wikipedia.org/wiki/Tf-idf
  ft_hashing_tf(sc, input_col = "tokens", output_col = "tfs"),
  ft_idf(sc, input_col = "tfs", output_col = "features"),
  ml_kmeans(sc, features_col = "features", init_mode = "random")
)

model <- ml_fit(pipeline, reviewsTbl_training)

и настроить его так, чтобы оно подходиловаш конкретный сценарий.

...