Как узнать, какой раздел в данный момент запущен при использовании функции foreachPartition () в pyspark? - PullRequest
0 голосов
/ 21 июня 2020

У меня есть требование сохранять разделы в текстовый файл с разными именами для каждого раздела. Но при выполнении нижеприведенного фрагмента кода сохраняется только один файл путем перезаписи предыдущего раздела.

def chunks(iterator):
    chunks.counter += 1
    l = (list(iterator))
    df = pd.DataFrame(l,index=None)
    df.to_csv(parent_path+"C"+str(chunks.counter+1)+".txt", header=None, index=None, sep=' ')

chunks.counter=0
sc.parallelize([1,2,3,4,5,6],num_partions).foreachPartition(chunks)

Есть ли способ узнать, какой раздел в настоящее время работает в pySpark?

1 Ответ

0 голосов
/ 21 июня 2020
def chunks(lst, n):
    # Yield successive n-sized chunks from the lst...
    for i in (range(0, len(lst), n)):
        yield i, lst[i:i + n]

for (index, values) in chunks(range(0, 1e5), 1e3): # change this to int's as per your need otherwise it will give float error or will write range obj itself..
    with open(f"{parent_path}_C_{index}.txt", "w") as output:
        output.write(str(values)) # converting to str

И вы даже можете легко обернуть это в joblib;) На мой взгляд, PySpark как таковой нам не нужен ..

...