36 000 RDD поисков, чтобы сделать.Каждый из них занимает 2 секунды.Один день подряд: по 20 секунд на каждого.Как может произойти это замедление? - PullRequest
0 голосов
/ 11 декабря 2018

Я создал набор Spark RDD, которые предлагают деловые регистрации по SIREN, SIRET (два французских национальных идентификатора) и коду города (с именем Code Commune): разные разделы для различных применений.

/** RDD disponibles */
private transient DonneesAttributaires<SIRET, Etablissement> donneesEtablissements;    

/**
* Acquérir les établissements.
* @param anneeCOG Année du Code Officiel Géographique de référence.
* @param annee Année pour laquelle rechercher les établissements.
* @param actifsSeulement true s'il faut se limiter aux seuls établissements actifs.
* @return Ensemble des données attributaires accessibles sur les établissements.
* @throws TechniqueException si un incident survient.
*/
@SuppressWarnings("resource")
public DonneesAttributaires<SIRET, Etablissement> acquerirEtablissements(int anneeCOG, int annee, boolean actifsSeulement) throws TechniqueException {
  if (this.donneesEtablissements != null) {
     return this.donneesEtablissements;
  }

  LOGGER.info("Début de l'acquisition des établissements par Spark pour l'année {}...", annee);

  JavaSparkContext sc = JavaSparkContext.fromSparkContext(this.session.sparkContext());
  this.serviceGeographique.obtenirCodeOfficielGeographique(anneeCOG, true);
  File source = this.mapCsvVersEtablissements.parametrer(annee, this.serviceGeographique.obtenirCommunes(anneeCOG));

  JavaRDD<Etablissement> etablissements = sc.textFile(source.getAbsolutePath()).map(this.mapCsvVersEtablissements).filter(etablissement -> etablissement.hasAnomalie(SeveriteAnomalie.ERREUR, false) == false && etablissement.isAnnulationLogique() == false);

  // Restreindre aux seuls établissements actifs, si souhaité.
  if (actifsSeulement) {
     etablissements = etablissements.filter(etablissement -> etablissement.isActive());
  }

  JavaPairRDD<SIRET, Etablissement> etablissementsParSIRET = etablissements
     .mapToPair(etablissement -> new Tuple2<>(etablissement.getSiret(), etablissement))
     .partitionBy(this.partitionParSIRET)
     .persist(StorageLevel.MEMORY_ONLY());

  JavaPairRDD<SIREN, Etablissement> etablissementsParSIREN = etablissements
     .mapToPair(etablissement -> new Tuple2<>(etablissement.getSiret().getSIREN(), etablissement))
     .partitionBy(this.partitionParSIREN)
     .persist(StorageLevel.DISK_ONLY());

  JavaPairRDD<CodeCommune, Etablissement> etablissementsParCodeCommune = etablissements
     .mapToPair(etablissement -> new Tuple2<>(etablissement.getAdresses().get(0).getCodeCommune(), etablissement))
     .partitionBy(this.partitionParCodeCommune)
     .persist(StorageLevel.DISK_ONLY());

  this.donneesEtablissements = new DonneesAttributaires<>();
  this.donneesEtablissements.add(new DonneesAttributairesRDD<>(SIRET.class, etablissementsParSIRET, PreferenceAcces.SIRET));
  this.donneesEtablissements.add(new DonneesAttributairesRDD<>(SIREN.class, etablissementsParSIREN, PreferenceAcces.SIREN));
  this.donneesEtablissements.add(new DonneesAttributairesRDD<>(CodeCommune.class, etablissementsParCodeCommune, PreferenceAcces.CODE_COMMUNE));
  return this.donneesEtablissements;
}

Затем я перечисляю все коды городов во Франции, и после этого один за другим я использую один из этих RDD (код по городам), чтобы провести некоторый интересующий меня анализ, вызывая каждый раз эту функцию для 36 000 городов, которые мыиметь:

