У меня есть приложение, которое загружает миллионы документов в собрание, используя 30-80 работников для одновременной загрузки данных. Иногда я обнаруживаю, что процесс загрузки не завершился гладко, и с другими базами данных я могу просто удалить таблицу и начать заново, но не с коллекциями Firestore. Я должен перечислить документы и удалить их, и я не нашел способа масштабировать это с той же емкостью, что и процесс загрузки. Что я делаю сейчас, так это то, что у меня есть два метода Flask / Python, размещенных в AppEngine, один для получения страницы из 1000 документов и передачи другому методу для их удаления. Таким образом, процесс создания списка документов не блокируется процессом их удаления. Это все еще занимает дни, чтобы завершить, что слишком долго.
Метод, чтобы получить список документов и создать задачу для их удаления, который является однопоточным:
@app.route('/delete_collection/<collection_name>/<batch_size>', methods=['POST'])
def delete_collection(collection_name, batch_size):
batch_size = int(batch_size)
coll_ref = db.collection(collection_name)
print('Received request to delete collection {} {} docs at a time'.format(
collection_name,
batch_size
))
num_docs = batch_size
while num_docs >= batch_size:
docs = coll_ref.limit(batch_size).stream()
found = 0
deletion_request = {
'doc_ids': []
}
for doc in docs:
deletion_request['doc_ids'].append(doc.id)
found += 1
num_docs = found
print('Creating request to delete docs: {}'.format(
json.dumps(deletion_request)
))
# Add to task queue
queue = tasks_client.queue_path(PROJECT_ID, LOCATION, 'database-manager')
task_meet = {
'app_engine_http_request': { # Specify the type of request.
'http_method': 'POST',
'relative_uri': '/delete_documents/{}'.format(
collection_name
),
'body': json.dumps(deletion_request).encode(),
'headers': {
'Content-Type': 'application/json'
}
}
}
task_response_meet = tasks_client.create_task(queue, task_meet)
print('Created task to delete {} docs: {}'.format(
batch_size,
json.dumps(deletion_request)
))
Вот метод I используйте для удаления документов, которые можно масштабировать. По сути, он обрабатывает только 5-10 за один раз, ограниченный скоростью, с которой другой метод передает страницы doc_ids для удаления. Разделение двух помогает, но не так уж и много.
@app.route('/delete_documents/<collection_name>', methods=['POST'])
def delete_documents(collection_name):
# Validate we got a body in the POST
if flask.request.json:
print('Request received to delete docs from :{}'.format(collection_name))
else:
message = 'No json found in request: {}'.format(flask.request)
print(message)
return message, 400
# Validate that the payload includes a list of doc_ids
doc_ids = flask.request.json.get('doc_ids', None)
if doc_ids is None:
return 'No doc_ids specified in payload: {}'.format(flask.request.json), 400
print('Received request to delete docs: {}'.format(doc_ids))
for doc_id in doc_ids:
db.collection(collection_name).document(doc_id).delete()
return 'Finished'
if __name__ == '__main__':
# Set environment variables for running locally
app.run(host='127.0.0.1', port=8080, debug=True)
Я пытался запустить несколько одновременных выполнений delete_collection (), но не уверен, что это даже помогает, поскольку я не уверен, что каждый раз, когда это происходит ограничивает количество вызовов (batch_size) .stream () для получения отдельного набора документов или, возможно, получения дубликатов.
Как я могу сделать это быстрее?