Короче говоря, я разработчик, пытающийся использовать Spark для перемещения данных из одной системы в другую.Необработанные данные из одной системы в сжатую, обобщенную форму в доморощенную аналитическую систему.
Я очень новичок в Spark - мои знания ограничены тем, что я смог выкопать и поэкспериментировать с последниминеделю или две.
То, что я представляю;используя Spark для наблюдения за событием от Кафки в качестве триггера.Захватите эту сущность / данные о событии потребителя и используйте его, чтобы сообщить мне, что необходимо обновить в аналитической системе.Затем я выполняю соответствующие запросы Spark для необработанных данных Cassandra и записываю результат в другую таблицу на стороне аналитики, которую метрики панели инструментов называют источником данных.
У меня есть простой структурированный потоковый запрос Kafkaза работой.В то время как я вижу, как объект потребления выводится на консоль, я не могу получить запись Kafka, когда происходит событие потребителя:
try {
SparkSession spark = SparkSession
.builder()
.master(this.sparkMasterAddress)
.appName("StreamingTest2")
.getOrCreate();
//THIS -> None of these events seem to give me the data consumed?
//...thinking I'd trigger the Cassandra write from here?
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
Dataset<Row> reader = spark
.readStream()
.format("kafka")
.option("startingOffsets", "latest")
.option("kafka.bootstrap.servers", "...etc...")
.option("subscribe", "my_topic")
.load();
Dataset<String> lines = reader
.selectExpr("cast(value as string)")
.as(Encoders.STRING());
StreamingQuery query = lines
.writeStream()
.format("console")
.start();
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
Я также могу нормально запросить Cassandra с SparkSQL:
try {
SparkSession spark = SparkSession.builder()
.appName("SparkSqlCassandraTest")
.master("local[2]")
.getOrCreate();
Dataset<Row> reader = spark
.read()
.format("org.apache.spark.sql.cassandra")
.option("host", this.cassandraAddress)
.option("port", this.cassandraPort)
.option("keyspace", "my_keyspace")
.option("table", "my_table")
.load();
reader.printSchema();
reader.show();
spark.stop();
} catch (Exception e) {
e.printStackTrace();
}
Моя мысль такова;вызовите последний с первым, соберите эту вещь как приложение / пакет Spark / что угодно и разверните ее в spark.В этот момент я ожидал, что он будет постоянно обновлять таблицы метрик.
Будет ли это работоспособное, масштабируемое и разумное решение для того, что мне нужно?Я на правильном пути?Не против использования Scala, если это легче или лучше, в некотором смысле.
Спасибо!
РЕДАКТИРОВАТЬ : Вот диаграмма того, с чем я столкнулся.