/**
* Recenser les activités d'une commune.
* @param anneeCOG Année du code officiel géographique.
* @param annee Année d'examen des entreprises.
* @param commune Commune cible.
* @throws TechniqueException si un incident technique survient. 
*/
public void recenserActivitesCommune(int anneeCOG, int annee, Commune commune) throws TechniqueException {
  Objects.requireNonNull(commune, "La commune dont on veut recenser les activités ne peut pas valoir null.");

  // Déterminer si un traitement a déjà eu lieu. Si c'est le cas, ne pas y procéder.
  if (this.preparationCoucheService.activitesPresentes(commune.getCodeCommune())) {
     LOGGER.info("Les activités de la commune {} ({}) ont déjà été recensées. Commune ignorée.", commune.getNom(), commune.getCodeCommune());
     return;
  }

  LOGGER.info("Début du recensement des activités de la commune {} ({}).", commune.getNom(), commune.getCodeCommune());
  Etablissements etablissements = new Etablissements(etablissementsPresents.getDonneesParCodeCommune().getRdd().lookup(codeCommune));

  // Calculations in memory...
  // Writing a single record in database.
   ...
}

В начале, в первые несколько часов, моя программа работает быстро, тратя на анализ только две-четыре секунды (, если это быстро! Я могу 'На рисунке я запускаю Spark в автономном кластере):

2018-12-11 07:18:09.482  INFO 9384 --- [nio-9090-exec-1] f.m.a.s.a.e.AcquisitionEntrepriseService : D▒but du recensement des activit▒s de la commune Montagne (33290).
2018-12-11 07:18:09.482  INFO 9384 --- [nio-9090-exec-1] f.m.a.s.a.e.AcquisitionEntrepriseService : Collecte des ▒tablissements de la commune 33290...
2018-12-11 07:18:09.482  INFO 9384 --- [nio-9090-exec-1] org.apache.spark.SparkContext            : Starting job: lookup at AcquisitionEntrepriseService.java:295
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Got job 409 (lookup at AcquisitionEntrepriseService.java:295) with 1 output partitions
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Final stage: ResultStage 819 (lookup at AcquisitionEntrepriseService.java:295)
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Parents of final stage: List(ShuffleMapStage 818)
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Missing parents: List()
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Submitting ResultStage 819 (ShuffledRDD[10] at partitionBy at AcquisitionEntrepriseService.java:271), which has no missing parents
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] o.a.spark.storage.memory.MemoryStore     : Block broadcast_411 stored as values in memory (estimated size 3.3 KB, free 8.8 GB)
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] o.a.spark.storage.memory.MemoryStore     : Block broadcast_411_piece0 stored as bytes in memory (estimated size 2.0 KB, free 8.8 GB)
2018-12-11 07:18:09.482  INFO 9384 --- [er-event-loop-1] o.apache.spark.storage.BlockManagerInfo  : Added broadcast_411_piece0 in memory on SEVEN-PC:64179 (size: 2.0 KB, free: 8.8 GB)
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.SparkContext            : Created broadcast 411 from broadcast at DAGScheduler.scala:1161
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : Submitting 1 missing tasks from ResultStage 819 (ShuffledRDD[10] at partitionBy at AcquisitionEntrepriseService.java:271) (first 15 tasks are for partitions Vector(33))
2018-12-11 07:18:09.482  INFO 9384 --- [uler-event-loop] o.a.spark.scheduler.TaskSchedulerImpl    : Adding task set 819.0 with 1 tasks
2018-12-11 07:18:09.482  INFO 9384 --- [er-event-loop-0] o.apache.spark.scheduler.TaskSetManager  : Starting task 0.0 in stage 819.0 (TID 548, localhost, executor driver, partition 33, PROCESS_LOCAL, 7662 bytes)
2018-12-11 07:18:09.482  INFO 9384 --- [er for task 548] org.apache.spark.executor.Executor       : Running task 0.0 in stage 819.0 (TID 548)
2018-12-11 07:18:09.482  INFO 9384 --- [er for task 548] org.apache.spark.storage.BlockManager    : Found block rdd_10_33 locally
2018-12-11 07:18:13.259  INFO 9384 --- [er for task 548] org.apache.spark.executor.Executor       : Finished task 0.0 in stage 819.0 (TID 548). 241883 bytes result sent to driver
2018-12-11 07:18:13.275  INFO 9384 --- [result-getter-0] o.apache.spark.scheduler.TaskSetManager  : Finished task 0.0 in stage 819.0 (TID 548) in 3793 ms on localhost (executor driver) (1/1)
2018-12-11 07:18:13.275  INFO 9384 --- [result-getter-0] o.a.spark.scheduler.TaskSchedulerImpl    : Removed TaskSet 819.0, whose tasks have all completed, from pool
2018-12-11 07:18:13.275  INFO 9384 --- [uler-event-loop] org.apache.spark.scheduler.DAGScheduler  : ResultStage 819 (lookup at AcquisitionEntrepriseService.java:295) finished in 3,793 s
2018-12-11 07:18:13.275  INFO 9384 --- [nio-9090-exec-1] org.apache.spark.scheduler.DAGScheduler  : Job 409 finished: lookup at AcquisitionEntrepriseService.java:295, took 3,789294 s
2018-12-11 07:18:13.275  INFO 9384 --- [nio-9090-exec-1] f.m.a.s.a.e.AcquisitionEntrepriseService : Fin de la collecte de tous les ▒tablissements de la commune 33290.
2018-12-11 07:18:13.275  INFO 9384 --- [nio-9090-exec-1] f.m.a.s.a.e.AcquisitionEntrepriseService : Activit▒ observ▒e dans la commune : {id physique : null, code commune (id logique) : 33290, nom : Montagne, code APE : 01.62Z, nombre d'entreprises : 1, nombre de salari▒s : 1, date d'observation : 2018-12-11}
2018-12-11 07:18:13.275  INFO 9384 --- [nio-9090-exec-1] f.m.a.s.a.e.AcquisitionEntrepriseService : Activit▒ observ▒e dans la commune : {id physique : null, code commune (id logique) : 33290, nom : Montagne, code APE : 49.32Z, nombre d'entreprises : 1, nombre de salari▒s : 1, date d'observation : 2018-12-11}
2018-12-11 07:18:13.275  INFO 9384 --- [nio-9090-exec-1] f.m.a.s.a.e.AcquisitionEntrepriseService : Activit▒ observ▒e dans la commune : {id physique : null, code commune (id logique) : 33290, nom : Montagne, code APE : 82.92Z, nombre d'entreprises : 1, nombre de salari▒s : 0, date d'observation : 2018-12-11}
...

