Перейти к следующему элементу в списке, переданному в качестве аргумента функции map () - PullRequest
0 голосов
/ 25 января 2020

Работа с пулом процессов Мне нужно пропустить элемент в списке, переданный функции карты, и перейти к следующему элементу в списке, причем тот же процесс или следующий процесс продолжают выполнять функцию.

Функция передана процессу Пул. Обход функции, если выбранная строка заблокирована, и переход к следующему элементу в списке - src_dest_paths

def do_something(src_dest_paths):
destination = str(src_dest_paths[1])
try:
      logger("Distcp running for "+destination_formatted+". Updating table for status 'RUNNING'")
      con = get_connection()
      con.set_isolation_level(1)
      logger("Database opened successfully")
      with closing(con.cursor()) as cur:
        cur.execute("BEGIN;")
        cur.execute("update " + info_table + " set status = 'RUNNING' where status not in ('RUNNING', 'FINISHED') and ctid in (select ctid from " +info_table+ " where destination_path ='"+destination_formatted+"' and status not in ('RUNNING', 'FINISHED') order by destination_db_name, destination_table_name, destination_path for update skip locked) returning destination_path;")

 #This is where I want the function to break and move to the next item in the list
 if cur.rowcount == 0:
    logger("LOCKED: "+destination_formatted!+" SKIP to next destination_path")

        logger("Updated table successfully")
        time.sleep(10)
        status = 0
        if status == 0:
          logger("Distcp completed for "+destination_formatted+". Updating table for status 'FINISHED'")
          last_updated_t = str(datetime.now())
          cur.execute("update " + info_table + " set status='FINISHED',last_updated_time='"+ last_updated_t +"' where destination_path = '" + destination_formatted + "';")
          cur.execute("COMMIT;")
          logger("Updated table successfully")

        else:
          logger("Distcp failed for "+destination_formatted+". Updating table for status 'FAILED'")
          last_updated_t = str(datetime.now())
          cur.execute("update " + info_table + " set status='FAILED',last_updated_time='"+ last_updated_t +"' where destination_path = '" + destination_formatted + "';")
          cur.execute("COMMIT;")
          logger("Updated table successfully")
    except Exception as e:
      logger("Error in connection to DB!!")
      logger(e)
    finally:
      if con is not None:
        con.close()

Основная функция, которая вызывает функцию карты со списком [(источник назначения) ]

main()
p = multiprocessing.Pool(NUM_PROCESSES)
l = p.map(do_something, src_dest_paths)
p.close()
p.join()

Любое руководство или подсказка высоко ценится.

...