Итак, я соединяю кафку с искрой и хочу найти это значение в моноблоке.Я не знаю, какой подход использовать внутри spark-клиента, использовать клиент mongdb или обращаться к mongo с сеансом spark.
Используя pyspark, я попытался обработать, что получить от kafka (объект json) внутри foreachrddfunc и попытался найти несколько значений в dong монго.
Если я использую сеанс искры: - Я должен каждый раз создавать сеанс.Если я использовал постоянный метод вне рабочего, сохранится ли он среди всех рабочих?- Это быстрее, чем при использовании обычного клиентского подключения Python Mongo?
Это лучший способ получить сообщение от kafka и обработать его в spark?
sc = SparkSession.builder.appName("test_kafka_app") \
.config("spark.mongodb.input.uri", MONGO_URL) \
.config("spark.mongodb.output.uri", MONGO_URL) \
.getOrCreate()
ww = sc.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",
MONGO_URL).load()
ww.persist()
def test(value):
try:
sc = SparkSession.builder.appName("test_kafka_app") \
.config("spark.mongodb.input.uri", MONGO_URL) \
.config("spark.mongodb.output.uri", MONGO_URL) \
.getOrCreate()
ww = sc.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",
MONGO_URL).load()
data = json.loads(value[1])
if "val1" in data:
print(ww.filter(func.array_contains(
ww.data, data["val1"])).collect())
except Exception as ee:
print(ee)
ssc = StreamingContext(sc.sparkContext, 2)
brokers = "localhost:9092" # sys.argv[1:]
topic = "new_topic"
kvs = KafkaUtils.createDirectStream(ssc,
[topic],
{
"metadata.broker.list": brokers
})
lines = kvs.flatMap(lambda x: x[1])
kvs.foreachRDD(lambda line: line.foreach(test))
ssc.start()
ssc.awaitTermination()