Spark-Java: Как изменить формат метки времени столбцов в наборе данных <Row>? - PullRequest
0 голосов
/ 11 февраля 2019

Я хочу сопоставить свои поля меток времени в наборе данных со значениями, такими как 2018-08-17T19:58:46.000+0000, с форматом, подобным 2018-08-17 19:58:46.000, т.е. гггг-мм-дд ЧЧ: мм: ss.SSS , а некоторые столбцы - гггг-мм-дд .

Например, у меня есть набор данных DS1 со столбцами id, lastModif, создан :

+------------------+----------------------------+----------------------------+
|Id                |lastModif                   |created                     |
+------------------+----------------------------+----------------------------+
|abc1              |2019-01-14T19:51:55.000+0000|2019-02-07T20:37:53.000+0000|
|AQA2              |2019-02-05T19:26:36.000+0000|2019-02-07T20:40:06.000+0000|
+------------------+----------------------------+----------------------------+ 

Сверху DS1 мне нужен *Столбец 1016 * сопоставлен с форматом yyyy-MM-dd HH:mm:ss.SSS, а столбец createdTime сопоставлен с yyyy-MM-dd.
У меня есть аналогичные DS2, DS3 с другим сопоставлением столбцов.
Я сохранил файл свойств, из которого он будет извлекать сопоставлениестолбцы в качестве ключей и формат отметки времени в качестве значений.
В коде я сохраняю список столбцов сопоставления и столбцов без сопоставления и выбираю столбец:

String cols = "Id,created,lastModif";
String[] colArr = cols.split(",");
String mappedCols = "lastModif,created"; //hardcoding as of now.

List<String> mappedColList = Arrays.asList(mappedCols.split(","));
String nonMappedCols = getNonMappingCols(colArr, mappedCols.split(",")).toLowerCase();
List<String> nonMapped = Arrays.asList(nonMappedCols.split(","));

//column-mapping logic
filtered = tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif"));
filtered.show(false);


public static Seq<String> convertListToSeq(List<String> inputList)
{
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

private static String getNonMappingCols(String[] cols, String[] mapped)
{
    String nonMappedCols = "";
    List<String> mappedList = Arrays.asList(mapped);

    for(int i=0; i<cols.length; i++)
    {
        if(!mappedList.contains(cols[i])){
            nonMappedCols += cols[i]+",";               
        }
    }
    nonMappedCols = nonMappedCols.substring(0, nonMappedCols.length()-1);

    return nonMappedCols;
}

Как сопоставитьв столбец с требуемым форматом метки времени?
А в строке кода tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif")); * * * * * * * * * $"lastModif" не идентифицируется в Java.
И, во-вторых, этот способ является статическим способом, то есть жестким кодированием столбца сопоставления.Как мне сопоставить столбцы из моего List<String> mappedColList?

1 Ответ

0 голосов
/ 15 февраля 2019

Вот как я сделал отображение динамическим:

private static Dataset<Row> mapColumns(Properties mappings, String tableNm, String[] colArr, Dataset<Row> tempDS) throws Exception
{
    String mappedCols = "lastmodif,createdDate,endDate";
    Dataset<Row> filtered = null;
    Properties mappingCols = mappings;
    List<String> mapped = Arrays.asList(mappedCols.split(","));

    List<String> colsList = Arrays.asList(colArr);
    ArrayList<String> tempList = new ArrayList<String>();
    Iterator itrTmp = colsList.iterator();
    while(itrTmp.hasNext()){
        tempList.add((String)itrTmp.next());
    }

    Iterator itr = mapped.iterator();
    filtered = tempDS.selectExpr(convertListToSeq(colsList));

    while(itr.hasNext()){
        String column = itr.next().toString();
        String newCol = column+"_mapped";
        String propertyKey = tableNm+"-"+column;
        String propertyValue = mappingCols.getProperty(propertyKey);

        filtered = filtered.selectExpr(convertListToSeq(colsList))
                .withColumn(newCol, functions.regexp_replace(functions.substring(filtered.col(column), 0, 23),"T", " ")).alias(newCol)
                .drop(filtered.col(column));

        tempList.remove(column);
        tempList.add(newCol);
        colsList = tempList;
    }

    filtered = filtered.selectExpr(convertListToSeq(colsList)); 
    filtered.show(false);
}

public static Seq<String> convertListToSeq(List<String> inputList)
{
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

Но преобразование String в Timestamp все еще ожидается.На данный момент я делаю substring, но эта логика для всех столбцов, чьи данные имеют тип yyyy-mm-ddThh:mm:ss.SSSZ или yyyy-mm-ddThh:mm:ss.SSS+0000 и т. Д., Но не будет работать, если столбец имеет данные типа yyyy-mm-dd и код сломается,Я поднял это здесь: как преобразовать строку в метку времени .

...