Я использую локальный экземпляр Spark через пакет sparklyr
R на 64-ядерном компьютере с 64G RAM.
Мне нужно обработать тысячи текстовых файлов и разобрать в них адреса электронной почты.Цель состоит в том, чтобы иметь фрейм данных со столбцами, такими как имя пользователя, домен верхнего уровня, домен, поддомен (ы).Фрейм данных затем сохраняется как файл Parquet.Я разделяю файлы на пакеты по 2,5 ГБ каждый и обрабатываю каждую партию отдельно.
Большинство пакетов работают нормально, однако время от времени задача завершается сбоем, и весь пакет «уходит».Это вывод из журнала в таком случае:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 2260.0 failed 1 times, most recent failure: Lost task 47.0 in stage 2260.0 (TID 112228, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:57)
at org.apache.spark.RangePartitioner$$anonfun$13.apply(Partitioner.scala:306)
at org.apache.spark.RangePartitioner$$anonfun$13.apply(Partitioner.scala:304)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.<init>(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at java.util.regex.Pattern.split(Pattern.java:1206)
at java.util.regex.Pattern.split(Pattern.java:1273)
at scala.util.matching.Regex.split(Regex.scala:526)
at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:144)
at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
... 22 more
Я использую FTRegexTokenizer
довольно часто, например, здесь, чтобы разделить адрес электронной почты на имя пользователя и домены:
spark_tbls_separated %<>%
ft_regex_tokenizer(input_col = "email",
output_col = "email_split",
pattern = "@",
to_lower_case = FALSE) %>%
sdf_separate_column(column = "email_split",
into = c("email_user", "email_domain")) %>%
select(-email_split, -email)
ТакТеперь я хотел бы знать, какое именно преобразование Spark точно вызывает ошибку, и для какого типа входных данных оно терпит неудачу, чтобы я мог фактически отладить причину ошибки.Я предполагаю, что единственный способ понять это - посмотреть журналы задач (они вообще существуют?).В идеале я мог бы посмотреть журнал задачи 47 и получить более подробную информацию о регистрации.Как я могу получить доступ к ним на локальной машине?
Вот мои параметры конфигурации, например, сервер истории готов к работе:
spark_config <- spark_config()
spark_config$`sparklyr.shell.driver-memory` <- "64G"
spark_config$spark.memory.fraction <- 0.75
spark_config$spark.speculation <- TRUE
spark_config$spark.speculation.multiplier <- 2
spark_config$spark.speculation.quantile <- 0.5
spark_config$sparklyr.backend.timeout <- 3600 * 2 # two-hour timeout
spark_config$spark.eventLog.enabled <- TRUE
spark_config$spark.eventLog.dir <- "file:///tmp/spark-events"
spark_config$spark.history.fs.logDirectory <- "file:///tmp/spark-events"
sc <- spark_connect(master = "local", config = spark_config)
Обратите внимание, что этот вопрос не о фактической ошибке, замеченной здесь, а овозможность проверки журнала задач, чтобы выяснить, в какой строке мой sparklyr
сценарий завершается ошибкой .