Существуют ли коды для расчета среднего значения столбца с использованием pyspark? - PullRequest
0 голосов
/ 14 мая 2019

Я хочу рассчитать среднее значение для каждого города (группового города), используя RDD и данные. (Я хочу все два пути)

Кроме того, как я могу игнорировать первые две строки заголовка при вычислении среднего значения?

Я пытался использовать RDD и кадры данных. Но все были неудачны.

Это входной CSV-файл.

город, тип, enterdate, enternum

縣市, 軍種 類別, 入 營 日期, 入 營 人數

臺北市, 陸軍, 1040713.150

臺北市, 陸軍, 1040813.210

臺北市, 陸軍, 1040827.180

臺北市, 陸軍, 1040915.300

Это код Python:

if __name__=="__main__":

#RDD
rdd = sc.textFile("junren.csv").flatMap(lambda line: line.split(",")[3])
rdd.mean().show()

#datefrmae
sqlContext = SQLContext(sc)
df = sqlContext.read.load('junren.csv',format='com.databricks.spark.csv',header='true',inferSchema='true',encoding='UTF-8') 

df.mean("enternum").show()
#df.groupBy('city').agg(avg(col('enternum'))).show()

это ошибка для фрейма данных:

Traceback (последний вызов был последним): File «C: \ Users \ pc95 \ eclipse-workspace \ demo2 \ src \ Test7.py», строка 49, в df.mean ("enternum"). show () Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ sql \ dataframe.py", строка 1300, в getattr "'% s' объект не имеет атрибута '% s'"% (self. class . name , name)) AttributeError: у объекта 'DataFrame' нет атрибута 'mean'

Это ошибка для СДР:

org.apache.spark.api.python.PythonException: обратная связь (самая последняя последний звонок): File "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ pyspark.zip \ pyspark \ worker.py", строка 377, в основном файле "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ pyspark.zip \ pyspark \ worker.py", строка 372, в файле процесса "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ pyspark \ rdd.py", строка 2499, в pipe_func return func (split, prev_func (split, iterator)) Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 2499, в pipe_func return func (split, prev_func (split, iterator)) Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 352, в фунц вернуть f (итератор) файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 1065, в вернуть файл self.mapPartitions (лямбда i: [StatCounter (i)]). уменьшить (redFunc) файл "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ pyspark.zip \ pyspark \ statcounter.py", строка 43, в init Файл self.merge (v) "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ lib \ pyspark.zip \ pyspark \ statcounter.py", строка 47, в слиянии delta = значение - self.mu

19/05/15 04:46:01 ОШИБКА TaskSetManager: Ошибка задачи 1 на этапе 0.0 1 раз; прерывание задания (последний вызов): файл «C: \ Users \ pc95 \ eclipse-workspace \ demo2 \ src \ Test7.py», строка 40, в rdd.mean (). show () Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 1202, в среднем вернуть self.stats (). mean () файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 1065, в статистике вернуть файл self.mapPartitions (лямбда i: [StatCounter (i)]). уменьшить (redFunc) файл "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ pyspark \ rdd.py", строка 844, в уменьшении vals = self.mapPartitions (func) .collect () Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 816, в сборе Файл sock_info = self.ctx._jvm.PythonRDD.collectAndServe (self._jrdd.rdd ()) "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ py4j-0.10.7-src.zip \ py4j \ java_gateway.py", строка 1257, вызов Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ lib \ py4j-0.10.7-src.zip \ py4j \ protocol.py", строка 328, в get_return_value py4j. protocol.Py4JJavaError: Ошибка произошло во время звонка г: org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: Задача 1 на этапе 0.0 не выполнена 1 раз, последний сбой: потерянная задача 1.0 на этапе 0.0 (TID 1, localhost, драйвер исполнителя): org.apache.spark.api.python.PythonException: трассировка (самая последняя последний звонок): File "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ pyspark.zip \ pyspark \ worker.py", строка 377, в основном файле "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ pyspark.zip \ pyspark \ worker.py", строка 372, в файле процесса "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ pyspark \ rdd.py", строка 2499, в pipe_func return func (split, prev_func (split, iterator)) Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 2499, в pipe_func return func (split, prev_func (split, iterator)) Файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 352, в фунц вернуть f (итератор) файл "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ pyspark \ rdd.py", строка 1065, в вернуть файл self.mapPartitions (лямбда i: [StatCounter (i)]). уменьшить (redFunc) файл "C: \ Users \ pc95 \ Downloads \ искровым 2.4.2-бен-hadoop2.7 \ питон \ Lib \ pyspark.zip \ pyspark \ statcounter.py", строка 43, в init Файл self.merge (v) "C: \ Users \ pc95 \ Downloads \ spark-2.4.2-bin-hadoop2.7 \ python \ lib \ pyspark.zip \ pyspark \ statcounter.py", строка 47, в слиянии delta = value - self.mu TypeError: неподдерживаемые типы операндов для -: 'str' и 'float'

1 Ответ

0 голосов
/ 14 мая 2019

Начиная с последней строки ошибки, я считаю, что ваш столбец имеет тип 'String'. Поэтому я преобразую его в тип Integer с использованием метода приведения SQL.

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

#Entire column average
df.select(F.avg(df.enternum.cast(IntegerType()))).show()

#City wise average
df.groupby('City').agg(F.avg(df.enternum.cast(IntegerType())).alias('Average_enternum')).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...