У меня есть скрипт на python, который каждые X секунд загружает CSV из Интернета.Затем у меня есть второй скрипт с spark, который отслеживает каждую загрузку и показывает поля из этого CSV.Он работает для каждой загрузки (каждые 60 секунд в данный момент).
Итак, теперь я хочу изменить код, чтобы загружать его каждые 2 минуты (конечно, это не проблема, так как это только изменениеX в первом сценарии), но затем во втором сценарии я хочу сохранить время окна 8 минут, чтобы каждые 2 минуты мой второй сценарий ловил эту загрузку и обновлял окно последними 4 csv.Как только у меня будет 4 csv за последние 8 минут, я хотел бы сделать среднее значение для числовых полей
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,DoubleType, StringType,StructType, StructField
from pyspark.sql.functions import explode, split
def main(file) -> None:
spark = SparkSession.builder.appName("StreamingParkingMalaga").getOrCreate()
campos = [StructField("poiID", StringType(), True),
StructField("nombre", StringType(), True),
StructField("direccion", StringType(), True),
StructField("telefono", IntegerType(), True),
StructField("correoelectronico", StringType(), True),
StructField("latitude", DoubleType(), True),
StructField("longitude", DoubleType(), True),
StructField("altitud", DoubleType(), True),
StructField("capacidad", IntegerType(), True),
StructField("capacidad_discapacitados", IntegerType(), True),
StructField("fechahora_ultima_actualizacion", StringType(), True),
StructField("libres", IntegerType(), True),
StructField("libres_discapacitados", IntegerType(), True),
StructField("nivelocupacion_naranja", StringType(), True),
StructField("nivelocupacion_rojo", StringType(), True),
StructField("smassa_sector_sare", StringType(), True)]
estructura = StructType(campos)
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark.readStream.format("csv").schema(estructura).option("header", "true").load(file)
lines.printSchema()
# Start running the query that prints the running counts to the console
query = lines[["nombre","fechahora_ultima_actualizacion","capacidad","libres"]].writeStream.outputMode("update").format("console").start()
query.awaitTermination()
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Usage: python StreamingParkingMalaga <pathToFolderWithTheData>", file=sys.stderr)
exit(-1)
main(sys.argv[1])