Играя с Flink, я пытался перенести данные в Elasticsearch. У меня есть эта ошибка на моем STDOUT:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.hosts=http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
connector.index=transfers-sum
connector.key-null-literal=n/a
connector.property-version=1
connector.type=elasticsearch
connector.version=6
format.json-schema={ \"curr_careUnit\": {\"type\": \"text\"}, \"sum\": {\"type\": \"float\"} }
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=curr_careUnit
schema.1.data-type=FLOAT
schema.1.name=sum
update-mode=upsert
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
...
Вот то, что у меня есть в моем scala Коде Flink:
def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Set properties per KafkaConsumer API
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
properties.setProperty("group.id", "test")
// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data4", new SimpleStringSchema(), properties)
// Read from beginning of topic
myKConsumer.setStartFromEarliest()
val streamSource = env
.addSource(myKConsumer)
// Transform CSV (with a header row per Kafka event into a Transfers object
val streamTransfers = streamSource.map(new TransfersMapper())
// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)
// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
tEnv.createTemporaryView("transfers", tblTransfers)
tEnv.connect(
new Elasticsearch()
.version("6")
.host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
.index("transfers-sum")
.keyNullLiteral("n/a")
.withFormat(new Json().jsonSchema("{ \"curr_careUnit\": {\"type\": \"text\"}, \"sum\": {\"type\": \"float\"} }"))
.withSchema(new Schema()
.field("curr_careUnit", DataTypes.STRING())
.field("sum", DataTypes.FLOAT())
)
.inUpsertMode()
.createTemporaryTable("transfersSum")
val result = tEnv.sqlQuery(
"""
|SELECT curr_careUnit, sum(los)
|FROM transfers
|GROUP BY curr_careUnit
|""".stripMargin)
result.insertInto("transfersSum")
env.execute("Flink Streaming Demo Dump to Elasticsearch")
}
}
Я создаю толстую банку и загружаю ее к моему удаленному экземпляру Flink. Вот мои зависимости build.gradle:
compile 'org.scala-lang:scala-library:2.11.12'
compile 'org.apache.flink:flink-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-kafka-0.10_2.11:1.10.0'
compile 'org.apache.flink:flink-table-api-scala-bridge_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-elasticsearch6_2.11:1.10.0'
compile 'org.apache.flink:flink-json:1.10.0'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.1'
compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
compile 'org.json4s:json4s-jackson_2.11:3.7.0-M1'
Вот как команда farJar создана для gradle:
jar {
from {
(configurations.compile).collect {
it.isDirectory() ? it : zipTree(it)
}
}
manifest {
attributes("Main-Class": "main" )
}
}
task fatJar(type: Jar) {
zip64 true
manifest {
attributes 'Main-Class': "flinkNamePull.Demo"
}
baseName = "${rootProject.name}"
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
with jar
}
Может кто-нибудь помочь мне увидеть, что мне не хватает? Я довольно новичок в Flink и потоке данных в целом. Хе-хе
Заранее спасибо!