зажечь код Java для Python - PullRequest
       8

зажечь код Java для Python

0 голосов
/ 02 июня 2018

Я получаю часть проверенных данных, например:

from pyspark.sql.types import DateType
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

#columns names and each values
    columnMock = ['date','user_id','session_id','page_id','action_time','search_keyword','click_category_id','click_product_id',
                  'order_category_ids','order_product_ids','pay_category_ids','pay_product_ids','city_id']
    valsMock = [
    ('2017-03-04','1','2984','54684','2017-03-0418:02:03','dog','fjsd3','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
    ('2017-03-04','2','294','9242','2017-03-0418:07:03','apple','fr343','jf94fj','fk430','f4j89','rebj89','fejq9','SH'),
    ('2017-03-04','1','2984','425','2017-03-0418:51:03','car','fbyt3','jf94fj','fk430','f4j89','rebj89','fejq9','BJ'),
    ('2017-03-04','2','294','92356','2017-03-0419:02:03','water','dad93','jf94fj','fk430','f4j89','rebj89','fejq9','HZ'),
    ('2017-03-04','1','2984','4014','2017-03-0419:22:03','wine','brt3','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
    ('2017-03-04','2','294','4562','2017-03-0419:55:03','tiger','s21493','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
    ('2017-03-04','1','2984','567','2017-03-0420:02:03','camel','rb493','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
    ('2017-03-04','2','294','5372','2017-03-0431:02:03','glass','325g93','jf94fj','fk430','f4j89','rebj89','fejq9','GZ')
    ]


df = sqlContext.createDataFrame(valsMock, columnMock)
df.createOrReplaceTempView("sessionLog")

, поскольку у python нет обобщений, как в java, как я могу изменить эту функцию?Я имею в виду использовать тип карты в Python для перевода, чтобы я мог использовать groupByKey для действия на JavaPairRDD<String, Row> getSessionid2ActionRDD, как в Java, но не знаю, как его выполнить.

Java-функция:

public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {


    return actionRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<Tuple2<String, Row>> call(Iterator<Row> iterator)
                throws Exception {
            List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();

            while(iterator.hasNext()) {
                Row row = iterator.next();
                list.add(new Tuple2<String, Row>(row.getString(2), row));  
            }

            return list;
        }

    });
}

1 Ответ

0 голосов
/ 02 июня 2018

перевод функции java в pyspark выглядит следующим образом:

def createPairedTuple(partIter):
    return [(row[2], row) for row in partIter]

, и вы можете вызвать его, преобразовав ваш фрейм данных в rdd и используя mapPartitions как

df.rdd.mapPartitions(lambda partitionIterator: createPairedTuple(partitionIterator))

Или, что еще лучше, вы можете просто встроить функцию внутрь mapPartitions как

df.rdd.mapPartitions(lambda partitionIterator: [(row[2], row) for row in partitionIterator])

Надеюсь, ответ будет полезным

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...