У меня есть агрегат искры, который я хотел бы вывести результат в CSV, но я обнаружил, что искра всегда выводит большое количество десятичных знаков в научной нотации. Я пробовал решение, упомянутое в этом вопросе , но оно тоже не сработало.
Ожидаемый результат:
foo,avg(bar)
a,0.0000002
b,0.0000001
Фактический результат:
foo,avg(bar)
a,2.0E-7
b,1.0E-7
См. Пример ниже:
from os import path
import shutil
import glob
from pyspark.sql import SQLContext, functions as F, types
def test(sc):
sq = SQLContext(sc)
data = [("a", 1e-7), ("b", 1e-7), ("a", 3e-7)]
df = sq.createDataFrame(data, ['foo', 'bar'])
# 12 digits with 9 decimal places
decType = types.DecimalType(precision=12, scale=9)
# Cast both the column input and column output to Decimal
aggs = [F.mean(F.col("bar").cast(decType)).cast(decType)]
groups = [F.col("foo")]
result = df.groupBy(*groups).agg(*aggs)
write(result)
return df, aggs, groups, result
def write(result):
tmpDir = path.join("res", "tmp")
config = {"sep": ","}
result.write.format("csv")\
.options(**config)\
.save(tmpDir)
# Once the distributed portion is done, write out to a single a file
allFiles = glob.glob(path.join(tmpDir,"*.csv"))
fullOut = path.join("res", "final.csv")
with open(fullOut, 'wb') as wfd:
# First write out the header row
header = config.get("sep", ',').join(result.columns)
wfd.write(header + "\n")
for f in allFiles:
with open(f, 'rb') as fd:
shutil.copyfileobj(fd, wfd)
pass
pass
shutil.rmtree(tmpDir)
return
В оболочке pyspark:
import spark_test as t
t.test(sc)