Я пытаюсь лемматизировать данные.
шаг 1 :: Я читаю фрейм данных с двумя столбцами с ключом и значением в качестве заголовка.
шаг 2 :: я собираю эти два столбца в карте [строка, строка]
Шаг 3 :: Я делаю одну функцию, которая берет строку и разделяет ее пробелом и проверяет каждое слово на карте. если ключ получает значение, и он заменяет эту перикулярную работу, если нет, то оставьте как есть.
Шаг 4: записать вывод в s3
При этом я каждый раз получаю эти ошибки. Иногда я тоже получаю. java.lang.StringIndexOutOfBoundsException: индекс строки вне диапазона: при попытке записи в текстовом формате.
val schema = StructType(Array(
StructField("key", StringType, true),
StructField("value", StringType, true)
))
val lemonization_dataframe = spark.read
.option("header" , true)
.option("delimiter" , ":")
.schema(schema)
.csv("s3://my-data-export/WORK/DEV/lemmatizer-german-extensive.csv")
val processed_lemonization_dataframe = lemonization_dataframe.withColumn("key_lowerCase" , lower(col("key")))
.withColumn("value_lowerCase" , lower(col("value")))
.withColumn("key_lowerCase_clean" , regexp_replace(col("key_lowerCase"), ",", ""))
.withColumn("value_lowerCase_clean" , regexp_replace(col("value_lowerCase"), ",", ""))
.select("key_lowerCase_clean", "value_lowerCase_clean")
val column_to_map_conversion = processed_lemonization_dataframe.rdd.map(row => (row.getString(0) -> row.getString(1))).collectAsMap()
val replacements_many = udf(
(str :String ) =>
str.split(" ").toList.map {
s =>
column_to_map_conversion.getOrElse(s , s)
}.mkString(" "))
val lemanization_dataframe = dataframe_without_dulicate_words.withColumn("lemanized_text" , replacements_many(col("data_without_duplicate_words"))).select("lemanized_text")
lemanization_dataframe.coalesce(1).write.format("csv").mode(SaveMode.Overwrite).save("s3://myh-data-export/WORK/DEV/MAHAVIR/pre_processed_with_lemmatization_csv_data_1")
Caused by: java.io.IOException: File already exists:s3://myh-data-export/WORK/DEV/MAHAVIR/pre_processed_with_lemmatization_csv_data_1/part-00000-f4bf64ab-fc3c-4c0c-85ea-a7a2c3203fab-c000.csv
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:513)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:202)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:149)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:77)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:367)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:378)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more