StructuredStreaming с ForEachWriter создает дубликаты - PullRequest
0 голосов
/ 20 марта 2020

Привет. Я пытаюсь создать приемник neo4j, используя pyspark и kafka, но по какой-то причине этот приемник создает дубликаты в neo4j, и я не уверен, почему это происходит. Я ожидаю получить только один узел, но похоже, что он создает 4. Если у кого-то есть идея, пожалуйста, дайте мне знать.

Код производителя Kafka:

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='10.0.0.38:9092')
message = {
    'test_1': 'test_1',
    'test_2': 'test_2'
}

producer.send('test_topic', json.dumps(message).encode('utf-8'))
producer.close()

Код потребителя Kafka :

from kafka import KafkaConsumer
import findspark
from py2neo import Graph
import json

findspark.init()
from pyspark.sql import SparkSession

class ForeachWriter:
    def open(self, partition_id, epoch_id):
        neo4j_uri = '' # neo4j uri
        neo4j_auth = ('', '') # neo4j user, password

        self.graph = Graph(neo4j_uri, auth=neo4j_auth)
        return True

    def process(self, msg):        
        msg = json.loads(msg.value.decode('utf-8'))
        self.graph.run("CREATE (n: MESSAGE_RECEIVED) SET n.key = '" + str(msg).replace("'", '"') + "'")

        raise KeyError('received message: {}. finished creating node'.format(msg))

spark = SparkSession.builder.appName('test-consumer') \
                            .config('spark.executor.instances', 1) \
                            .getOrCreate()        

ds1 = spark.readStream \
                   .format('kafka') \
                   .option('kafka.bootstrap.servers', '10.0.0.38:9092') \
                   .option('subscribe', 'test_topic') \
                   .load()

query = ds1.writeStream.foreach(ForeachWriter()).start()
query.awaitTermination()

neo4j график после выполнения кода

1 Ответ

0 голосов
/ 20 марта 2020

После некоторого поиска я нашел следующий фрагмент текста из Потоковая обработка с Apache Spark: освоение структурированной потоковой передачи и Spark Streaming в главе 11 p151 после описания открытия, обработки и закрытия для ForeachWriter:

Этот контракт является частью семантики доставки данных, поскольку он позволяет нам удалять дублированные разделы, которые, возможно, уже были отправлены в приемник, но повторно обработаны структурированной потоковой передачей в рамках сценария восстановления. Для правильной работы этого механизма приемник должен реализовать какой-то постоянный способ запоминания комбинаций разделов / версий, которые он уже видел.

В другом примечании с сайта Spark: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html (см. Раздел по Foreach).

Примечание: Spark не гарантирует одинаковый вывод для (partitionId, epochId), поэтому дедупликация не может быть достигнута с помощью (partitionId, epochId). например, источник предоставляет различное количество разделов по некоторым причинам, оптимизация Spark изменяет количество разделов и т. д. c См. SPARK-28650 для более подробной информации. Если вам нужна дедупликация при выводе, попробуйте вместо этого foreachBatch.

Похоже, мне нужно реализовать проверку на уникальность, потому что структурированная потоковая передача автоматически обрабатывает разделы в случае сбоя, если я использую ForeachWriter, в противном случае я должен вместо этого переключиться на foreachBatch.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...