Я пробую простой пример публикации данных в Kafka и использования их с помощью Spark.
Вот код производителя:
var kafka_input = spark.sql("""
SELECT CAST(Id AS STRING) as key,
to_json(
named_struct(
'Id', Id,
'Title',Title
)
) as value
FROM offer_data""")
kafka_input.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("topic", topicName)
.save()
Я проверил, что kafka_input
имеет строку json для значения и число, приведенное в качестве строки для ключа.
Вот код потребителя:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", topicName)
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
df.take(50)
display(df)
Данные, которые я получаю на стороне потребителя, представляют собой строку в кодировке base64.
Как мне декодировать значение в Scala?
Кроме того, этот оператор чтения не сбрасывает эти записи из очереди Kafka. Я предполагаю, что это потому, что я не посылаю никаких сигналов подтверждения обратно Кафке. Это верно? Если да, то как мне это сделать?