Фильтр на новый DF, если столбец существует в текущем DF - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть плоский файл, в котором есть столбец 998. Мне нужно проверить, если столбец 998 присутствует и столбец 999 нет, а затем поместить эти данные в новый DF.

Я пробовал следующее:

Создал функцию has_column(df, columnName), которая возвращает True или False. Протестировал эту функцию:

print(has_column(df,'_998')) - True
print(has_column(df,'_999')) - False

В моем фильтре я пробую следующее:

validRecordsDF=df.filter((has_column(df,'_996') == True & has_column(df,'_997') == False)).collect()

Здесь это не работает в Spark - TypeError: condition should be string or Column

Мне нужно это для Отфильтруйте только те записи, которые имеют 998 столбцов.

Если в Spark есть какой-либо другой подход, сообщите мне.

1 Ответ

0 голосов
/ 23 апреля 2020

Я пытался выполнить обработку файла, поскольку в Python это занимало много времени, так как у нас есть столбец 996 с почти 1М записей в каждом файле, поэтому он решил переместить его, чтобы запустить и обработать работу довольно быстро,

В DF, если столбец имеет значение NULL, при вызове collect () выдает ошибку для пропущенных полей. 5 полей являются обязательными, а 4 значения предоставляются.

Вместо этого я попытался сохранить операции в самом RDD и разделить хорошие и плохие записи, как показано в следующем примере: correctColumnRDD = splitDataRDD.filter (lambda z: len (z) == 5) invalidColumnRDD = splitDataRDD. фильтр (лямбда z: len (z)! = 5)

который я затем использую для дальнейшей обработки.

Спасибо @hristolliev за помощь.

С уважением и всем спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...