Я пытаюсь реализовать функцию org.apache.spark.rdd.NewHadoopRDD
mapPartitionsWithInputSplit
в scala.
mapPartitionsWithInputSplit
определение функции:
RDD<U> mapPartitionsWithInputSplit ( scala.Function2<org.apache.hadoop.mapreduce.InputSplit,scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<U>> f,
boolean preservesPartitioning, scala.reflect.ClassTag<U> evidence$1),
где scala.Function2<org.apache.hadoop.mapreduce.InputSplit, scala.collection.Iterator<scala.Tuple2<K,V>>,scala.collection.Iterator<U>>
на самом деле является интерфейсом с одним методом Java, который необходимо создать в параметре mapPartitionsWithInputSplit
, как в реализации анонимной функции Java.
определение Function2
:
public interface Function2<T1, T2, R> extends Serializable {
public R call(T1 v1, T2 v2) throws Exception;
}
Нужны предложения по определению этой анонимной функции с использованием scala.
При написании на Java такой же фрагмент кода может быть:
mapPartitionsWithInputSplit(new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String,String>> >(){
@Override
public Iterator<Tuple2<String,String>> call(InputSplit arg0,
Iterator<Tuple2<LongWritable, Text>> dataIterator) throws Exception {
FileSplit fileSplit = (FileSplit) arg0;
//Retrieve the file name from the split
String fileLocation = fileSplit.getPath().toString();
List<Tuple2<String,String>> retList = new LinkedList<Tuple2<String,String>>();
while(dataIterator.hasNext())
{
String data = dataIterator.next()._2.toString();
retList.add(new Tuple2<String,String>(fileLocation,data));
}
return retList.iterator();
}
},true)