Время выполнения раздела карты в искре с использованием аккумулятора - PullRequest
2 голосов
/ 27 апреля 2020

Я пытался найти время выполнения раздела карты

    long start=System.nanoTime();
    MapPartitionsFunction<Row, Row> mapPartitionsFunction = new MapPartitionsFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(final Iterator<Row> iterator) throws Exception {
            return lookUpDataFrame(iterator);
        }
    };

    df = dataset.mapPartitions(mapPartitionsFunction, RowEncoder.apply(lookUpSchema));
    long end=System.nanoTime();
    long timeTaken=(end-start)/1000000000;

Действие относится к другому классу, который я не могу контролировать.

I знаю, что это выполнение занимает 100 секунд, тогда как приведенный выше код дал мне 1 секунду в качестве ответа. Я сделал некоторые поиски и обнаружил, что это связано с ленивым исполнением искры. Теперь я попытался использовать аккумулятор для этой цели и передал его в функцию разделения.

LongAccumulator longAccumulator=spark.sparkContext().longAccumulator();
MapPartitionsFunction<Row, Row> mapPartitionsFunction = new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(final Iterator<Row> iterator) throws Exception {
        return lookUpDataFrame(iterator,longAccumulator);
    }
};

и сделал метод поиска следующим образом: -

public static Iterator<Row> lookUpDataFrame(Iterator<Row> partition, LongAccumulator longAccumulator){
    long start=System.nanoTime();
    //my logic.....
    long end=System.nanoTime();
    longAccumulator.add((end-start)/1000000000);
    //rest of code
}

Я думал, что я разделю значение аккумулятора по количеству исполнителей (в моем случае это 36), но это значение равно 18 секундам, что не близко к тому, что я ожидаю.

Я попробовал другую аппроксимацию, где я вызвал действие сразу после раздела, как этот

long start=System.nanoTime();
MapPartitionsFunction<Row, Row> mapPartitionsFunction = new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(final Iterator<Row> iterator) throws Exception {
        return lookUpDataFrame(iterator);
    }
};

df = dataset.mapPartitions(mapPartitionsFunction, RowEncoder.apply(lookUpSchema));
df=df.persist();
long count=df.count();
long end=System.nanoTime();
long timeTaken=(end-start)/1000000000;

Но это приводило к проблемам с производительностью и памятью из-за количества и продолжительности использования. Тем не менее, он дал правильный ответ.

Что мне делать? Я уже искал другие ответы, но не смог решить проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...