Как читать CSV-файлы с локального узла драйвера с помощью Spark? - PullRequest
0 голосов
/ 24 октября 2019

Мне пришлось разархивировать файлы из Amazon S3 в мой узел драйвера (кластер Spark), и мне нужно загрузить все эти CSV-файлы в виде Spark Dataframe, но я обнаружил следующую проблему при попытке загрузить данные из драйвераузел:

PySpark:

df = self.spark.read.format("csv").option("header", True).load("file:/databricks/driver/*.csv")

'Путь не существует: файл: / folder / *. csv'

Я пытался переместитьвсе эти файлы в dbfs, используя dbutils.fs.mv (), но я использую файл Python и не могу использовать dbutils (). Я думаю, что мне нужно передать файл, но я не знаю, как, потому что я пытался с self.sc.textFile("file:/databricks/driver/*.csv").collect() и self.sc.addFile("file:/databricks/driver/*.csv") и процесс не может найти файлы.

ОБНОВЛЕНИЕ Когда я запустил этот код:

import os
BaseLogs("INFO", os.getcwd())
folders = []
for r, d, f in os.walk(os.getcwd()):
    for folder in d:
      folders.append(os.path.join(r, folder))

for f in folders:
    BaseLogs("INFO", f)
BaseLogs("INFO", os.listdir("/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip"))
BaseLogs("INFO", os.listdir("/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Opens_20190907.zip"))

я получил: enter image description here

Затем я попытался сделать:

try:
    df = self.spark.read.format("csv").option("header", True).option("inferSchema", "true").load("file:///databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip/Bounces.csv")                       
except Exception as e:
    BaseLogs("INFO", e)
    BaseLogs("INFO", "Reading {0} as Spark Dataframe".format("file://" +  file  + ".csv"))
    df = self.spark.read.format("csv").option("header", True).option("inferSchema", "true").load("file://" + file + ".csv")  

Я получил следующую ошибку:

2019-10-24T15: 16: 25,321 + 0000: [GC (Ошибка распределения) [PSYoungGen: 470370K-> 14308K (630272K)] 479896K-> 30452K (886784K), 0,0209171 с] [Время: пользователь = 0,04 сис = 0,01, реальное = 0,02 с] 2019-10-24T15: 16: 25,977 + 0000: [GC (порог GC метаданных) [PSYoungGen: 211288K-> 20462K (636416K)] 227432K-> 64316K (892928K), 0,0285984 с] [Время: пользователь = 0,04 сс = 0,02, реальный = 0,02 с] 2019-10-24T15: 16: 26,006 + 0000: [Полный GC (порог GC для метаданных) [PSYoungGen:20462K-> 0K (636416K)] [ParOldGen: 43854K-> 55206K (377344K)] 64316K-> 55206K (1013760K), [метапространство: 58323K-> 58323K (1099776K)], 0,1093583 с] [времена: пользователь = 0]0,02, реальное = 0,12 с] 2019-10-24T15: 16: 28,333 + 0000:[GC (Ошибка распределения) [PSYoungGen: 612077K-> 23597K (990720K)] 667283K-> 78811K (1368064K), 0,0209207 с] [Время: пользователь = 0,02 сс = 0,01, реальный = 0,02 с]] INFO: ошибкапроизошла при вызове o195.load. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 0.0 не выполнено 4 раза, последний сбой: потерянное задание 0.3 на этапе 0.0 (TID 3, 172.31.252.216, исполнитель 0): java.io.FileNotFoundException: файл file: /databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip/Bounces.csv не существует Это невозможно, базовые файлы были обновлены. Вы можете явно аннулировать кэш в Spark, запустив команду «REFRESH TABLE tableName» в SQL или воссоздав соответствующий Dataset / DataFrame. в org.apache.spark.sql.execution.datasources.FileScanRDD $$ anon $ 1 $$ anon $ 2.getNext (FileScanRDD.scala: 248) в org.apache.spark.util.NextIterator.hasNext (NextIterator.scala: 73)

Ответы [ 2 ]

0 голосов
/ 24 октября 2019

Вы можете попытаться прочитать ваши данные в фрейме данных panda:

import pandas as pd
pdf = pd.read_csv("file:/databricks/driver/xyz.csv")

и преобразовать их в искровой фрейм данных:

df = spark.createDataFrame(pdf)
0 голосов
/ 24 октября 2019

Попробуйте это

scala> val test = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("file:///path/to/csv/testcsv.csv")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...