Когда я запускаю сценарий Scala для Flink через Scala REPL (IMain) из Java-кода, сценарий не компилируется.
Для проверки я запускаю Flink Scala REPL из приведённого ниже кода на Java, но всякий раз получаю ошибку.
Settings settings = new Settings();
((MutableSettings.BooleanSetting) settings.usejavacp()).value_$eq(true);
IMain main = new IMain(settings, new PrintWriter(System.out));
// Thread.currentThread().setContextClassLoader(main.classLoader());
for (String imp : imports) {
main.interpret(MessageFormat.format("import {0}", imp));
}
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
String script = FileUtils.readFileToString(new File("/opt/project/security-detection/sappo/src/sappo-interpreter/src/test/resources/demo.txt"), StandardCharsets.UTF_8);
main.bind(new NamedParamClass("env", ExecutionEnvironment.class.getName(), env));
main.interpret(script);
Текст сценария Scala (demo.txt):
val text = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?")
// result 1
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } map { (_, 1) } groupBy(0) sum(1)
counts.print()
// result 2
val counts = text.map((x:String) => 1)
counts.print()
// result 3
text.print()
Результат 1:
import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._
import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.aggregation._
import org.apache.flink.api.java.functions._
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.sampling._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
env: org.apache.flink.api.java.ExecutionEnvironment = Local Environment (parallelism = 8) : ee335d29eefca69ee5fe7279414fc534
console:67: error: missing parameter type for expanded function ((x$1) => x$1.toLowerCase.split("\\W+").filter(((x$2) => x$2.nonEmpty)))
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } map { (_, 1) } groupBy(0) sum(1)
Результат 2:
import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._
import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.aggregation._
import org.apache.flink.api.java.functions._
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.sampling._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
env: org.apache.flink.api.java.ExecutionEnvironment = Local Environment (parallelism = 8) : 5cbf8e476ebf32fd8fdf91766bd40af0
console:71: error: type mismatch;
found : String => Int
required: org.apache.flink.api.common.functions.MapFunction[String,?]
val counts = text.map((x:String) => 1)
Результат 3:
import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._
import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.aggregation._
import org.apache.flink.api.java.functions._
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.sampling._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
env: org.apache.flink.api.java.ExecutionEnvironment = Local Environment (parallelism = 8) : ee335d29eefca69ee5fe7279414fc534
Who's there?
I think I hear them. Stand, ho! Who's there?
text: org.apache.flink.api.java.operators.DataSource[String] = org.apache.flink.api.java.operators.DataSource@53e28097
PASSED: testIMain
PASSED: testIMainScript