PySpark не освобождает память, когда я запускаю удаление фрейма данных - PullRequest
0 голосов
/ 16 апреля 2020

Я пишу код, который зацикливается на фрейме данных несколько раз на основе извлеченной метки времени. Проблема здесь в том, что всякий раз, когда запускаются внешние циклы, память не освобождается в искре. После сбора фрейма данных осталось всего 100 записей. После четырех внешних циклов моя память пересекает более 10G. Я новичок в искре.


infected = [3, 6, 127]
import gc
print("Grouping The value by timestamp")
all_value = {}
for level, value in enumerate(data_2020.select('timestamp').distinct().collect()):
  print(level, value)
  filter_condition = 'timestamp="{}"'.format(value.timestamp)
  sample_data = data_2020.filter(filter_condition).select('latitude', 'longitude', 'car_id')
  infected_df = sample_data.filter(sample_data.car_id.isin(*infected)).collect()
  sample_data = sample_data.filter(~sample_data.car_id.isin(*infected))
  sample_data = sample_data.withColumn("min_distance", lit(np.inf)).withColumn("infector", lit(np.inf))

  for base_df in infected_df:
    sample_data = sample_data.withColumn('current_infector',
            lit(base_df.car_id)).withColumn('base_latitude',
            lit(base_df.latitude)).withColumn('base_longitude',
            lit(base_df.longitude)).withColumn('base_latitude',
            f.round('base_latitude', 6)).withColumn('base_longitude',
            f.round('base_longitude', 6)).withColumn('distance',
            haversine_custom('latitude', 'longitude', 'base_latitude',
            'base_longitude')).withColumn('calculation',
            find_min_udf('min_distance', 'distance', 'infector',
            'current_infector')).withColumn('threshold',
            threshold_udf('min_distance')).select('latitude',
            'longitude', 'car_id', 'calculation.min_distance',
            'calculation.infector')
  all_value[value.timestamp] = sample_data.withColumn("threshold", threshold_udf("min_distance")).filter("threshold=1").collect()
  sample_data.unpersist()
  sample_data = None
  del sample_data
  spark.catalog.clearCache()

  infected = list(set([i.car_id for i in all_value[value.timestamp]] + infected))
  print("Level: {} \n Infected: {}".format(level, infected))
  gc.collect()

...