Дополнительные разделители при записи искрового фрейма данных в hdfs - PullRequest
0 голосов
/ 28 апреля 2018

Один из столбцов в моем исходном файле данных содержит двойные кавычки ("), и когда я пытаюсь записать эти данные из кадра данных в hdf с использованием кода pyspark, он добавляет дополнительные разделители в файл. Не уверен, что происходит прямо здесь , Мои исходные данные содержат 51 столбец, а для строк, в которых данные содержат двойные кавычки ("), количество столбцов изменяется до 59. В качестве разделителя используется труба (|). Файлы считываются из Google Cloud Storage (GCS), и после завершения преобразований с помощью pyspark на dataproc я записываю данные обратно в GCS.

Данные столбца с двойными кавычками выглядят как показано ниже.

"$200 CASHBACK in points on *   Points valid for 30 days. First month is awarded 24 hours after purchase 

Запись выписки из кадра данных - от df до hdfs

df.repartition("dt_col").write.partitionBy("dt_col").format('csv').option("quote", "'").option("nullValue", "").option("quoteMode", "NONE").mode("overwrite").options(delimiter="|",codec="org.apache.hadoop.io.compress.GzipCodec").save(hdfs_merge_path)

Исходные данные.

38896111|REGULAR EARN OFFER|Members earn $200 back in points on select qualifying mattresses.|$|S|N|MATTRESS / HOME BIG TIC|2017-11-01|2018-03-31|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|1|-6.43|-11|-3.85|-11|-11|-3.85|-11|-3.85|-6|1|2018-05-01|70815|1fffff0|2018-04-24||"$200 CASHBACK in points on select Sealy Response, Serta Perfect Sleeper and Beautyrest Silver mattresses*||1|S|S_ONLINE|9300|1|-11|2018-04-25

После записи вывод выглядит как

38896111|REGULAR EARN OFFER|Members earn $200 back in points on select qualifying mattresses.|$|S|N|MATTRESS / HOME BIG TIC|2017-11-01|2018-03-31|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|1|-6.43|-11|-3.85|-11|-11|-3.85|-11|-3.85|-6|1|2018-05-01|70815|1fffff0|2018-04-24||'$200 CASHBACK in points on select Sealy Response, Serta Perfect Sleeper and Beautyrest Silver mattresses*||1|S|S_ONLINE|9300|1|-11|2018-04-25'||||||||

Обратите внимание: Я должен использовать опцию ("quote", "'") во время записи, иначе при каждом запуске символ обратной косой черты добавляется перед двойной кавычкой.

Это поток кода, как показано ниже. Просто использовал 5 столбцов для примера здесь.

schema=StructType([StructField("col1",StringType(),True),StructField("col2",StringType(),True),StructField("col3",StringType(),True),StructField("col4",StringType(),True),StructField("col5",StringType(),True)]) 

 schema_df=spark.read.schema(schema).option("delimiter","|").csv("gs://path/to/incremental_file.txt*")    

schema_df.createOrReplaceTempView("df")    

schema_df2 = spark.read.schema(schema).option("delimiter", "|").csv("gs://path/to/hist-file*.gz")    

schema_df2.createOrReplaceTempView("df2")    

union_fn = schema_df2.union(schema_df)    

w = Window.partitionBy("col1","col2").orderBy(col("col4").desc())    

union_result=union_fn.withColumn("row_num",row_number().over(w)).where(col("row_num") == 1).drop("row_num").drop("col4") 

union_result.createOrReplaceTempView("merged_tbl") 

schema_merged_tbl=spark.sql("""select col1,col2,col3,col5 as col6 from merged_tbl""") 

 schema_merged_tbl.repartition("col6").write.partitionBy("col6").format('csv').option("quote","'").option("nullValue","").option("quoteMode","NONE").mode("overwrite").options(delimiter="|",codec="org.apache.hadoop.io.compress.GzipCodec").save(hdfs_merge_path)

1 Ответ

0 голосов
/ 01 мая 2018

По умолчанию spark-csv настраивает символ цитирования CSV в виде двойной кавычки ("). При обнаружении двойной кавычки, которая не имеет префикса перед escape-символом, который по умолчанию является обратной косой чертой (\), он предполагает, что вводит значение, которое может содержать символ-разделитель. В ваших данных это означает, что, как только он видит первую двойную кавычку, он начинает читать, ожидая, что достигнет второй двойной кавычки, чтобы закончить значение. Это не происходит в одной строке и поэтому предполагает, что в записи просто отсутствуют значения для этих полей, и вместо них вводятся пустые / пустые места.

Многие параметры spark-csv в конечном итоге устанавливаются для базового объекта apache-commons CSVFormat , используемого для описания того, как должен выполняться синтаксический анализ. Если во входном наборе данных символ двойной кавычки не следует использовать в качестве кавычки, а вместо этого следует разрешить его в значении поля, вы можете отключить логику кавычек с помощью:

 schema_df=spark.read.schema(schema).
   option("delimiter","|").
   option("quote", null).
   csv("gs://path/to/incremental_file.txt*")    
...