Я пытаюсь узнать, как использовать Spark, кодирование на Java (пожалуйста, не используйте код Scala).Я пытаюсь реализовать очень простой привет мир пример Spark, подсчет слов.
Я позаимствовал код из документации Spark быстрый старт :
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
Все хорошо, теперь я хочу заменить filter
на flatMap
, а затем map
.Пока у меня есть flatMap
:
logData.flatMap((FlatMapFunction<String, String>) l -> {
return Arrays.asList(l.split(" ")).iterator();
}, Encoders.STRING());
Теперь я хочу сопоставить каждое слово с Tuple2 (word, 1)
, а затем сгруппировать их по ключу.Но проблема в том, что я не могу найти, как добраться от String
до (String, Long)
.В большинстве документов говорится о mapToPair
, но Dataset
не имеет такого метода!
Может ли кто-нибудь помочь мне сопоставить String
с Tuple2<String, Long>
?Кстати, я даже не уверен, ищу ли я Tuple2
или какой-то другой класс.
[ОБНОВЛЕНИЕ]
Основано на предложении, предоставленном @mangusta, я попробовал это:
logData.flatMap((FlatMapFunction<String, String>) l -> {
return Arrays.asList(l.split(" ")).iterator();
}, Encoders.STRING())
.map(new Function<String, Tuple2<String, Long>>() {
public Tuple2<String, Long> call(String str) {
return new Tuple2<String, Long>(str, 1L);
}
})
.count()
И столкнулся с этой ошибкой компиляции:
Error:(108, 17) java: no suitable method found for map(<anonymous org.apache.spark.api.java.function.Function<java.lang.String,scala.Tuple2<java.lang.String,java.lang.Long>>>)
method org.apache.spark.sql.Dataset.<U>map(scala.Function1<java.lang.String,U>,org.apache.spark.sql.Encoder<U>) is not applicable
(cannot infer type-variable(s) U
(actual and formal argument lists differ in length))
method org.apache.spark.sql.Dataset.<U>map(org.apache.spark.api.java.function.MapFunction<java.lang.String,U>,org.apache.spark.sql.Encoder<U>) is not applicable
(cannot infer type-variable(s) U
(actual and formal argument lists differ in length))
Похоже, функция map
принимает два параметра.Я не уверен, что я должен передать в качестве второго параметра.