Я не уверен, какие записи вы называете плохими, поскольку мы не можем видеть ваши входные данные.Исходя из моего предположения, допустим, что у нас есть входной файл ниже, имеющий пять столбцов.
col1,col2,col3,col4,col5
1,ABC,YYY,101,USA
2,ABC,ZZZ,102,USA
3,ABC,,,USA
4,ABC,GGG,104,USA
5,ABC,PPP,105
и строка №.3 имеет несколько пустых столбцов, а в строке № 5 меньше столбцов.поэтому я не хочу загружать эти две записи в мой фрейм данных.
PATH_TO_FILE = "file:///user/vikrant/hivespark/userinput"
df = sc.textFile(PATH_TO_FILE)\
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))\
.map(lambda x: [i for i in x if len(i)!= 0]) \
.filter(lambda line: len(line) > 4 and line[0] != 'col1') \
.toDF(['Col1','Col2','Col3','Col4','Col5'])
>>> df.show();
+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|
+----+----+----+----+----+
| 1| ABC| YYY| 101| USA|
| 2| ABC| ZZZ| 102| USA|
| 4| ABC| GGG| 104| USA|
+----+----+----+----+----+
и, если вы хотите извлечь плохие записи из вашего входного файла:
badrecords = sc.textFile(PATH_TO_FILE)\
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))\
.map(lambda x: [i for i in x if len(i)!= 0]) \
.filter(lambda line: len(line) < 5 and line[0] != 'col1')
>>> badrecords.take(10)
[['3', 'ABC', 'USA'], ['5', 'ABC', 'PPP', '105']]
Дайте мне знать, еслиэто работает для вас или помогает!