Я изучаю структурированную потоковую передачу с помощью блоков данных и борюсь с режимом консоли DataStreamWriter.
Моя программа:
- Имитирует потоковое поступление файлов в папку «monitor_dir» (один новый файл передается из «source_dir» каждые 10 секунд).
- Использует DataStreamReader для заполнения неограниченного DataFrame "inputUDF" содержимым каждого нового файла.
- Использует DataStreamWriter для вывода новых строк «inputUDF» в допустимый приемник.
В то время как программа работает при выборе использования приемника файлов (пакеты добавляются к файлам текстового формата в «result_dir»), я не вижу ничего, отображаемого при выборе приемника консоли.
Более того, когда я запускаю эквивалентную версию программы на своем локальном компьютере (с установленным Spark), она отлично работает как для файловых, так и для консольных приемников.
Мой вопрос:
- Как сделать так, чтобы эта программа выводила данные на консольный приемник и отображала результаты при использовании блоков данных?
Большое спасибо заранее!
С уважением,
Начо
Моя программа: myTest.py
import pyspark
import pyspark.sql.functions
import time
#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
# 1. We create the output variable
res = []
# 2. We get the FileInfo representation of the files of source_dir
fileInfo_objects = dbutils.fs.ls(source_dir)
# 3. We traverse the fileInfo objects, to get the name of each file
for item in fileInfo_objects:
# 3.1. We get a string representation of the fileInfo
file_name = str(item)
# 3.2. We look for the pattern name= to remove all useless info from the start
lb_index = file_name.index("name='")
file_name = file_name[(lb_index + 6):]
# 3.3. We look for the pattern ') to remove all useless info from the end
ub_index = file_name.index("',")
file_name = file_name[:ub_index]
# 3.4. We append the name to the list
res.append(file_name)
# 4. We sort the list in alphabetic order
res.sort()
# 5. We return res
return res
#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
# 1. We get the names of the files on source_dir
files = get_source_dir_file_names(source_dir)
# 2. We get the starting time of the process
time.sleep(time_step_interval * 0.1)
start = time.time()
# 3. We set a counter in the amount of files being transferred
count = 0
# 4. We simulate the dynamic arriving of such these files from source_dir to dataset_dir
# (i.e, the files are moved one by one for each time period, simulating their generation).
for file in files:
# 4.1. We copy the file from source_dir to dataset_dir#
dbutils.fs.cp(source_dir + file, monitoring_dir + file)
# 4.2. We increase the counter, as we have transferred a new file
count = count + 1
# 4.3. We wait the desired transfer_interval until next time slot.
time.sleep((start + (count * time_step_interval)) - time.time())
# 5. We wait a last time_step_interval
time.sleep(time_step_interval)
#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
# 0. We set the mode
console_sink = True
# 1. We set the paths to the folders
source_dir = "/FileStore/tables/my_dataset/"
monitoring_dir = "/FileStore/tables/my_monitoring/"
checkpoint_dir = "/FileStore/tables/my_checkpoint/"
result_dir = "/FileStore/tables/my_result/"
dbutils.fs.rm(monitoring_dir, True)
dbutils.fs.rm(result_dir, True)
dbutils.fs.rm(checkpoint_dir, True)
dbutils.fs.mkdirs(monitoring_dir)
dbutils.fs.mkdirs(result_dir)
dbutils.fs.mkdirs(checkpoint_dir)
# 2. We configure the Spark Session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
inputUDF = spark.readStream.format("text")\
.load(monitoring_dir)
myDSW = None
# 4. Operation A1: We create the DataStreamWritter...
# 4.1. To either save to result_dir in append mode
if console_sink == False:
myDSW = inputUDF.writeStream.format("text")\
.option("path", result_dir) \
.option("checkpointLocation", checkpoint_dir)\
.trigger(processingTime="10 seconds")\
.outputMode("append")
# 4.2. Or to display by console in append mode
else:
myDSW = inputUDF.writeStream.format("console")\
.trigger(processingTime="10 seconds")\
.outputMode("append")
# 5. We get the StreamingQuery object derived from starting the DataStreamWriter
mySQ = myDSW.start()
# 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
streaming_simulation(source_dir, monitoring_dir, 10)
# 7. We stop the StreamingQuery to finish the application
mySQ.stop()
#-------------------------------
# MAIN ENTRY POINT
#-------------------------------strong text
if __name__ == '__main__':
my_main()
Мой набор данных: f1.txt
Первое предложение.
Второе предложение.
Мой набор данных: f2.txt
Третье предложение.
Четвертое предложение.
Мой набор данных: f3.txt
Пятое предложение.
Шестое предложение.