Чтобы узнать, какие файлы содержат эти строки, вы можете использовать функцию input_file_name
из pyspark.sql.functions
например,
df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).show()
, с которой вы также можете легко получить агрегат с одной строкой наfile
df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).groupBy("file").count().show()
+--------------------+-----+
| file|count|
+--------------------+-----+
|file:///C:/Users/...| 119|
|file:///C:/Users/...| 131|
|file:///C:/Users/...| 118|
|file:///C:/Users/...| 127|
|file:///C:/Users/...| 125|
|file:///C:/Users/...| 116|
+--------------------+-----+
Я не знаю хорошего способа поиска номера строки в исходном файле - в тот момент, когда вы загружаете csv в DataFrame, эта информация в значительной степени теряется.Существует функция row_number
, но она работает над окном, поэтому числа будут зависеть от того, как вы определяете разбиение / сортировку окон.
Если вы работаете с локальной файловой системой, вы можете попробовать вручную снова прочитать csv и найти номера строк, что-то вроде этого:
import csv
from pyspark.sql.functions import udf
from pyspark.sql.types import *
@udf(returnType=ArrayType(StringType()))
def getMatchingRows(filePath):
with open(filePath.replace("file:///", ""), 'r') as file:
reader = csv.reader(file)
matchingRows = [index for index, line in enumerate(reader) if line[0] == "#torefresh"]
return matchingRows
withRowNumbers = df.where("col1 == '#torefresh'")\
.withColumn("file", input_file_name())\
.groupBy("file")\
.count()\
.withColumn("rows", getMatchingRows("file"))
withRowNumbers.show()
+--------------------+-----+--------------------+
| file|count| rows|
+--------------------+-----+--------------------+
|file:///C:/Users/...| 119|[1, 2, 4, 5, 6, 1...|
|file:///C:/Users/...| 131|[1, 2, 3, 6, 7, 1...|
|file:///C:/Users/...| 118|[1, 2, 3, 4, 5, 7...|
|file:///C:/Users/...| 127|[1, 2, 3, 4, 5, 7...|
|file:///C:/Users/...| 125|[1, 2, 3, 5, 6, 7...|
|file:///C:/Users/...| 116|[1, 2, 3, 5, 7, 8...|
+--------------------+-----+--------------------+
Но это будет довольно неэффективнои если вы ожидаете, что эти строки будут во многих файлах, это превосходит смысл использования DataFrames.Я бы предложил поработать с источником данных и включить какие-то идентификаторы при создании, но это, конечно, если только вам не нужно знать, что этот файл содержит какие-либо.
Если помимо знания, что первое значение равно "#torefresh "вам также нужно, чтобы все остальные значения были нулевыми, вы можете расширить фильтр where
и ручную проверку в udf.