Как добавить новый столбец в dataframe и заполнить столбец? - PullRequest
1 голос
/ 28 июня 2019

Добавьте новый столбец с именем Download_Type к кадру данных с условиями:

Если размер <100 000, Download_Type = «Маленький» </p>

Если размер> 100 000 и размер <1 000 000, Download_Type = «Средний”</p>

Else Download_Type =« Large »

Входные данные: log_file.txt

Пример данных« дата »,« время »,« размер »,« r_version »,«r_arch "," r_os "," package "," version "," country "," ip_id "" 2012-10-01 "," 00:30:13 ", 35165," 2.15.1 "," i686 ","linux-gnu", "quadprog", "1.5-4", "AU", 1

Я создал кадр данных, выполнив следующие действия:

val file1 =  sc.textFile(“log_file.txt”)

val header = file1.first

val logdata = file1.filter(x=>x!=header)

case class Log(date:String, time:String, size: Double, r_version:String, r_arch:String, r_os:String, packagee:String, version:String, country:String, ipr:Int)

val logfiledata = logdata.map(_.split(“,”)),map(p=>Log(p(0),p(1),p(2).toDouble,p(3),p(4),p(5),p(6),p(7),p(8),p(9).toInt))

val logfiledf = logfiledata.toDF()

Я выделил столбец размераи преобразовал его в массив:

val size = logfiledf.select($"size")

val sizearr = size.collect.map(row=>row.getDouble(0))

Я сделал функцию, чтобы я мог заполнить недавно добавленный столбец:

def exp1(size:Array[Double])={

var result = ""

for(i <- 0 to (size.length-1)){

if(size(i)<100000) result += "small"

else(if(size(i) >=100000 && size(i) <1000000) "medium"

else "large"

}

return result

}

Я попытался это заполнить столбец Download_Type:

val logfiledf2 = logfiledf.withColumn("Download_Type", expr(exp1(sizearr))

Как можно заполнить новый столбец с именем Download_type условиями:

Если размер <100 000, Download_Type = «Маленький» </p>

Если размер> 100 000 и размер <1 000 000,Download_Type =«Средний» </p>

Иначе Download_Type = «Большой»?

1 Ответ

3 голосов
/ 28 июня 2019

Вы можете просто применить withColumn к загруженному фрейму данных logfiledf, используя when/otherwise, как показано ниже:

import org.apache.spark.sql.functions._
import spark.implicits._

val logfiledf = Seq(
  ("2012-10-01","00:30:13",35165.0,"2.15.1","i686","linux-gnu","quadprog","1.5-4","AU",1),
  ("2012-10-02","00:40:14",150000.0,"2.15.1","i686","linux-gnu","quadprog","1.5-4","US",2)
).toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")

logfiledf.withColumn("download_type", when($"size" < 100000, "Small").otherwise(
    when($"size" < 1000000, "Medium").otherwise("Large")
  )
).show
// +----------+--------+--------+---------+------+---------+--------+-------+-------+-----+-------------+
// |      date|    time|    size|r_version|r_arch|     r_os| package|version|country|ip_id|download_type|
// +----------+--------+--------+---------+------+---------+--------+-------+-------+-----+-------------+
// |2012-10-01|00:30:13| 35165.0|   2.15.1|  i686|linux-gnu|quadprog|  1.5-4|     AU|    1|        Small|
// |2012-10-02|00:40:14|150000.0|   2.15.1|  i686|linux-gnu|quadprog|  1.5-4|     US|    2|       Medium|
// +----------+--------+--------+---------+------+---------+--------+-------+-------+-----+-------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...