сжатие логов не работает при чтении данных с кафки с искрой - PullRequest
0 голосов
/ 16 января 2020

Я создал следующие топи c в kafka, используя оболочку kafka. Моя цель состоит в том, чтобы дедуплицировать данные при чтении с искрой из топки Кафки c, поэтому я создал сжатую запись топологии кафки c следующим образом:

bin/kafka-topics.sh --create --zookeeper server:2181 --replication-factor 2  --partitions 4 --topic topic_dfs --config "cleanup.policy=compact" --config "delete.retention.ms=1"  --config "segment.ms=2" --config "min.cleanable.dirty.ratio=0.01"

затем я использую коннектор искры для кафки для чтения данных я ожидаю, что результирующий фрейм данных будет иметь разные строки, а это не так.

Входной файл, который я читаю с помощью spark:

number,word
8,def
5,sde
5,sde
8,def

Я записываю следующие данные в topi c, и при чтении получаю повторяющиеся строки.

    val someDF = spark.read.format("csv").schema(eventStructType).load("/home/path_to_ input")
    someDF.printSchema()
  logger.warn("reading as expected")
    someDF.selectExpr("CAST(number AS STRING) AS key",
      "to_json(struct(*)) AS value").write
      .format("kafka")
      .option("server:9092")
      .option("topic", "topic_dfs")
      .save()


    val result :DataFrame= spark.read.format("kafka").option("kafka.bootstrap.servers",  "server:9092").option("subscribe", "topic_hsd")
      .option("failOnDataLoss","false").option("enable.auto.commit","false")
    .load()
     val a= result.select(col("key").cast("string"),from_json(col("value").cast("string"), eventStructType))
    a.show() 

Я получаю одни и те же дубликаты в результате ввода, так как сжатие журнала не работает

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...