Я пытаюсь использовать couchbase в качестве источника потоковой передачи для искровой структурированной потоковой передачи с использованием искрового соединителя.
val records = spark.readStream
.format(“com.couchbase.spark.sql”).schema(schema)
.load()
И у меня есть этот запрос
records
.groupBy(“type”)
.count()
.writeStream
.outputMode(“complete”)
.format(“console”)
.start()
.awaitTermination()
Для этого запроса я не получить правильный вывод. Моя таблица вывода запросов выглядит следующим образом
Batch: 0
20/04/14 14:28:00 INFO CodeGenerator: Code generated in 10.538654 ms
20/04/14 14:28:00 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@17fe0ec7 committed.
±-------±----+
|type | count|
±-------±----+
±-------±----+
Однако, если я использую couchbase для извлечения документов как потоковых. Подобно
val cdr = spark.read.couchbase(EqualTo(“type”, “cdr”))
cdr.count()
Схема правильно выведена для этой не потоковой операции и использовала ту же схему и для структурированной потоковой передачи.
INFO N1QLRelation: Inferred schema is StructType(StructField(META_ID,StringType,true), StructField(_class,StringType,true), StructField(accountId,StringType,true),
дает правильный вывод. (count = 28).
Пожалуйста, дайте мне знать, почему это не работает со структурированной потоковой передачей.