Фрейм данных заменяет нулевые значения каждой строки уникальным временем эпохи - PullRequest
0 голосов
/ 09 октября 2018

У меня есть 3 строки в кадрах данных и в 2 строки, столбец id имеет нулевые значения.Мне нужно перебрать каждую строку в этом конкретном идентификаторе столбца и заменить на время эпохи, которое должно быть уникальным и должно происходить в самом фрейме данных.Как это можно сделать?Например:

id | name
1    a
null b
null c

Я хотел этот кадр данных, который преобразует ноль в время эпохи.

id     |     name
1             a
1435232       b
1542344       c

Ответы [ 2 ]

0 голосов
/ 09 октября 2018

Проверьте это

scala>  val s1:Seq[(Option[Int],String)] = Seq( (Some(1),"a"), (null,"b"), (null,"c"))
s1: Seq[(Option[Int], String)] = List((Some(1),a), (null,b), (null,c))

scala> val df = s1.toDF("id","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> val epoch = java.time.Instant.now.getEpochSecond
epoch: Long = 1539084285

scala> df.withColumn("id",when( $"id".isNull,epoch).otherwise($"id")).show
+----------+----+
|        id|name|
+----------+----+
|         1|   a|
|1539084285|   b|
|1539084285|   c|
+----------+----+


scala>

EDIT1:

Я использовал миллисекунды, затем также я получаю те же значения.Spark не захватывает нано секунды во времени.Возможно, что многие строки могут получить одинаковые миллисекунды.Таким образом, ваше предположение о получении уникальных значений, основанных на эпохе, не будет работать.

scala> def getEpoch(x:String):Long = java.time.Instant.now.toEpochMilli
getEpoch: (x: String)Long

scala> val myudfepoch = udf( getEpoch(_:String):Long )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+-------------+----+
|           id|name|
+-------------+----+
|            1|   a|
|1539087300957|   b|
|1539087300957|   c|
+-------------+----+


scala>

Единственная возможность - использовать monotonicallyIncreasingId, но эти значения могут не всегда иметь одинаковую длину.

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)+monotonicallyIncreasingId).otherwise($"id")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+-------------+----+
|           id|name|
+-------------+----+
|            1|   a|
|1539090186541|   b|
|1539090186543|   c|
+-------------+----+


scala>

EDIT2:

Я могу обмануть System.nanoTime и получить увеличивающиеся идентификаторы, но они не будут последовательными, но длина может быть сохранена.См. Ниже

scala> def getEpoch(x:String):String = System.nanoTime.toString.take(12)
getEpoch: (x: String)String

scala>  val myudfepoch = udf( getEpoch(_:String):String )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+------------+----+
|          id|name|
+------------+----+
|           1|   a|
|186127230392|   b|
|186127230399|   c|
+------------+----+


scala>

Попробуйте это при работе в кластерах и отрегулируйте take (12), если вы получите повторяющиеся значения.

0 голосов
/ 09 октября 2018
df
  .select(
    when($"id").isNull, /*epoch time*/).otherwise($"id").alias("id"),
    $"name"
  )

РЕДАКТИРОВАТЬ

Необходимо убедиться, что UDF достаточно точен - если он имеет разрешение только в миллисекундах, вы увидите дублированные значения.Посмотрите мой пример ниже, который ясно иллюстрирует мои подходы:

scala> def rand(s: String): Double = Math.random
rand: (s: String)Double

scala> val udfF = udf(rand(_: String))
udfF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))

scala> res11.select(when($"id".isNull, udfF($"id")).otherwise($"id").alias("id"), $"name").collect
res21: Array[org.apache.spark.sql.Row] = Array([0.6668195187088702,a], [0.920625293516218,b])
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...