Я совершенно новичок в программировании на Spark и Scala.В настоящее время я работаю над Spark DataFrames.У меня есть требование перебирать записи и повторять одно и то же значение до следующего условия.Ниже приведен пример, у меня есть только один столбец в данном файле.В примере есть два типа значений: один - данные заголовка, а другой - подробные данные.Данные заголовка всегда имеют длину 10 символов, а подробные данные всегда имеют длину 15 символов.Я хотел бы объединить первые 10 символов со следующими рекордными 15 символами, пока мы не достигнем следующих 10 символов и т. Д. ...
df
---------------
1RHGTY567U //header data
6786TYUIOPTR141 //detail data
6786TYUIOPTYU67 //detail data
T7997999HHBFFE6 //detail data
8YUITY567U //header data
HJS7890876997BB //detail data
BFJFBFKFN787897
GS678790877656H
BFJFDK786WQ4243
74849469GJGNVFM
67YUBMHJKH
VFJF788968FJFJD
HFJFGKJD789768D
GFJFHFFLLJFJDLD
Я попробовал это, собрав DataFrame, просматривая его иобъединяя его с другой записью, как показано ниже.Метод, который я использовал, является дорогостоящей операцией, так как метод collect () не рекомендуется.Я мог бы использовать функцию окна запаздывания, чтобы объединить текущее значение с предыдущим значением, но мой сценарий немного отличается.
val srcDF = spark.read.format("csv").load(location + "/" + filename)
//Adding another column to the DataFrame which shows length of the value in the column
var newDF = srcDF.withColumn("col_length", length($"_c0"))
//Converting DataFrame to RDD
var RDD = newDF.map(row => row(0).toString + "," + row(1).toString).rdd
//Iterating through RDD to concatenate Header data with the detail
for (row <- RDD.collect) {
if (row.split(",")(1).toInt == 16) { Rec = row.split(",")(0).toString }
if (row.split(",")(1).toInt > 16) {
srcModified += Rec + row.split(",")(0).toString
}
else {
srcModified += Rec
}
}
//Converting ListBuffer to RDD
val modifiedRDD = sc.parallelize(srcModified.toSeq)
Результат, который я ожидаю, показан ниже:
new_DF
------
1RHGTY567U //header data
1RHGTY567U6786TYUIOPTR141 //header data concatenated with detail data
1RHGTY567U6786TYUIOPTYU67 //header data concatenated with detail data
1RHGTY567UT7997999HHBFFE6 //header data concatenated with detail data
8YUITY567U //header data
8YUITY567UHJS7890876997BB //header data concatenated with detail data
8YUITY567UBFJFBFKFN787897 //header data concatenated with detail data
8YUITY567UGS678790877656H //header data concatenated with detail data
8YUITY567UBFJFDK786WQ4243 //header data concatenated with detail data
8YUITY567U74849469GJGNVFM //header data concatenated with detail data
67YUBMHJKH
67YUBMHJKHVFJF788968FJFJD
67YUBMHJKHHFJFGKJD789768D
67YUBMHJKHGFJFHFFLLJFJDLD
Есть предложения, пожалуйста?