Один из столбцов в моем исходном файле данных содержит двойные кавычки ("), и когда я пытаюсь записать эти данные из кадра данных в hdf с использованием кода pyspark, он добавляет дополнительные разделители в файл. Не уверен, что происходит прямо здесь ,
Мои исходные данные содержат 51 столбец, а для строк, в которых данные содержат двойные кавычки ("), количество столбцов изменяется до 59. В качестве разделителя используется труба (|).
Файлы считываются из Google Cloud Storage (GCS), и после завершения преобразований с помощью pyspark на dataproc я записываю данные обратно в GCS.
Данные столбца с двойными кавычками выглядят как показано ниже.
"$200 CASHBACK in points on * Points valid for 30 days. First month is awarded 24 hours after purchase
Запись выписки из кадра данных - от df до hdfs
df.repartition("dt_col").write.partitionBy("dt_col").format('csv').option("quote", "'").option("nullValue", "").option("quoteMode", "NONE").mode("overwrite").options(delimiter="|",codec="org.apache.hadoop.io.compress.GzipCodec").save(hdfs_merge_path)
Исходные данные.
38896111|REGULAR EARN OFFER|Members earn $200 back in points on select qualifying mattresses.|$|S|N|MATTRESS / HOME BIG TIC|2017-11-01|2018-03-31|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|1|-6.43|-11|-3.85|-11|-11|-3.85|-11|-3.85|-6|1|2018-05-01|70815|1fffff0|2018-04-24||"$200 CASHBACK in points on select Sealy Response, Serta Perfect Sleeper and Beautyrest Silver mattresses*||1|S|S_ONLINE|9300|1|-11|2018-04-25
После записи вывод выглядит как
38896111|REGULAR EARN OFFER|Members earn $200 back in points on select qualifying mattresses.|$|S|N|MATTRESS / HOME BIG TIC|2017-11-01|2018-03-31|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|1|-6.43|-11|-3.85|-11|-11|-3.85|-11|-3.85|-6|1|2018-05-01|70815|1fffff0|2018-04-24||'$200 CASHBACK in points on select Sealy Response, Serta Perfect Sleeper and Beautyrest Silver mattresses*||1|S|S_ONLINE|9300|1|-11|2018-04-25'||||||||
Обратите внимание: Я должен использовать опцию ("quote", "'") во время записи, иначе при каждом запуске символ обратной косой черты добавляется перед двойной кавычкой.
Это поток кода, как показано ниже. Просто использовал 5 столбцов для примера здесь.
schema=StructType([StructField("col1",StringType(),True),StructField("col2",StringType(),True),StructField("col3",StringType(),True),StructField("col4",StringType(),True),StructField("col5",StringType(),True)])
schema_df=spark.read.schema(schema).option("delimiter","|").csv("gs://path/to/incremental_file.txt*")
schema_df.createOrReplaceTempView("df")
schema_df2 = spark.read.schema(schema).option("delimiter", "|").csv("gs://path/to/hist-file*.gz")
schema_df2.createOrReplaceTempView("df2")
union_fn = schema_df2.union(schema_df)
w = Window.partitionBy("col1","col2").orderBy(col("col4").desc())
union_result=union_fn.withColumn("row_num",row_number().over(w)).where(col("row_num") == 1).drop("row_num").drop("col4")
union_result.createOrReplaceTempView("merged_tbl")
schema_merged_tbl=spark.sql("""select col1,col2,col3,col5 as col6 from merged_tbl""")
schema_merged_tbl.repartition("col6").write.partitionBy("col6").format('csv').option("quote","'").option("nullValue","").option("quoteMode","NONE").mode("overwrite").options(delimiter="|",codec="org.apache.hadoop.io.compress.GzipCodec").save(hdfs_merge_path)