pyspark kafka log потребляет и записывает в hdfs как паркетный файл - PullRequest
0 голосов
/ 06 августа
• 1000 создание паркетного файла Итак, у меня есть тонны паркетных файлов. * 1006 1010 * вот мой код
from kafka.consumer import KafkaConsumer
from pyspark.sql import SparkSession
from .utils import *
import re
import pyspark

def write_to_hdfs(spark, message_list):
    if len(message_list) > 4:
        df = spark.createDataFrame(message_list, schema=log_schema)
        messages_list = []
        spark.read()
        df.repartition(1) \
            .write \
            .format('parquet') \
            .mode('append') \
            .option("header", "true") \
            .save('hdfs://hdfs-server:8020/user/nginx-log/test01/202007')

def consuming(spark, message, message_list):
    message_dict = re.match(log_pattern, message.value).groupdict()
    message_list.append(message_dict)

def main():
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             value_deserializer=lambda m: m.decode('utf-8'))
    message_list = []
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName('nginx log consumer') \
        .getOrCreate()
    consumer.subscribe('test01')
    for message in consumer:
        consuming(spark, message, message_list)
        write_to_hdfs(spark, message_list)

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        print(e)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...