Обработка данных в Spark Structured Streaming перед выводом на консоль - PullRequest
2 голосов
/ 21 июня 2020

Я постараюсь сделать это проще. Я периодически читаю некоторые данные от производителя kafka и выводю следующее, используя Spark Structured streaming

У меня есть данные, которые выводятся следующим образом:

+------------------------------------------+-------------------+--------------+-----------------+
|window                                    |timestamp          |Online_Emp    |Available_Emp    |
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:27|1             |0                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:41|1             |0                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:29|1             |0                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:20|1             |0                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:23|2             |0                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:52|1             |0                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:08|1             |0                |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:12|1             |0                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:02|1             |1                |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:11|1             |0                |
+------------------------------------------+-------------------+--------------+-----------------+

И я хочу, чтобы они выводились следующим образом:

Time         Online_Emp Available_Emp
2017-01-01 00:00:00  52  23
2017-01-01 00:01:00  58  19
2017-01-01 00:02:00  65  28

Таким образом, в основном он считает сотрудников онлайн в минуту (с помощью их уникального идентификатора драйвера) и показывает, сколько из них доступно.

Обратите внимание, что один конкретный c идентификатор занятости может изменение между available и on_duty в течение минуты, и нам нужен окончательный результат до конца минуты

Kafka Prod

_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: 
                         json.dumps(x).encode('utf-8'))
    
    # schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )

    with open(filepath, 'r', encoding="utf16") as f: 

        for item in json_lines.reader(f):
            dataDict.update({'timeStamp':item['timestamp'],
                    'emp_id':item['emp_id'],
                    'on_duty':item['on_duty']})
            _producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) )
            sleep(1)

Spark Structured streaming

schema = StructType([ \
    StructField("timeStamp", LongType()), \
    StructField("emp_id", LongType()), \
    StructField("on_duty", LongType())])

df = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe","emp_dstream")\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value"), schema).alias("value"))\
    .select(F.col("value.*"))\
    .withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))\
    .groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp"))\
    .agg(F.count(F.col("timeStamp")).alias("total_employees"),F.collect_list(F.col("on_duty")).alias("on_duty"),F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty"))\
    .writeStream\
    .format("console")\
    .outputMode("complete")\
    .option("truncate", "false")\
    .start()\
    .awaitTermination()

Как получить желаемый результат?

Буду признателен за любые подсказки или помощь!

1 Ответ

0 голосов
/ 22 июня 2020

Ваш код работает отлично. Пожалуйста, проверьте ниже данные kafka и вывод потоковой передачи искр.

Batch 5 - ваш окончательный результат и игнорируйте другие пакеты, такие как пакеты от 0 до 4. Всегда учитывайте, что последний пакет обновил записи по данным, доступным в kafka.

Партия: 0

No data in kafka.

Spark Streaming
+------+---------+---------------+-------+-----------+
|window|timestamp|total_employees|on_duty|not_on_duty|
+------+---------+---------------+-------+-----------+
+------+---------+---------------+-------+-----------+

Партия: 1

Published to kafka.

{"timeStamp": 1592669691475, "emp_id": 12471114, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691475, "emp_id": 12471124, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691475, "emp_id": 12471134, "on_duty": 0} //2020-06-20T21:44:51

Spark Streaming
+---------------------------------------------+-------------------+---------------+---------+-----------+
|window                                       |timestamp          |total_employees|on_duty  |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|3              |[0, 0, 0]|3          |
+---------------------------------------------+-------------------+---------------+---------+-----------+

Партия: 2

Published to kafka.
{"timeStamp": 1592669691475, "emp_id": 12471144, "on_duty": 0} //2020-06-20T21:44:51 // seconds difference
{"timeStamp": 1592669691575, "emp_id": 12471124, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471234, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471334, "on_duty": 1} //2020-06-20T21:44:51

Spark Streaming
+---------------------------------------------+-------------------+---------------+---------------------+-----------+
|window                                       |timestamp          |total_employees|on_duty              |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------------------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|7              |[0, 0, 0, 1, 0, 0, 0]|6          |
+---------------------------------------------+-------------------+---------------+---------------------+-----------+

Партия: 3

Published to kafka.
{"timeStamp": 1592669691575, "emp_id": 12471124, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471424, "on_duty": 1} // 2020-06-20T21:44:51
{"timeStamp": 1592669631475, "emp_id": 12472188, "on_duty": 1} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472288, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472388, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472488, "on_duty": 1} // 2020-06-20T21:43:51

Spark Streaming
+---------------------------------------------+-------------------+---------------+---------------------------+-----------+
|window                                       |timestamp          |total_employees|on_duty                    |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------------------------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|9              |[0, 1, 0, 0, 0, 1, 0, 0, 0]|7          |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|4              |[1, 0, 0, 1]               |2          |
+---------------------------------------------+-------------------+---------------+---------------------------+-----------+

Партия: 4

Published to kafka.
{"timeStamp": 1592669691575, "emp_id": 12471524, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471624, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669631475, "emp_id": 12471188, "on_duty": 1} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472288, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12473388, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12474488, "on_duty": 1} // 2020-06-20T21:43:51

Spark Streaming
+---------------------------------------------+-------------------+---------------+---------------------------------+-----------+
|window                                       |timestamp          |total_employees|on_duty                          |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------------------------------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|11             |[0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0]|9          |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|8              |[1, 0, 0, 1, 1, 0, 0, 1]         |4          |
+---------------------------------------------+-------------------+---------------+---------------------------------+-----------+

Партия: 5

Published to kafka.
{"timeStamp": 1592669571475, "emp_id": 12482185, "on_duty": 1} // 2020-06-20T21:42:51
{"timeStamp": 1592669571475, "emp_id": 12483185, "on_duty": 1} // 2020-06-20T21:42:51
{"timeStamp": 1592669631475, "emp_id": 12484488, "on_duty": 1} // 2020-06-20T21:43:51
{"timeStamp": 1592669691575, "emp_id": 12491524, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669091575, "emp_id": 12491124, "on_duty": 0} // 2020-06-20T21:34:51
{"timeStamp": 1592669091575, "emp_id": 12491224, "on_duty": 1} // 2020-06-20T21:34:51

Spark Streaming
+---------------------------------------------+-------------------+---------------+------------------------------------+-----------+
|window                                       |timestamp          |total_employees|on_duty                             |not_on_duty|
+---------------------------------------------+-------------------+---------------+------------------------------------+-----------+
|[2020-06-20 21:34:00.0,2020-06-20 21:35:00.0]|2020-06-20 21:34:51|2              |[0, 1]                              |1          |
|[2020-06-20 21:42:00.0,2020-06-20 21:43:00.0]|2020-06-20 21:42:51|2              |[1, 1]                              |0          |
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|12             |[0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0]|10         |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|9              |[1, 1, 0, 0, 1, 1, 0, 0, 1]         |4          |
+---------------------------------------------+-------------------+---------------+------------------------------------+-----------+
...