MapPartition в PySpark - PullRequest
       5

MapPartition в PySpark

0 голосов
/ 15 января 2020

Я чрезвычайно новичок в Python и не очень знаком с синтаксисом. Я смотрю на некоторые примеры реализации метода pyspark mappartitions. Чтобы лучше сформулировать вопрос, я написал Java Эквивалент того, что мне нужно.

JavaRDD<Row> modified =  auditSet.javaRDD().mapPartitions(new FlatMapFunction<Iterator<Row>, Row>() {

            public Iterator<Row> call(Iterator<Row> t) throws Exception {
                Iterable<Row> iterable = () -> t;
                return StreamSupport.stream(iterable.spliterator(), false).map(m -> enrich(m)).iterator();
            }
            private Row enrich(Row r) {
                //<code to enrich row r
                return RowFactory.create(/*new row from enriched row r*/);
            }

});

У меня есть rdd. Мне нужно назвать mappartitions на нем. Я не уверен, как передать / обработать итератор внутри python. Как только вызов достигает метода, я пытаюсь перебрать каждую запись, обогатить ее и вернуть результат.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 15 января 2020

Не уверен, что это правильный путь, но это то, что я сделал. Открыта для комментариев и исправлений.

auditSetDF.rdd.mapPartitions(lambda itr:mpImpl(itr,locationListBrdcast))

def mpImpl(itr,broadcastList):
        lst=broadcastList.value
        for x in itr:
                yield enrich(x,lst)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...