Преобразование метки времени в эпоху в Spark (Java) - PullRequest
1 голос
/ 07 марта 2019

У меня есть столбец с типом Timestamp в формате yyyy-MM-dd HH:mm:ss в кадре данных.

Столбец отсортирован по времени, когда более ранняя дата находится в более ранней строке

КогдаЯ выполнил эту команду

List<Row> timeRows = df.withColumn(ts, df.col(ts).cast("long")).select(ts).collectAsList();

Я столкнулся со странной проблемой, когда значение более поздней даты меньше, чем более ранняя дата.Пример:

[670] : 1550967304 (2019-02-23 04:30:15)
[671] : 1420064100 (2019-02-24 08:15:04)

Это правильный способ преобразования в эпоху или есть другой способ?

Ответы [ 3 ]

1 голос
/ 07 марта 2019

Попробуйте использовать unix_timestamp, чтобы преобразовать строковое время даты в отметку времени. Согласно документу:

unix_timestamp (Column s, String p) Преобразовать строку времени с заданным образец (см. [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html ]) в метку времени Unix (в секундах), возвращать ноль в случае неудачи.

import org.apache.spark.functions._  

val format = "yyyy-MM-dd HH:mm:ss"
df.withColumn("epoch_sec", unix_timestamp($"ts", format)).select("epoch_sec").collectAsList()

Также см. https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-datetime.html

0 голосов
/ 08 марта 2019

Я думаю, что вы смотрите на использование: unix_timestamp()

Из которого вы можете импортировать:

import static org.apache.spark.sql.functions.unix_timestamp;

И использовать как:

df = df.withColumn(
    "epoch",
    unix_timestamp(col("date")));

И здесьэто полный пример, где я попытался имитировать ваш сценарий использования:

package net.jgp.books.spark.ch12.lab990_others;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_unixtime;
import static org.apache.spark.sql.functions.unix_timestamp;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Use of from_unixtime() and unix_timestamp().
 * 
 * @author jgp
 */
public class EpochTimestampConversionApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    EpochTimestampConversionApp app = new EpochTimestampConversionApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("expr()")
        .master("local")
        .getOrCreate();

    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "event",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "original_ts",
            DataTypes.StringType,
            false) });

    // Building a df with a sequence of chronological timestamps
    List<Row> rows = new ArrayList<>();
    long now = System.currentTimeMillis() / 1000;
    for (int i = 0; i < 1000; i++) {
      rows.add(RowFactory.create(i, String.valueOf(now)));
      now += new Random().nextInt(3) + 1;
    }
    Dataset<Row> df = spark.createDataFrame(rows, schema);
    df.show();
    df.printSchema();

    // Turning the timestamps to Timestamp datatype
    df = df.withColumn(
        "date",
        from_unixtime(col("original_ts")).cast(DataTypes.TimestampType));
    df.show();
    df.printSchema();

    // Turning back the timestamps to epoch
    df = df.withColumn(
        "epoch",
        unix_timestamp(col("date")));
    df.show();
    df.printSchema();

    // Collecting the result and printing out
    List<Row> timeRows = df.collectAsList();
    for (Row r : timeRows) {
      System.out.printf("[%d] : %s (%s)\n",
          r.getInt(0),
          r.getAs("epoch"),
          r.getAs("date"));
    }
  }
}

И вывод должен быть:

...
[994] : 1551997326 (2019-03-07 14:22:06)
[995] : 1551997329 (2019-03-07 14:22:09)
[996] : 1551997330 (2019-03-07 14:22:10)
[997] : 1551997332 (2019-03-07 14:22:12)
[998] : 1551997333 (2019-03-07 14:22:13)
[999] : 1551997335 (2019-03-07 14:22:15)

Надеюсь, это поможет.

0 голосов
/ 07 марта 2019

Вы должны использовать встроенную функцию unix_timestamp () в org.apache.spark.sql.functions

https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/functions.html#unix_timestamp()

...