Python Spark Streaming Window - PullRequest
       9

Python Spark Streaming Window

0 голосов
/ 27 ноября 2018

У меня есть скрипт на python, который каждые X секунд загружает CSV из Интернета.Затем у меня есть второй скрипт с spark, который отслеживает каждую загрузку и показывает поля из этого CSV.Он работает для каждой загрузки (каждые 60 секунд в данный момент).

Итак, теперь я хочу изменить код, чтобы загружать его каждые 2 минуты (конечно, это не проблема, так как это только изменениеX в первом сценарии), но затем во втором сценарии я хочу сохранить время окна 8 минут, чтобы каждые 2 минуты мой второй сценарий ловил эту загрузку и обновлял окно последними 4 csv.Как только у меня будет 4 csv за последние 8 минут, я хотел бы сделать среднее значение для числовых полей

import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,DoubleType, StringType,StructType, StructField
from pyspark.sql.functions import explode, split

def main(file) -> None:
spark = SparkSession.builder.appName("StreamingParkingMalaga").getOrCreate()

campos = [StructField("poiID", StringType(), True),
          StructField("nombre", StringType(), True),
          StructField("direccion", StringType(), True),
          StructField("telefono", IntegerType(), True),
          StructField("correoelectronico", StringType(), True),
          StructField("latitude", DoubleType(), True),
          StructField("longitude", DoubleType(), True),
          StructField("altitud", DoubleType(), True),
          StructField("capacidad", IntegerType(), True),
          StructField("capacidad_discapacitados", IntegerType(), True),
          StructField("fechahora_ultima_actualizacion", StringType(), True),
          StructField("libres", IntegerType(), True),
          StructField("libres_discapacitados", IntegerType(), True),
          StructField("nivelocupacion_naranja", StringType(), True),
          StructField("nivelocupacion_rojo", StringType(), True),
          StructField("smassa_sector_sare", StringType(), True)]

estructura = StructType(campos)

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark.readStream.format("csv").schema(estructura).option("header", "true").load(file)

lines.printSchema()

# Start running the query that prints the running counts to the console
query = lines[["nombre","fechahora_ultima_actualizacion","capacidad","libres"]].writeStream.outputMode("update").format("console").start()

query.awaitTermination()


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: python StreamingParkingMalaga <pathToFolderWithTheData>", file=sys.stderr)
        exit(-1)

    main(sys.argv[1])
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...