Я создал следующие топи c в kafka, используя оболочку kafka. Моя цель состоит в том, чтобы дедуплицировать данные при чтении с искрой из топки Кафки c, поэтому я создал сжатую запись топологии кафки c следующим образом:
bin/kafka-topics.sh --create --zookeeper server:2181 --replication-factor 2 --partitions 4 --topic topic_dfs --config "cleanup.policy=compact" --config "delete.retention.ms=1" --config "segment.ms=2" --config "min.cleanable.dirty.ratio=0.01"
затем я использую коннектор искры для кафки для чтения данных я ожидаю, что результирующий фрейм данных будет иметь разные строки, а это не так.
Входной файл, который я читаю с помощью spark:
number,word
8,def
5,sde
5,sde
8,def
Я записываю следующие данные в topi c, и при чтении получаю повторяющиеся строки.
val someDF = spark.read.format("csv").schema(eventStructType).load("/home/path_to_ input")
someDF.printSchema()
logger.warn("reading as expected")
someDF.selectExpr("CAST(number AS STRING) AS key",
"to_json(struct(*)) AS value").write
.format("kafka")
.option("server:9092")
.option("topic", "topic_dfs")
.save()
val result :DataFrame= spark.read.format("kafka").option("kafka.bootstrap.servers", "server:9092").option("subscribe", "topic_hsd")
.option("failOnDataLoss","false").option("enable.auto.commit","false")
.load()
val a= result.select(col("key").cast("string"),from_json(col("value").cast("string"), eventStructType))
a.show()
Я получаю одни и те же дубликаты в результате ввода, так как сжатие журнала не работает