Недостаточно памяти с RAY Python Framework - PullRequest
0 голосов
/ 11 февраля 2020

Я создал простую дистанционную функцию с Ray, которая использует очень мало памяти. Однако после короткого периода времени память постоянно увеличивается, и я получаю исключение RayOutOfMemoryError.

Следующий код является ОЧЕНЬ простым примером этой проблемы. Массив "result_transformed" numpy отправляется рабочим, где каждый работник может выполнить работу над этим. Моя упрощенная функция calc_s Similarity ничего не делает, но ей все еще не хватает памяти. Я добавил к этому методу гораздо более длительное время ожидания, чтобы имитировать выполнение большей работы, но в итоге ему не хватает памяти.

Я работаю на 8-ядерном процессоре Intel 9900K с 32 ГБ оперативной памяти и Ubuntu 19.10 Python это: Intel Python Распределение 3.7.4 numpy 1.17.4 (с Intel MKL)

import numpy as np
from time import sleep
import ray
import psutil

@ray.remote
def calc_similarity(sims, offset):
    # Fake some work for 100 ms.
    sleep(0.10)
    return True

if __name__ == "__main__":
    # Initialize RAY to use all of the processors.
    num_cpus = psutil.cpu_count(logical=False)
    ray.init(num_cpus=num_cpus)

    num_docs = 1000000
    num_dimensions = 300
    chunk_size = 128
    sim_pct = 0.82

    # Initialize the array
    index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32)
    index_array = np.arange(num_docs).reshape(1, num_docs)
    index_array_id = ray.put(index_array)

    calc_results = []

    for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)):
        size = min( chunk_size, num_docs - (start_doc_no) + 1 )
        # Get the query vector out of the index.
        query_vector = index[start_doc_no:start_doc_no+size]
        # Calculate the matrix multiplication.
        result_transformed = np.matmul(index, query_vector.T).T
        # Serialize the result matrix out for each client.
        result_id = ray.put(result_transformed)

        # Simulate multi-threading extracting the results of a cosine similarity calculation
        for offset in range(chunk_size):
            calc_results.append(calc_similarity.remote(sims=result_id, offset=offset ))
            # , index_array=index_array_id))
        res = ray.get(calc_results)
        calc_results.clear()

Любая помощь / руководство будет принята с благодарностью.

Ответы [ 2 ]

0 голосов
/ 13 февраля 2020

Спасибо Санг за ваш ответ.

Проблема в том, что g c не работает, потому что пороговые значения по умолчанию не достигаются до того, как у меня не хватит памяти в моей системе на 32 ГБ.

Вызов ray.put (transformed_result) может занимать довольно большой объем памяти (в данном примере это 128 x 1 000 000) или около 0,5 ГБ памяти с использованием float32.

Для Чтобы обойти эту проблему, я создал метод, который выполняет следующие действия, в которых я могу передать пороговое значение в процентах используемой памяти и принудительно вызвать сборщик мусора:

def auto_garbage_collect(pct=80.0):
    if psutil.virtual_memory().percent >= pct:
        gc.collect()

. Циклы обработки ядра устраняют состояние нехватки памяти.

Ситуацию также можно разрешить, изменив настройки порога в сборщике мусора.

gc.set_threshold()

Это очень зависит от задачи и зависит от размера используемых объектов данных, поэтому я чувствовал, что первый метод был лучшим выбором.

Пел, спасибо за ваш подробный ответ! Это было очень полезно и поучительно.

0 голосов
/ 12 февраля 2020

В настоящее время Ray поддерживает подсчет ссылок частично. (Полный подсчет ссылок будет выпущен в ближайшее время). Проще говоря, когда object_id, переданный в удаленную функцию, не сериализован, на него ссылаются так же, как и на Python. Это означает, что если result_transformed является мусором, собранным Python, то result_transformed в хранилище плазмы следует открепить, а когда объект извлекается из LRU, его следует удалить. (Для ясности, закрепленные объекты с некоторым количеством ссылок не выселяются).

Я также предполагаю, что существует некоторый странный подсчет ссылок, такой как циклические ссылки. Я мог убедиться, что result_transformed был выселен, когда я запустил этот скрипт. Итак, я думаю, result_transformed само по себе не является проблемой. Возможных проблем может быть много. В моем случае я обнаружил, что i python создает ссылку на python объекты, когда я использую его для ввода (IN). (Например, когда вы видите значение какого-либо объекта, OUT [число] может иметь ссылку на ваш объект).

