«Преобразование» данных датчика с помощью Apache Spark Java - PullRequest
0 голосов
/ 19 мая 2019

У меня есть источник OPC, который генерирует данные датчика, и мне нужно «преобразовать» данные.

данные датчика хранятся в CSV-файле и выглядят так:

Sensor    Value  Timestamp
Sensor 1  1234   XYZ
Sensor 2  1342   XYZ+1
Sensor 3  ...
...
Sensor 1  1434   XYZ+n
Sensor 2  1523   XYZ+n+1
Sensor 3  ...
...

Контекст данных находится на заводе, каждый раз, когда производство заканчивается, новый продукт попадает на заводскую линию и датчик запускается снова

Структура назначения должна быть такой:

Product   Sensor 1  Sensor 2  Sensor  ...
X         1234      1342      ...
X+1       1434      1523      ...
...   

Я новичок в Apache Spark на Java и не знаю, как преобразовать эти данные.Будем благодарны за любую помощь.

Спасибо за вашу помощь!

Обновление

Как уже упоминалось, в исходных данных нет ссылки наконкретный продукт.В данных назначения нет отметки времени.

Моя идея заключалась в агрегировании данных.Каждый раз, когда Датчик 1 имеет значение, это новый продукт.Это означает, что с помощью Timestamp и Sensorname вы можете получить продукт.

Но я действительно собираюсь включить это в код.Если у вас есть идеи по этому поводу, я был бы очень рад!

Ответы [ 2 ]

0 голосов
/ 22 мая 2019

Сам нашел ответ:

Я добавил новую колонку, которая разбита на конкретный продукт, а затем:

Dataset <Row> df = oldf
                        .groupBy("Product")
                        .pivot("Sensor")
                        .agg(functions.first(oldf.col("Value")))    

Очень доволен этим!

0 голосов
/ 19 мая 2019

Я мало что знаю об искре, но это можно сделать с помощью Java, как показано ниже:

Подход

1. Считать данные на строку из существующего CSV вa List.
2. Введите список, чтобы собрать данные для каждого датчика в карту, т. е. сопоставить имя датчика с продуктом.
3. Выполните итерацию по карте, чтобы иметь 2D-матрицу, где размер будет равен максимальному размеру столбцалюбой датчик на количество клавиш, т.е. датчик
Следовательно, код для того же

public static void main(String[] args) throws IOException  {    
    Path filePath = new File("C:\\Users\\Sample\\Untitled.csv").toPath();
    Charset charset = Charset.defaultCharset();        
    List<String> lines = Files.readAllLines(filePath, charset);

    Map<String, List<String>> map = new HashMap<String, List<String>>();

    for(int i=1; i<lines.size(); i++) {
        String[] data = lines.get(i).split(",");
        List<String> sesnorDataList = map.get(data[0]);
        if(sesnorDataList!=null) {
            sesnorDataList.add(data[1]);
        }else {
            List<String> value = new ArrayList<String>();
            value.add(data[1]);
            map.put(data[0], value);
        }
    }
    //to find max size of list among all sensors
    int maxLength = 0;

    for (List<String> list : map.values()) {
        if(list.size()>=maxLength) {
            maxLength = list.size();
        }
    }
    String[][] csvData = new String[maxLength+1][map.keySet().size()];

    int counter=0;
    for (String header : map.keySet()) {
        csvData[0][counter] = header;
        counter++;
    }
    counter=0;
    for (Entry<String, List<String>> entry : map.entrySet()) {
        List<String> value = entry.getValue();
        for(int i=0 ;i<value.size(); i++) {
            csvData[i+1][counter] = value.get(i);
        }
        counter++;
    }

    //Printing 2D array can be written to another csv
    for(int i=0; i<csvData.length;i++) {
        for(int j=0; j<csvData[i].length; j++) {
            System.out.print(csvData[i][j]+",");
        }
        System.out.println();
    }   
}

вход

Sensor,Value,Timestamp
Sensor 1,1234,XYZ
Sensor 2,1342,XYZ+1
Sensor 3,2545,XYZ+3
Sensor 1,1434,XYZ+n
Sensor 2,1523,XYZ+n+1
Sensor 3,7112,XYZ+8

выход

Sensor 2,Sensor 3,Sensor 1,
1342,2545,1234,
1523,7112,1434,
...