Я получаю часть проверенных данных, например:
from pyspark.sql.types import DateType
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
#columns names and each values
columnMock = ['date','user_id','session_id','page_id','action_time','search_keyword','click_category_id','click_product_id',
'order_category_ids','order_product_ids','pay_category_ids','pay_product_ids','city_id']
valsMock = [
('2017-03-04','1','2984','54684','2017-03-0418:02:03','dog','fjsd3','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
('2017-03-04','2','294','9242','2017-03-0418:07:03','apple','fr343','jf94fj','fk430','f4j89','rebj89','fejq9','SH'),
('2017-03-04','1','2984','425','2017-03-0418:51:03','car','fbyt3','jf94fj','fk430','f4j89','rebj89','fejq9','BJ'),
('2017-03-04','2','294','92356','2017-03-0419:02:03','water','dad93','jf94fj','fk430','f4j89','rebj89','fejq9','HZ'),
('2017-03-04','1','2984','4014','2017-03-0419:22:03','wine','brt3','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
('2017-03-04','2','294','4562','2017-03-0419:55:03','tiger','s21493','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
('2017-03-04','1','2984','567','2017-03-0420:02:03','camel','rb493','jf94fj','fk430','f4j89','rebj89','fejq9','GZ'),
('2017-03-04','2','294','5372','2017-03-0431:02:03','glass','325g93','jf94fj','fk430','f4j89','rebj89','fejq9','GZ')
]
df = sqlContext.createDataFrame(valsMock, columnMock)
df.createOrReplaceTempView("sessionLog")
, поскольку у python нет обобщений, как в java, как я могу изменить эту функцию?Я имею в виду использовать тип карты в Python для перевода, чтобы я мог использовать groupByKey для действия на JavaPairRDD<String, Row> getSessionid2ActionRDD
, как в Java, но не знаю, как его выполнить.
Java-функция:
public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {
return actionRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(Iterator<Row> iterator)
throws Exception {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
while(iterator.hasNext()) {
Row row = iterator.next();
list.add(new Tuple2<String, Row>(row.getString(2), row));
}
return list;
}
});
}