Мне пришлось разархивировать файлы из Amazon S3 в мой узел драйвера (кластер Spark), и мне нужно загрузить все эти CSV-файлы в виде Spark Dataframe, но я обнаружил следующую проблему при попытке загрузить данные из драйвераузел:
PySpark:
df = self.spark.read.format("csv").option("header", True).load("file:/databricks/driver/*.csv")
'Путь не существует: файл: / folder / *. csv'
Я пытался переместитьвсе эти файлы в dbfs, используя dbutils.fs.mv (), но я использую файл Python и не могу использовать dbutils (). Я думаю, что мне нужно передать файл, но я не знаю, как, потому что я пытался с self.sc.textFile("file:/databricks/driver/*.csv").collect()
и self.sc.addFile("file:/databricks/driver/*.csv")
и процесс не может найти файлы.
ОБНОВЛЕНИЕ Когда я запустил этот код:
import os
BaseLogs("INFO", os.getcwd())
folders = []
for r, d, f in os.walk(os.getcwd()):
for folder in d:
folders.append(os.path.join(r, folder))
for f in folders:
BaseLogs("INFO", f)
BaseLogs("INFO", os.listdir("/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip"))
BaseLogs("INFO", os.listdir("/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Opens_20190907.zip"))
я получил:
Затем я попытался сделать:
try:
df = self.spark.read.format("csv").option("header", True).option("inferSchema", "true").load("file:///databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip/Bounces.csv")
except Exception as e:
BaseLogs("INFO", e)
BaseLogs("INFO", "Reading {0} as Spark Dataframe".format("file://" + file + ".csv"))
df = self.spark.read.format("csv").option("header", True).option("inferSchema", "true").load("file://" + file + ".csv")
Я получил следующую ошибку:
2019-10-24T15: 16: 25,321 + 0000: [GC (Ошибка распределения) [PSYoungGen: 470370K-> 14308K (630272K)] 479896K-> 30452K (886784K), 0,0209171 с] [Время: пользователь = 0,04 сис = 0,01, реальное = 0,02 с] 2019-10-24T15: 16: 25,977 + 0000: [GC (порог GC метаданных) [PSYoungGen: 211288K-> 20462K (636416K)] 227432K-> 64316K (892928K), 0,0285984 с] [Время: пользователь = 0,04 сс = 0,02, реальный = 0,02 с] 2019-10-24T15: 16: 26,006 + 0000: [Полный GC (порог GC для метаданных) [PSYoungGen:20462K-> 0K (636416K)] [ParOldGen: 43854K-> 55206K (377344K)] 64316K-> 55206K (1013760K), [метапространство: 58323K-> 58323K (1099776K)], 0,1093583 с] [времена: пользователь = 0]0,02, реальное = 0,12 с] 2019-10-24T15: 16: 28,333 + 0000:[GC (Ошибка распределения) [PSYoungGen: 612077K-> 23597K (990720K)] 667283K-> 78811K (1368064K), 0,0209207 с] [Время: пользователь = 0,02 сс = 0,01, реальный = 0,02 с]] INFO: ошибкапроизошла при вызове o195.load. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 0.0 не выполнено 4 раза, последний сбой: потерянное задание 0.3 на этапе 0.0 (TID 3, 172.31.252.216, исполнитель 0): java.io.FileNotFoundException: файл file: /databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip/Bounces.csv не существует Это невозможно, базовые файлы были обновлены. Вы можете явно аннулировать кэш в Spark, запустив команду «REFRESH TABLE tableName» в SQL или воссоздав соответствующий Dataset / DataFrame. в org.apache.spark.sql.execution.datasources.FileScanRDD $$ anon $ 1 $$ anon $ 2.getNext (FileScanRDD.scala: 248) в org.apache.spark.util.NextIterator.hasNext (NextIterator.scala: 73)