Spark - улучшено время поиска совпадений строк между списком имен и заархивированными файлами - PullRequest
0 голосов
/ 04 мая 2020

Я пытаюсь найти список строк в списке заархивированных файлов, используя Spark. Ниже приведен рабочий код, который я использую. Сохраненный список строк в словаре с ключом int. Я строю разделенный запятыми список совпадающих ключей int для каждого файла в функции fnMatch. Хотя код работает, его выполнение занимает несколько часов. Какая оптимизация позволяет сократить время выполнения?

#Function to extract zip files 
def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


def safeStr(obj):
    try: return str(obj)
    except UnicodeEncodeError:
        return obj.encode('ascii', 'ignore').decode('ascii')
    except: return ""

#Function to match string list contained in dictionary dcust, one by one against file doctext
def fnMatch(doctext,dcust):
  retval=""
  for k in dcust:
     if dcust[k] in doctext:
        retval=retval+","+str(k)

  return retval


schema = StructType([StructField('fpath', StringType(), True),StructField('docText', StringType(), True)])
zips = sc.binaryFiles('hdfs://hp3/user/test/testhdfs/myzipfile.zip')  
files_data = zips.map(zip_extract)
files_data_flat = files_data.flatMap(lambda x: x.items())
files_data_flat_tfm = files_data_flat.map(lambda x: (safeStr(x[0]),safeStr(x[1])))
df = hc.createDataFrame(files_data_flat_tfm,schema)
df2 = df.withColumn("docLength", size_(col("docText"))  ) 


dfcust = hc.sql('select fullname from tbl_custfull').toPandas()
res=len(dfcust)
print "##################################################"+str(res)+"##############################"
dictcust = dfcust.to_dict().values()[0]
strmatches = udf(lambda x: fnMatch(x,dictcust), StringType())

df2 = df2.withColumn("strMatches", strmatches(col("docText"))  ) 

df2.createOrReplaceTempView ("df2")
dfres=hc.sql("SELECT fpath,docLength,strMatches FROM df2 WHERE length(strMatches) >0")
dfres.show(5)

Я отправляю задание на зажигание, используя

spark-submit \
--conf spark.executor.memory=20g \
--conf spark.executor.cores=5 \
--conf spark.executor.instances=139 \
--conf spark.driver.maxResultSize=8g \
--files /etc/spark2/conf/hive-site.xml \
--master yarn \
--deploy-mode cluster \
myprogram.py

Что мне следует избегать, чтобы улучшить производительность? Пробовал менять исполнительную память и ядра, но особой разницы нет. В списке около 270 тыс. Строк и 60 тыс. Документов

1 Ответ

0 голосов
/ 04 мая 2020

расширение zip не разделяется, поэтому его нельзя распараллелить на уровне записи. Если вы можете использовать сжатие GZIP, производительность значительно улучшится. Кроме того, чтение всего файла в память, имеющее карту sh, и последующий поиск не нужны и не масштабируются для больших файлов. Если используется сжатие gzip, Spark может автоматически распаковывать и искать вас. Вот пример в Scala.

  val spark : SparkSession = SparkSession.builder
    .appName("Test")
    .master("local[2]")
    .config("spark.ui.enabled",false)
    .getOrCreate()

  import spark.implicits._

  val test = spark.read.text("/Users/..../temp/test.txt.gz")
  test.filter(r => r.getAs[String]("value").contains("gzip")).show(false)

Входным файлом является test.gzip, содержащий

this is just testing
to see that this test is passing
gzip is a splittable format
test to see it runs in parallel

Результат

zgip is a splittable format

...