Как я могу прочитать в режиме реального времени обновленные данные из Postgresql (или MySQL) через потоковое искры (или Кафка)? - PullRequest
0 голосов
/ 23 мая 2018

Я получаю обновленные данные в режиме реального времени от postgresql и хочу передать данные в реальном времени на фиксированную модель, чтобы прогнозировать клиента с помощью потокового воспроизведения или kafka.

, пожалуйста, порекомендуйте любой блог и точныйкод, который работает хорошо, или любая информация / предложения, которые вы знаете. postgresql / mysql обновленные данные в реальном времени в среде python / java также хороши! спасибо!

Или, может быть, это просто невозможно выполнить?

1 Ответ

0 голосов
/ 30 октября 2018

Это мой ответ. Надеюсь, вам это поможет.

Моя версия Spark 2.2.0.Язык программирования - Python.

Поток данных идет от kafka к mysql, а версия kafka - 0.9.

примечание: вы должны найти правильные jar с mysql и kafka, вы можете перейти кофициальный сайт, чтобы найти это.

код, подобный этому:

from pyspark import SparkContext, Row
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# note: the mysql's driver is must be correct

def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

if __name__ == "__main__":

    # mysql config
    url = "jdbc:mysql://your_server:3306/spark_test"
    table_name = "word_info"
    username = "root"
    pasword = "root"

    # spark context init
    para_seconds = 10
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, para_seconds)

    # receiver in kafka
    brokers = 'kafka1:9092'
    topic = 'two-two-para'

    # get streaming datas from kafka
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    lines = kvs.map(lambda x: x[1])

    # Convert RDDs of the words DStream to DataFrame and run SQL query
    def process(time, rdd):
        print("========= %s =========" % str(time))

        if (rdd.isEmpty()):
            return

        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())

            # Convert RDD[String] to RDD[Row] to DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = spark.createDataFrame(rowRdd)

            # Creates a temporary view using the DataFrame.
            wordsDataFrame.createOrReplaceTempView("words")

            # Do word count on table using SQL and print it
            wordCountsDataFrame = \
                spark.sql("select word, count(*) as word_count from words group by word")
            wordCountsDataFrame.show()

            wordCountsDataFrame.write \
            .format("jdbc") \
            .option("url", url) \
            .option("driver", "org.mariadb.jdbc.Driver") \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", pasword) \
            .save(mode="append")

        except Exception as e:
            print("Some error happen!")
            print(e)

    lines.foreachRDD(process)


    # start job
    ssc.start()
    ssc.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...