Объединить столбец отметки времени со строковым столбцом - PullRequest
0 голосов
/ 28 января 2019

Мне нужно отправить данные из Кассандры в Elasticsearch.Загружен фрейм данных из cassandra, но столбец с именем timestamp имеет формат Long, поэтому мне нужно было изменить его на timestamp, чтобы сделать его более "читаемым человеком", что я и сделал с:

val cassDF2 = spark.createDataFrame(rawCass).withColumn("timestamp", ($"timestamp").cast(TimestampType))

Dataframe теперь выглядит следующим образом:

+--------------------+--------------------+-------------+--------------------+--------------------+
|             eventID|           timestamp|       userID|           sessionID|            fullJson|
+--------------------+--------------------+-------------+--------------------+--------------------+
|event00001.withSa...| 2018-11-15 09:00...|2512988381908|  WITH_EVENTS_IMPORT|{"header": {"appI...|
|event00002.withSa...| 2018-11-15 09:00...|2512988381908|WITH_EVENTS_SESSI...|{"body": {}, "hea...|
|event00003.withPa...| 2018-11-15 09:00...|2006052984315|  WITH_EVENTS_IMPORT|{"header": {"appI...|
+--------------------+--------------------+-------------+--------------------+--------------------+

Теперь мне нужно объединить 3 столбца (seesionID, userID and timestamp) в новый (docID) и передать его в ES:

  // concatStrings function
  val concatStrings = udf((userID: String, timestamp: String, eventID: String) => {userID + timestamp + eventID})

  // create column docID
  val cassDF = cassDF2.withColumn("docID", concatStrings($"userID", $"timestamp", $"eventID"))

Получение ошибки:

org.apache.spark.sql.AnalysisException: «отметка времени» не является числовым столбцом.Функция агрегирования может быть применена только к числовому столбцу.

Я знаю, timestamp после вызова .cast теперь объект и не может быть агрегирован, как раньше (когда это был тип Long),но как извлечь его значение в виде строки или что-то, что может быть агрегировано.

Все, что я могу получить, это сделать, пока столбец timestamp равен Long.

Мой окончательный фрейм данных должен выглядеть как cassDF2, но с новым столбцом docID, который содержит 251929883819082018-12-09T12:25:25.904+0100event00001.withSa... вместо 15147612000002512988381908event00001.withSa... в docID

1 Ответ

0 голосов
/ 28 января 2019

Нет необходимости в UDF.Вы можете использовать встроенный метод concat для объединения столбцов, включая столбец timestamp в формате строки с определенным форматом даты, как показано ниже:

import spark.implicits._
import org.apache.spark.sql.functions._
import java.sql.Timestamp

val df = Seq(
  ("1001", Timestamp.valueOf("2018-11-15 09:00:00"), "Event1"),
  ("1002", Timestamp.valueOf("2018-11-16 10:30:00"), "Event2")
).toDF("userID", "timestamp", "eventID")

val dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"

df.
  withColumn("docID", concat($"userID", date_format($"timestamp", dateFormat), $"eventID")).
  show(false)
// +------+-------------------+-------+--------------------------------------+
// |userID|timestamp          |eventID|docID                                 |
// +------+-------------------+-------+--------------------------------------+
// |1001  |2018-11-15 09:00:00|Event1 |10012018-11-15T09:00:00.000-0800Event1|
// |1002  |2018-11-16 10:30:00|Event2 |10022018-11-16T10:30:00.000-0800Event2|
// +------+-------------------+-------+--------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...