Чтение содержимого из файлов, находящихся в Zip-файле, с использованием Scala / pysprak, а также из блоков данных и файлов, хранящихся в ADLS - PullRequest
0 голосов
/ 19 сентября 2019

У меня есть Zip-файл 1,3 ГБ и внутри него TXT-файл с разделенным запятыми форматом, который имеет 6 ГБ.Эта zip-папка находится в хранилище озера данных Azure и, используя принцип обслуживания, смонтирована в файловой системе DBFS Databricks.При использовании обычного кода Python для извлечения файла размером 6 ГБ, я получаю 1,98 ГБ как извлеченный файл.

Пожалуйста, предложите способ непосредственного чтения txt-файла и сохранения его в виде искры Dataframe.

Я пытался использовать код Python, но прямое чтение из Python дает ошибку - Error tokenizing data. C error: Expected 2 fields in line 371, увидел 3, это также было исправлено с помощью кодирования UTF-16-LE, но после этого получил ошибку - ConnectException: Connection refused (Connection refused) on Databricks while trying to display the df.head().

import pandas as pd
import zipfile

zfolder = zipfile.ZipFile('dbfszipath') 
zdf = pd.read_csv(zfolder.open('6GBtextfile.txt'),error_bad_lines=False,encoding='UTF-16-LE')
zdf.head()

Извлечение кода -

import pandas as pd
import zipfile

zfolder = zipfile.ZipFile('/dbfszippath')
zfolder.extract(dbfsexrtactpath) 

Кадр данных должен содержать все данные при непосредственном чтении через папку zip, а также отображать некоторые данные и не должен зависать кластер Databricks.Нужны варианты в Scala или Pyspark.

1 Ответ

0 голосов
/ 22 сентября 2019

Отказ в соединении происходит из-за настроек памяти, которые есть в Databricks и spark.Вам нужно будет увеличить размер пособия, чтобы избежать этой ошибки.

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

conf=SparkConf()
conf.set("spark.executor.memory", "4g")
conf.set("spark.driver.memory", "4g")

В этом случае выделенная память составляет 4 ГБ, поэтому измените ее при необходимости.

Другое решение будет следующим:

import zipfile
import io

def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


zips = sc.binaryFiles("somerandom.zip")
files_data = zips.map(zip_extract)

Дайте мне знать, если это работает или в чем ошибка в этом случае.

[Источник]

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...