Реализовать функцию NewHadoopRDD.mapPartitionsWithInputSplit в Scala - PullRequest
0 голосов
/ 07 февраля 2019

Я пытаюсь реализовать функцию org.apache.spark.rdd.NewHadoopRDD mapPartitionsWithInputSplit в scala.

mapPartitionsWithInputSplit определение функции:

RDD<U> mapPartitionsWithInputSplit ( scala.Function2<org.apache.hadoop.mapreduce.InputSplit,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<U>> f, 
boolean preservesPartitioning, scala.reflect.ClassTag<U> evidence$1),

где scala.Function2<org.apache.hadoop.mapreduce.InputSplit, scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<U>> на самом деле является интерфейсом с одним методом Java, который необходимо создать в параметре mapPartitionsWithInputSplit, как в реализации анонимной функции Java.

определение Function2:

public interface Function2<T1, T2, R> extends Serializable {
    public R call(T1 v1, T2 v2) throws Exception;
}

Нужны предложения по определению этой анонимной функции с использованием scala.

При написании на Java такой же фрагмент кода может быть:

mapPartitionsWithInputSplit(new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String,String>> >(){

    @Override
    public Iterator<Tuple2<String,String>> call(InputSplit arg0,
            Iterator<Tuple2<LongWritable, Text>> dataIterator) throws Exception {
            FileSplit fileSplit = (FileSplit) arg0;
            //Retrieve the file name from the split
            String fileLocation = fileSplit.getPath().toString();
            List<Tuple2<String,String>> retList = new LinkedList<Tuple2<String,String>>();
            while(dataIterator.hasNext())
            {
                String data = dataIterator.next()._2.toString();
                retList.add(new Tuple2<String,String>(fileLocation,data));
            }
            return retList.iterator();
    }   
},true)
...