Apache Spark - сбор данных Kafka о потоковом событии для запуска рабочего процесса - PullRequest
0 голосов
/ 11 октября 2018

Короче говоря, я разработчик, пытающийся использовать Spark для перемещения данных из одной системы в другую.Необработанные данные из одной системы в сжатую, обобщенную форму в доморощенную аналитическую систему.

Я очень новичок в Spark - мои знания ограничены тем, что я смог выкопать и поэкспериментировать с последниминеделю или две.

То, что я представляю;используя Spark для наблюдения за событием от Кафки в качестве триггера.Захватите эту сущность / данные о событии потребителя и используйте его, чтобы сообщить мне, что необходимо обновить в аналитической системе.Затем я выполняю соответствующие запросы Spark для необработанных данных Cassandra и записываю результат в другую таблицу на стороне аналитики, которую метрики панели инструментов называют источником данных.

У меня есть простой структурированный потоковый запрос Kafkaза работой.В то время как я вижу, как объект потребления выводится на консоль, я не могу получить запись Kafka, когда происходит событие потребителя:

try {
    SparkSession spark = SparkSession
        .builder()
        .master(this.sparkMasterAddress)
        .appName("StreamingTest2")
        .getOrCreate();

    //THIS -> None of these events seem to give me the data consumed?
    //...thinking I'd trigger the Cassandra write from here?
    spark.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent queryStarted) {
            System.out.println("Query started: " + queryStarted.id());
        }
        @Override
        public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
            System.out.println("Query terminated: " + queryTerminated.id());
        }
        @Override
        public void onQueryProgress(QueryProgressEvent queryProgress) {
            System.out.println("Query made progress: " + queryProgress.progress());
        }
    });

    Dataset<Row> reader = spark
        .readStream()
        .format("kafka")
        .option("startingOffsets", "latest")
        .option("kafka.bootstrap.servers", "...etc...")
        .option("subscribe", "my_topic")
        .load();

    Dataset<String> lines = reader
        .selectExpr("cast(value as string)")
        .as(Encoders.STRING());

    StreamingQuery query = lines
        .writeStream()
        .format("console")
        .start();
    query.awaitTermination();
} catch (Exception e) {
    e.printStackTrace();
}

Я также могу нормально запросить Cassandra с SparkSQL:

try {
    SparkSession spark = SparkSession.builder()
        .appName("SparkSqlCassandraTest")
        .master("local[2]")
        .getOrCreate();

    Dataset<Row> reader = spark
        .read()
        .format("org.apache.spark.sql.cassandra")
        .option("host", this.cassandraAddress)
        .option("port", this.cassandraPort)
        .option("keyspace", "my_keyspace")
        .option("table", "my_table")
        .load();

    reader.printSchema();
    reader.show();

    spark.stop();
} catch (Exception e) {
    e.printStackTrace();
}

Моя мысль такова;вызовите последний с первым, соберите эту вещь как приложение / пакет Spark / что угодно и разверните ее в spark.В этот момент я ожидал, что он будет постоянно обновлять таблицы метрик.

Будет ли это работоспособное, масштабируемое и разумное решение для того, что мне нужно?Я на правильном пути?Не против использования Scala, если это легче или лучше, в некотором смысле.

Спасибо!

РЕДАКТИРОВАТЬ : Вот диаграмма того, с чем я столкнулся.

enter image description here

1 Ответ

0 голосов
/ 12 октября 2018

Понял.Узнал о ForeachWriter.Прекрасно работает:

        StreamingQuery query = lines
            .writeStream()
            .format("foreach")
            .foreach(new ForeachWriter<String>() {
                @Override
                public void process(String value) {
                    System.out.println("process() value = " + value);
                }

                @Override
                public void close(Throwable errorOrNull) {}

                @Override
                public boolean open(long partitionId, long version) {
                    return true;
                }
            })
            .start(); 
...