Pyspark читает csv, комбинирует столбец даты и времени и фильтрует его - PullRequest
0 голосов
/ 31 августа 2018

У меня есть около 10000 CSV-файлов, каждый из которых содержит 14 столбцов. Они содержат данные, касающиеся финансовой организации, торговых значений, даты и времени.

Некоторые файлы csv являются просто заголовками и не содержат данных. Мне удалось загрузить все файлы csv в моей локальной файловой системе hadoop. Чего я хочу добиться, так это отфильтровать данные, чтобы они включали записи, происходящие только с 9:00 до 18:00.

Как бы я этого достиг? Я так запутался с лямбдой и фильтром, и все это существует в спарк-питоне.

Не могли бы вы показать мне, как я могу отфильтровать это и использовать отфильтрованные данные для других анализов?

П.С., зимнее и летнее время также необходимо учитывать, и я подумал, что у меня должны быть некоторые функции, чтобы изменить время на формат UTC, возможно?

Поскольку я беспокоюсь о фильтрации данных на основе столбца Time в моем csv-файле, я упростил csvs. скажем:

CSV 1: (Filter.csv)

  • ISIN, валюта, дата, время
  • "1", "EUR", 2018-05-08,07: 00
  • "2", "EUR", 2018-05-08,17: 00
  • "3", "EUR", 2018-05-08,06: 59
  • "4", "EUR", 2018-05-08,17: 01

CSV 2: (NoFilter.csv)

  • ISIN, валюта, дата, время
  • "1", "EUR", 2018-05-08,07: 01
  • "2", "EUR", 2018-05-08,16: 59
  • "3", "EUR", 2018-05-08,10: 59
  • "4", "EUR", 2018-05-08,15: 01

и мой код:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print data.count()
print dataFilter.count()

Я ожидаю, что data.count вернет 4, так как все времена соответствуют диапазону, а dataFilter.count вернет 0, поскольку времени не найдено.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 01 сентября 2018

В вашем коде вы можете использовать только 'csv' в качестве формата

from pyspark import SparkContext, SparkConf
ehsanLocationFiltered = '/FileStore/tables/stackoverflow.csv'
df = sqlContext.read.format('csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered).rdd
result=data.map(lambda row: row.Time > '07:00' and row.Time < '17:00')
result.count()
0 голосов
/ 01 сентября 2018

Хорошо, я выяснил, в чем проблема с моим кодом! Я должен был использовать:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
   .options(header='true', inferschema='true')\
   .load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
   .options(header='true', inferschema='true')\
   .load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

filteredResult = data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
filteredResultExpected =dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print filteredResult.count()
print filteredResultExpected.count()

FilterResultExpected = и FilterResult отсутствует!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...