Подсчитать сотрудника по идентификатору отдела и определить два лучших отдела с наибольшим количеством идентификаторов сотрудников - PullRequest
0 голосов
/ 04 октября 2019

Первый пользователь Spark. Я создал RDD для двух CSV-файлов (сотрудников и отдела). Я хотел бы предоставить вывод, который подсчитывает количество сотрудников по идентификатору отдела, а также идентифицирует два верхних названия отдела с наибольшим количеством идентификаторов сотрудников. "deptno" - мой первичный ключ, но я не знаю, как соединить два файла вместе.

Файл сотрудника содержит следующие столбцы: [empno, ename, job, mgr, hiredate, sal, comm, deptno]

Файл dept содержит следующие столбцы: [deptno, dname,location]

Вот что я сделал до сих пор:

`employees_rdd = sc.textFile("/FileStore/tables/Employee.csv")
employees_rdd.take(3)
header_e = employees_rdd.first()
employees1 = employees_rdd.filter(lambda row : row != header_e)
employees1.take(1)`

`dept_rdd = sc.textFile("/FileStore/tables/Dept.csv")
dept_rdd.take(3)
header_d = dept_rdd.first()
dept1 = dept_rdd.filter(lambda row : row != header_d)
dept1.take(1)`

`employees2 = employees1.map(lambda row : row.split(","))
employees_kv = employees2.map(lambda row : (row[7],1))
employees_kv.take(3)`

Получение синтаксической ошибки ниже:

employee_kv.reduceByKey (лямбда x, y: x+ y) .takeOrdered (2, лямбда (x, y): -1 * y)

Любая помощь приветствуется.

Ответы [ 2 ]

0 голосов
/ 06 октября 2019

Вот мой код pyspark, чтобы сделать это. Я принял операторы чтения с разделителем "|".

from pyspark.sql.functions import *
from pyspark.sql.types import *

emp = spark.read.option("header","true") \
                .option("inferSchema","true") \
                .option("sep","|") \
                .csv("/FileStore/tables/employee.txt")

dept = spark.read.option("header","true") \
                 .option("inferSchema","true") \
                 .option("sep","|") \
                 .option("removeQuotes","true") \
                 .csv("/FileStore/tables/department.txt")

# Employee count by department
empCountByDept = emp.groupBy("deptno") \
                       .agg(count("empno").alias("no_of_employees"))

empCountByDept.show(20,False)

# Top two department names with the most employees 
topTwoDept = empCountByDept.join(dept, empCountByDept.deptno == dept.deptno, "inner") \
                           .orderBy(empCountByDept.no_of_employees.desc()).drop(dept.deptno) \
                           .select("dname","no_of_employees") \
                           .limit(2)
topTwoDept.show(20,False)

Результат ::

+------+---------------+
|deptno|no_of_employees|
+------+---------------+
|20    |5              |
|10    |3              |
|30    |6              |
+------+---------------+
+----------+---------------+
|dname     |no_of_employees|
+----------+---------------+
|'Sales'   |6              |
|'Research'|5              |
+----------+---------------+
0 голосов
/ 05 октября 2019

Я настоятельно рекомендую использовать Dataframes / DataSets. Не только потому, что их проще использовать и ими манипулировать, но и обеспечивают серьезное улучшение производительности. Даже искровой документ рекомендует то же самое.

Вот ваш код кадра данных: -

val spark = SparkSession.builder.master("local[*]").getOrCreate

Это создает SparkSession, который является точкой входа в приложение.

Теперь давайте прочитаем файлы ваших сотрудников и отделов.

val employeeDF = spark.read.format("csv").option("header","true").load("/path/to/employee/file")
val deptDF = spark.read.format("csv").option("header","true").load("/path/to/dept/file")

Теперь присоединиться довольно просто. Оператор ниже создаст кадр данных, который будет результатом внутреннего соединения между employeeDF и deptDF в столбце deptno

val joinedDF = employeeDF.join(deptDF,Seq("deptno"))

Теперь вы можете использовать joinedDF для получения результатов.

val countByDept = joinedDF.groupBy($"deptno").count
//countByDept.show to display the results

val top2Dept = joinedDF.groupBy($"dname").count.orderBy($"count".desc).limit(2)
//top2Dept.show to display the results

Надеюсь, это поможет вам начать путешествие с Spark DataFrames и DataSets.

...