Я использовал кусок кода на сайте Flink, чтобы подключить Apache Flink к Elastic Search. Я хочу запустить этот кусок кода из программного обеспечения NetBeans через проект maven.
public class FlinkElasticCon {
public static void main(String[] args) throws Exception {
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap((String value, Collector<WordWithCount> out) -> {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
// print the results with a single thread, rather than in parallel
env.execute("Socket Window WordCount");
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("", 9200, "http"));
httpHosts.add(new HttpHost("", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
windowCounts.addSink((SinkFunction<WordWithCount>) esSinkBuilder);
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
public String toString() {
return word + " : " + count;
При добавлении Dependency он не идентифицирует классasticsearchsink. Учитывая, что я добавил разные зависимости, но проблема до сих пор не решена. При импорте:
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
Красная строка создается как неизвестная в коде.
my pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-
instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-
apache flink version: 1.8.1asticsearch версия: 7.4.2 версия NetBeans: 8.2 Java-версия: 8
пожалуйста, помогите мне.