Я попробовал этот код в Flink Scala Repl, и я строю RichMapFunction в Java, но Scala не может работать.
Кто-нибудь может дать мне ссылку на документ о стратегии перегрузки метода scala?
функция java rich map
public class RichMapFunctions {
public static class CountRichMapFunction<E, T> extends RichMapFunction<E, T> {
private static final long serialVersionUID = 1L;
private transient Counter counter;
@Override
public void open(Configuration parameters) throws Exception {
this.counter = getRuntimeContext().getMetricGroup().counter("demo");
}
@Override
public T map(E value) throws Exception {
this.counter.inc();
return (T) value;
}
}
}
Java FlinkIMain расширяет импорт IMain
private static String[] PACKAGE_IMPORTS = new String[] { //
"org.apache.flink.core.fs._", //
"org.apache.flink.core.fs.local._", //
"org.apache.flink.api.common.io._", //
"org.apache.flink.api.common.aggregators._", //
"org.apache.flink.api.common.accumulators._", //
"org.apache.flink.api.common.distributions._", //
"org.apache.flink.api.common.operators._", //
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint", //
"org.apache.flink.api.common.functions._", //
"org.apache.flink.api.java.io._", //
"org.apache.flink.api.java.aggregation._", //
"org.apache.flink.api.java.functions._", //
"org.apache.flink.api.java.operators._", //
"org.apache.flink.api.java.sampling._", //
"org.apache.flink.api.scala._", //
"org.apache.flink.api.scala.utils._", //
"org.apache.flink.streaming.api.scala._", //
"org.apache.flink.streaming.api.windowing.time._", //
"com.gaff.sappo.dag.streaming.interpreter.RichMapFunctions.CountRichMapFunction" //
};
скрипт scala для запуска repl
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.map(new CountRichMapFunction()) // this code is error
.keyBy(0)
val window = counts.timeWindow(Time.seconds(5)).sum(1)
window.print()
env.execute("Window Stream WordCount")
сообщение об ошибке json
{"data":"<console>:69: error: overloaded method value map with alternatives:"}
{"data":" [R](mapper: org.apache.flink.api.common.functions.MapFunction[(String, Int),R])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>"}
{"data":" [R](fun: ((String, Int)) => R)(implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]"}
{"data":" cannot be applied to (com.gaff.sappo.dag.streaming.interpreter.RichMapFunctions.CountRichMapFunction[Nothing,Nothing])"}
{"data":" .map(new CountRichMapFunction())"}
{"data":" ^"}