Иногда я вижу это в конце процесса поиска, я не уверен, имеет ли это значение:

2018-12-11 06:54:31.181  INFO 9384 --- [Context Cleaner] org.apache.spark.ContextCleaner          : Cleaned accumulator 1389
2018-12-11 06:54:31.181  INFO 9384 --- [Context Cleaner] org.apache.spark.ContextCleaner          : Cleaned accumulator 1440
2018-12-11 06:54:31.181  INFO 9384 --- [er-event-loop-0] o.apache.spark.storage.BlockManagerInfo  : Removed broadcast_58_piece0 on SEVEN-PC:64179 in memory (size: 2.0 KB, free: 8.8 GB)
2018-12-11 06:54:31.181  INFO 9384 --- [Context Cleaner] org.apache.spark.ContextCleaner          : Cleaned accumulator 1511
2018-12-11 06:54:31.181  INFO 9384 --- [Context Cleaner] org.apache.spark.ContextCleaner          : Cleaned accumulator 1428
2018-12-11 06:54:31.181  INFO 9384 --- [Context Cleaner] org.apache.spark.ContextCleaner          : Cleaned accumulator 1348
2018-12-11 06:54:31.181  INFO 9384 --- [Context Cleaner] org.apache.spark.ContextCleaner          : Cleaned accumulator 1456

Однако, по прошествии часов, мои программы становятся все медленнее и медленнее.Однажды после этого время, необходимое для сбора информации о городе, возросло до 20 секунд вместо 2 или 4.
Если я остановлюсь и снова начну программу, она снова запустится на полной скорости.

Что можетбыть причинами этого замедления?Похоже, что он хранил некоторые данные где-то в памяти (но мое использование памяти не растет) и делал все больше и больше последовательного доступа для чего-то ... но почему и где?

1) Нужно ли что-то делать, особенно после каждого поиска, который я пропустил?

2) Я новичок в Spark, поэтому я создаю три вида разделов изодни и те же данные правильные или неправильные?

3) Есть ли способ, с помощью которого я могу отследить любую причину медлительности с помощью веб-интерфейса spark или просмотреть статистику продолжительности для всех выполненных поисков ()?

С уважением,

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...