In [2]: import psutil 
   ...: import gc 
   ...: import ray 
   ...: from time import sleep 
   ...: import numpy as np 
   ...: @ray.remote 
   ...: def calc_similarity(sims, offset): 
   ...:     # Fake some work for 100 ms. 
   ...:     sleep(0.10) 
   ...:     return True 
   ...:  
   ...: if __name__ == "__main__": 
   ...:     # Initialize RAY to use all of the processors. 
   ...:     num_cpus = psutil.cpu_count(logical=False) 
   ...:     ray.init(num_cpus=num_cpus) 
   ...:  
   ...:     num_docs = 1000000 
   ...:     num_dimensions = 300 
   ...:     chunk_size = 128 
   ...:     sim_pct = 0.82 
   ...:  
   ...:     # Initialize the array 
   ...:     index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32) 
   ...:     index_array = np.arange(num_docs).reshape(1, num_docs) 
   ...:     index_array_id = ray.put(index_array) 
   ...:  
   ...:     calc_results = [] 
   ...:     i = 0 
   ...:     for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)): 
   ...:         i += 1 
   ...:         size = min( chunk_size, num_docs - (start_doc_no) + 1 ) 
   ...:         # Get the query vector out of the index. 
   ...:         query_vector = index[start_doc_no:start_doc_no+size] 
   ...:         # Calculate the matrix multiplication. 
   ...:         result_transformed = np.matmul(index, query_vector.T).T 
   ...:         # Serialize the result matrix out for each client. 
   ...:         result_id = ray.put(result_transformed) 
   ...:         if i == 1: 
   ...:             # The first result_id binary number should be stored in result_id_special 
   ...:             # In this way, we can verify if this object id is evicted after filling up our  
   ...:             # plasma store by some random numpy array 
   ...:             # If this object id is not evicted, that means it is pinned, meaning if is  
   ...:             # not properly reference counted. 
   ...:             first_object_id = result_id.binary() 
   ...:         # Simulate multi-threading extracting the results of a cosine similarity calculation 
   ...:         for offset in range(chunk_size): 
   ...:             calc_results.append(calc_similarity.remote(sims=result_id, offset=offset )) 
   ...:             # , index_array=index_array_id)) 
   ...:         res = ray.get(calc_results) 
   ...:         calc_results.clear() 
   ...:         print('ref count to result_id {}'.format(len(gc.get_referrers(result_id)))) 
   ...:         print('Total number of ref counts in a ray cluster. {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts())) 
   ...:         if i == 5: 
   ...:             break 
   ...:     # It should contain the object id of the  
   ...:     print('first object id: {}'.format(first_object_id)) 
   ...:     print('fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.') 
   ...:     print('because if the data_transformed is garbage collected properly, it should be unpinned from plasma store') 
   ...:     print('and when plasma store is filled by numpy array, first_object_id should be evicted.') 
   ...:     for _ in range(40): 
   ...:         import numpy as np 
   ...:         ray.put(np.zeros(500 * 1024 * 1024, dtype=np.uint8)) 
   ...:     print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts())) 
   ...:     # this should fail as first_object_id is already evicted 
   ...:     print(ray.get(ray.ObjectID(first_object_id))) 

[ray] Forcing OMP_NUM_THREADS=1 to avoid performance degradation with many workers (issue #6998). You can override this by explicitly setting OMP_NUM_THREADS.
2020-02-12 00:10:11,932 INFO resource_spec.py:212 -- Starting Ray with 4.35 GiB memory available for workers and up to 2.19 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-02-12 00:10:12,273 INFO services.py:1080 -- View the Ray dashboard at localhost:8265
2020-02-12 00:10:18,522 WARNING worker.py:289 -- OMP_NUM_THREADS=1 is set, this may slow down ray.put() for large objects (issue #6998).
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008002000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008003000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008004000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008005000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
first object id: b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x80\x02\x00\x00\x00'
fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.
because if the data_transformed is garbage collected properly, it should be unpinned from plasma store
and when plasma store is filled by numpy array, first_object_id should be evicted.
total ref count from a ray cluster after eviction: {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
2020-02-12 00:10:57,108 WARNING worker.py:1515 -- Local object store memory usage:
num clients with quota: 0
quota map size: 0
pinned quota map size: 0
allocated bytes: 2092865189
allocation limit: 2347285708
pinned bytes: 520000477
(global lru) capacity: 2347285708
(global lru) used: 67.0078%
(global lru) num objects: 4
(global lru) num evictions: 41
(global lru) bytes evicted: 21446665725

2020-02-12 00:10:57,112 WARNING worker.py:1072 -- The task with ID ffffffffffffffffffffffff0100 is a driver task and so the object created by ray.put could not be reconstructed.
---------------------------------------------------------------------------
UnreconstructableError                    Traceback (most recent call last)
<ipython-input-1-184e5836123c> in <module>
     63     print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts()))
     64     # this should fail as first_object_id is already evicted
---> 65     print(ray.get(ray.ObjectID(first_object_id)))
     66 

~/work/ray/python/ray/worker.py in get(object_ids, timeout)
   1517                     raise value.as_instanceof_cause()
   1518                 else:
-> 1519                     raise value
   1520 
   1521         # Run post processors.

UnreconstructableError: Object ffffffffffffffffffffffff0100008002000000 is lost (either LRU evicted or deleted by user) and cannot be reconstructed. Try increasing the object store memory available with ray.init(object_store_memory=<bytes>) or setting object store limits with ray.remote(object_store_memory=<bytes>). See also: https://ray.readthedocs.io/en/latest/memory-management.html
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...