Как написать в Cassandra, используя foreachBatch () в Java Spark? - PullRequest
0 голосов
/ 11 декабря 2019

У меня есть следующий код, и я хотел бы записать в cassandra, используя структурированную потоковую передачу spark 2.4 foreachBatch

        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic1")
                .load();

        Dataset<Row> values=df.selectExpr(
                "split(value,',')[0] as field1",
                "split(value,',')[1] as field2",
                "split(value,',')[2] as field3",
                "split(value,',')[3] as field4",
                "split(value,',')[4] as field5");

//TODO write into cassandra 

values.writeStream().foreachBatch(
                    new VoidFunction2<Dataset<String>, Long> {
                public void call(Dataset<String> dataset, Long batchId) {

                    // Transform and write batchDF
       
            }
            ).start();

Ответы [ 2 ]

0 голосов
/ 12 декабря 2019

Когда вы используете .forEachBatch, ваш код просто работает как с обычными наборами данных ... В Java код может выглядеть следующим образом (полный источник здесь ):

.foreachBatch((VoidFunction2<Dataset<Row>, Long>) (df, batchId) ->
         df.write()
         .format("org.apache.spark.sql.cassandra")
         .options(ImmutableMap.of("table", "sttest", "keyspace", "test"))
         .mode(SaveMode.Append)
         .save()
)
0 голосов
/ 12 декабря 2019

Попробуйте добавить его в ваш pom.xml:

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.2</version>
</dependency>

, после чего импорт cassandra влечет за собой:

import org.apache.spark.sql.cassandra._

, чем вы можете использовать метод cassandraFormat в вашей df:

dataset
      .write
      .cassandraFormat("table","keyspace")
      .save()
...