У меня есть около 10000 CSV-файлов, каждый из которых содержит 14 столбцов. Они содержат данные, касающиеся финансовой организации, торговых значений, даты и времени.
Некоторые файлы csv являются просто заголовками и не содержат данных. Мне удалось загрузить все файлы csv в моей локальной файловой системе hadoop. Чего я хочу добиться, так это отфильтровать данные, чтобы они включали записи, происходящие только с 9:00 до 18:00.
Как бы я этого достиг? Я так запутался с лямбдой и фильтром, и все это существует в спарк-питоне.
Не могли бы вы показать мне, как я могу отфильтровать это и использовать отфильтрованные данные для других анализов?
П.С., зимнее и летнее время также необходимо учитывать, и я подумал, что у меня должны быть некоторые функции, чтобы изменить время на формат UTC, возможно?
Поскольку я беспокоюсь о фильтрации данных на основе столбца Time в моем csv-файле, я упростил csvs. скажем:
CSV 1: (Filter.csv)
- ISIN, валюта, дата, время
- "1", "EUR", 2018-05-08,07: 00
- "2", "EUR", 2018-05-08,17: 00
- "3", "EUR", 2018-05-08,06: 59
- "4", "EUR", 2018-05-08,17: 01
CSV 2: (NoFilter.csv)
- ISIN, валюта, дата, время
- "1", "EUR", 2018-05-08,07: 01
- "2", "EUR", 2018-05-08,16: 59
- "3", "EUR", 2018-05-08,10: 59
- "4", "EUR", 2018-05-08,15: 01
и мой код:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'
df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationNonFiltered)
dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered)
data = df.rdd
dataFilter = dfFilter.rdd
data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
print data.count()
print dataFilter.count()
Я ожидаю, что data.count вернет 4, так как все времена соответствуют диапазону, а dataFilter.count вернет 0, поскольку времени не найдено.
Спасибо!