Apache Flink: странное поведение FlatMap - PullRequest
0 голосов
/ 30 октября 2018

Я принимаю поток данных во Flink. Для каждого «экземпляра» этих данных у меня есть временная метка. Я могу определить, является ли машина, с которой я получаю данные, «производящей» или «не производящей», это делается с помощью пользовательской функции плоской карты, которая находится в ее собственном статическом классе.

Я хочу посчитать, сколько времени машина производит / не производит. Мой текущий подход заключается в сборе временных и непроизводственных временных меток в двух простых списках. Для каждого «экземпляра» данных я рассчитываю текущую производственную / непроизводственную продолжительность путем вычитания самой последней временной отметки из самой ранней временной отметки. Это дает мне неверные результаты, хотя. Когда состояние производства изменяется от производства к производству, я очищаю список временных меток для производства и наоборот, так что, если производство начинается снова, продолжительность начинается с нуля.

Я просмотрел два списка, в которых я собираю соответствующие метки времени, и вижу вещи, которые не понимаю. Я предполагаю, что до тех пор, пока машина «производит», первая временная метка в списке временных меток остается неизменной, а новые метки времени добавляются в список для каждого нового экземпляра данных. Очевидно, это предположение неверно, поскольку я получаю, казалось бы, случайные метки времени в списках. Они все еще правильно упорядочены.

Вот мой код для функции flatmap:

public static class ImaginePaperDataConverterRich extends RichFlatMapFunction<ImaginePaperData, String> {
    private static final long serialVersionUID = 4736981447434827392L;
    private transient ValueState<ProductionState> stateOfProduction;
    SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SS");
    DateFormat timeDiffFormat = new SimpleDateFormat("dd HH:mm:ss.SS");
    String timeDiffString = "00 00:00:00.000";
    List<String> productionTimestamps = new ArrayList<>();
    List<String> nonProductionTimestamps = new ArrayList<>();

    public String calcProductionTime(List<String> timestamps) {
        if (!timestamps.isEmpty()) {
            try {
                Date firstDate = dateFormat.parse(timestamps.get(0));
                Date lastDate = dateFormat.parse(timestamps.get(timestamps.size()-1));
                long timeDiff = lastDate.getTime() - firstDate.getTime();

                if (timeDiff < 0) {
                    System.out.println("Something weird happened. Maybe EOF.");
                    return timeDiffString;
                }

                timeDiffString = String.format("%02d %02d:%02d:%02d.%02d",
                    TimeUnit.MILLISECONDS.toDays(timeDiff),
                    TimeUnit.MILLISECONDS.toHours(timeDiff)   % TimeUnit.HOURS.toHours(1),
                    TimeUnit.MILLISECONDS.toMinutes(timeDiff) % TimeUnit.HOURS.toMinutes(1),
                    TimeUnit.MILLISECONDS.toSeconds(timeDiff) % TimeUnit.MINUTES.toSeconds(1),
                    TimeUnit.MILLISECONDS.toMillis(timeDiff)  % TimeUnit.SECONDS.toMillis(1));

            } catch (ParseException e) {
                e.printStackTrace();
            }
            System.out.println("State duration: " + timeDiffString);
        }
        return timeDiffString;
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<ProductionState> descriptor = new ValueStateDescriptor<>(
            "stateOfProduction",
            TypeInformation.of(new TypeHint<ProductionState>() {}),
            ProductionState.NOT_PRODUCING);
            stateOfProduction = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(ImaginePaperData ImaginePaperData, Collector<String> output) throws Exception {
        List<String> warnings = new ArrayList<>();
        JSONObject jObject = new JSONObject();
        String productionTime = "0";
        String nonProductionTime = "0";

        // Data analysis
        if (stateOfProduction == null || stateOfProduction.value() == ProductionState.NOT_PRODUCING && ImaginePaperData.actSpeedCl > 60.0) {
            stateOfProduction.update(ProductionState.PRODUCING);
        } else if (stateOfProduction.value() == ProductionState.PRODUCING && ImaginePaperData.actSpeedCl < 60.0) {
            stateOfProduction.update(ProductionState.NOT_PRODUCING);
        }

        if(stateOfProduction.value() == ProductionState.PRODUCING) {
            if (!nonProductionTimestamps.isEmpty()) {
                System.out.println("Production has started again, non production timestamps cleared");
                nonProductionTimestamps.clear();
            }
            productionTimestamps.add(ImaginePaperData.timestamp);

            System.out.println(productionTimestamps);
            productionTime = calcProductionTime(productionTimestamps);
        } else {
            if(!productionTimestamps.isEmpty()) {
                System.out.println("Production has stopped, production timestamps cleared");
                productionTimestamps.clear();
            }
            nonProductionTimestamps.add(ImaginePaperData.timestamp);
            warnings.add("Production has stopped.");

            System.out.println(nonProductionTimestamps);
            //System.out.println("Production stopped");
            nonProductionTime = calcProductionTime(nonProductionTimestamps);
        }
// The rest is just JSON stuff

Должен ли я хранить эти два списка временных меток в ListState?

РЕДАКТИРОВАТЬ: Потому что другой пользователь спросил, вот данные, которые я получаю.

{'szenario': 'machine01', 'timestamp': '31.10.2018 09:18:39.432069', 'data': {1: 100.0, 2: 100.0, 101: 94.0, 102: 120.0, 103: 65.0}}

Я ожидаю, что моя программа flink собирает временные метки в двух списках productionTimestamps и nonProductionTimestamps. Затем я хочу, чтобы метод calcProductionTime вычитал последнюю временную метку в списке из первой временной метки, чтобы получить продолжительность между первым обнаружением того, что машина "производит" / "не производит", и временем, когда она перестала "производить" / " не продуцирующий».

1 Ответ

0 голосов
/ 05 ноября 2018

Я обнаружил, что причиной «на первый взгляд случайных» временных отметок является параллельное выполнение Apache Flink. Когда параллелизм установлен на> 1, порядок событий больше не гарантируется.

Мое быстрое решение состояло в том, чтобы установить параллелизм моей программы в 1, насколько я знаю, это гарантирует порядок событий.

...