Блоки данных - структурированная потоковая передача: в формате консоли ничего не отображается - PullRequest
0 голосов
/ 09 ноября 2018

Я изучаю структурированную потоковую передачу с помощью блоков данных и борюсь с режимом консоли DataStreamWriter.

Моя программа:

  • Имитирует потоковое поступление файлов в папку «monitor_dir» (один новый файл передается из «source_dir» каждые 10 секунд).
  • Использует DataStreamReader для заполнения неограниченного DataFrame "inputUDF" содержимым каждого нового файла.
  • Использует DataStreamWriter для вывода новых строк «inputUDF» в допустимый приемник.

В то время как программа работает при выборе использования приемника файлов (пакеты добавляются к файлам текстового формата в «result_dir»), я не вижу ничего, отображаемого при выборе приемника консоли.

Более того, когда я запускаю эквивалентную версию программы на своем локальном компьютере (с установленным Spark), она отлично работает как для файловых, так и для консольных приемников.

Мой вопрос:

  • Как сделать так, чтобы эта программа выводила данные на консольный приемник и отображала результаты при использовании блоков данных?

Большое спасибо заранее!

С уважением, Начо


Моя программа: myTest.py

import pyspark
import pyspark.sql.functions

import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------ 
def get_source_dir_file_names(source_dir):

    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:      
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name= to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern ') to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetic order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------ 
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)

    start = time.time()

    # 3. We set a counter in the amount of files being transferred
    count = 0

    # 4. We simulate the dynamic arriving of such these files from source_dir to dataset_dir
    # (i.e, the files are moved one by one for each time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to dataset_dir#
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait the desired transfer_interval until next time slot.
        time.sleep((start + (count * time_step_interval)) - time.time())

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------ 
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)  

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)    

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')    

    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text")\
                               .load(monitoring_dir)

    myDSW = None
    # 4. Operation A1: We create the DataStreamWritter...

    # 4.1. To either save to result_dir in append mode  
    if console_sink == False:
        myDSW = inputUDF.writeStream.format("text")\
                                    .option("path", result_dir) \
                                    .option("checkpointLocation", checkpoint_dir)\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   
    # 4.2. Or to display by console in append mode    
    else:
        myDSW = inputUDF.writeStream.format("console")\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()    

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------strong text
if __name__ == '__main__':
    my_main()

Мой набор данных: f1.txt

Первое предложение.

Второе предложение.


Мой набор данных: f2.txt

Третье предложение.

Четвертое предложение.


Мой набор данных: f3.txt

Пятое предложение.

Шестое предложение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...