Scala: обрабатывать фрейм данных, пока значение в столбце соответствует условию - PullRequest
1 голос
/ 07 марта 2019

Мне нужно обработать огромный фрейм данных, загрузить файлы из сервиса по столбцу id фрейма данных.Логика для загрузки и все изменения подготовлены, но я не уверен, что это лучший способ сделать цикл вокруг этого.Я запускаю это на Databricks , поэтому мне нужно выполнять процессы в чанках.

В кадре данных есть столбец " status ", который может содержать следующиезначения:

"задача", "обработка", "сбой", "успешно"

В цикле while я хочу выполнить следующие задачи:

while (there are rows with status "todo") {
   - get the first 10 rows if status is todo (DONE)
   - start processing the dataframe, update status to processing (DONE)
   - download files (call UDF), update status to succeeded or failed
     (DONE, not in the code here)
}

Я бы хотел запустить этот , пока все строки 'status не будут отличаться от todo!Проблема в том, что этот цикл while не заканчивается, потому что сам фрейм данных не обновляется.Он должен быть назначен другому фрейму данных, но тогда как добавить новый в цикл?

Мой код прямо сейчас:

while(statusDoc.where("status == 'todo'").count > 0) {
  val todoDF = test.filter("status == 'todo'")

  val processingDF = todoDF.limit(10).withColumn("status", when(col("status") === "todo", "processing")
                           .otherwise(col("status")))

 statusDoc.join(processingDF, Seq("id"), "outer")
      .select($"id", \
       statusDoc("fileUrl"), \
       coalesce(processingDF("status"), statusDoc("status")).alias("status"))

}

Объединение должно выглядеть следующим образом:

val update = statusDoc.join(processingDF, Seq("id"), "outer")
                          .select($"id", statusDoc("fileUrl"),\
    coalesce(processingDF("status"), statusDoc("status")).alias("status"))

Затем этот новый update фрейм данных следует использовать для следующего цикла цикла.

1 Ответ

1 голос
/ 07 марта 2019

Следует помнить, что DataFrame (Spark) не является изменяемым, поскольку он распространяется.У вас нет гарантии, что данная модификация будет правильно распространена по всей сети исполнителей, если вы ее сделаете.И у вас также нет гарантии, что данная часть данных еще не использовалась где-то еще (например, в другом узле).

Одна вещь, которую вы можете сделать, это добавить еще один столбец с обновленными значениями и удалитьстарый столбец.

val update = statusDoc.
    .withColumnRenamed("status", "status_doc")
    .join(processingDF, Seq("id"), "outer")
    .withColumn("updated_status", udf((stold: String, stold: String) => if (stnew != null) stnew else stold).apply(col("status"), col("status_doc"))
    .drop("status_doc", "status")
    .withColumnRenamed("updated_status", "status")
    .select("id", "fileUrl", "status")

Затем убедитесь, что вы заменили «statusDoc» на «обновление» DataFrame.Не забудьте сделать DataFrame «var» вместо «val».Я удивлен, что ваша IDE еще не кричала.

Кроме того, я уверен, что вы можете придумать способ распространения проблемы, чтобы вы избежали цикла while - я могу помочь вам сделать это, но мне нужноболее четкое описание вашей проблемы.Если вы используете цикл while, вы не будете использовать все возможности вашего кластера, потому что цикл while выполняется только на главном.Затем вы будете обрабатывать только 10 строк за раз, каждый раз.Я уверен, что вы можете добавить все необходимые данные ко всему DataFrame в одной операции карты.

...