У блоков данных spark.read csv есть строка #torefresh - PullRequest
0 голосов
/ 27 сентября 2019

Я собираюсь прочитать csv в dataframe 1. Я создаю структуру 2. load csv spark.read.option ("header", "false"). Schema (schema) .option ('delimiter', ',') .option ('mode', 'PERMISSIVE'). csv (path1) введите описание изображения здесь

Как проверить, какие файлы / какие строки получают #torefresh и null ... ... ???

1 Ответ

0 голосов
/ 28 сентября 2019

Чтобы узнать, какие файлы содержат эти строки, вы можете использовать функцию input_file_name из pyspark.sql.functions

например,

df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).show()

, с которой вы также можете легко получить агрегат с одной строкой наfile

df.where("col1 == '#torefresh'").withColumn("file", input_file_name()).groupBy("file").count().show()

+--------------------+-----+
|                file|count|
+--------------------+-----+
|file:///C:/Users/...|  119|
|file:///C:/Users/...|  131|
|file:///C:/Users/...|  118|
|file:///C:/Users/...|  127|
|file:///C:/Users/...|  125|
|file:///C:/Users/...|  116|
+--------------------+-----+

Я не знаю хорошего способа поиска номера строки в исходном файле - в тот момент, когда вы загружаете csv в DataFrame, эта информация в значительной степени теряется.Существует функция row_number, но она работает над окном, поэтому числа будут зависеть от того, как вы определяете разбиение / сортировку окон.

Если вы работаете с локальной файловой системой, вы можете попробовать вручную снова прочитать csv и найти номера строк, что-то вроде этого:

import csv
from pyspark.sql.functions import udf
from pyspark.sql.types import *

@udf(returnType=ArrayType(StringType()))
def getMatchingRows(filePath):
    with open(filePath.replace("file:///", ""), 'r') as file:
      reader = csv.reader(file)
      matchingRows = [index for index, line in enumerate(reader) if line[0] == "#torefresh"]
      return matchingRows

withRowNumbers = df.where("col1 == '#torefresh'")\
    .withColumn("file", input_file_name())\
    .groupBy("file")\
    .count()\
    .withColumn("rows", getMatchingRows("file"))
withRowNumbers.show()

+--------------------+-----+--------------------+
|                file|count|                rows|
+--------------------+-----+--------------------+
|file:///C:/Users/...|  119|[1, 2, 4, 5, 6, 1...|
|file:///C:/Users/...|  131|[1, 2, 3, 6, 7, 1...|
|file:///C:/Users/...|  118|[1, 2, 3, 4, 5, 7...|
|file:///C:/Users/...|  127|[1, 2, 3, 4, 5, 7...|
|file:///C:/Users/...|  125|[1, 2, 3, 5, 6, 7...|
|file:///C:/Users/...|  116|[1, 2, 3, 5, 7, 8...|
+--------------------+-----+--------------------+

Но это будет довольно неэффективнои если вы ожидаете, что эти строки будут во многих файлах, это превосходит смысл использования DataFrames.Я бы предложил поработать с источником данных и включить какие-то идентификаторы при создании, но это, конечно, если только вам не нужно знать, что этот файл содержит какие-либо.

Если помимо знания, что первое значение равно "#torefresh "вам также нужно, чтобы все остальные значения были нулевыми, вы можете расширить фильтр where и ручную проверку в udf.

...