Celery WorkerLostError Worker преждевременно завершил работу: сигнал 6 (SIGABRT) - PullRequest
1 голос
/ 26 июня 2019

Я хочу, чтобы асинхронная работа выполнялась синхронно. Я использую идентификатор данных в couchbase в качестве ключа Redis. Я снова поддерживаю couchbase со значением redis. Он работает нормально, когда работает синхронно, но не работает, когда выполняется асинхронно.

from couchbase.cluster import Cluster
from couchbase.cluster import PasswordAuthenticator
from couchbase.n1ql import N1QLQuery
from couchbase.n1ql import N1QLRequest
from elasticsearch import Elasticsearch
from time import sleep
from urllib import request
from wikidata.client import Client
from openpyxl import Workbook
import base64
import json
import osmapi
import re
import redis
import urllib
import hashlib
import time
import requests
import csv
from functools import reduce

wikid_client = Client()
es_client = Elasticsearch("--")
cluster = Cluster('couchbase://--')
authenticator = PasswordAuthenticator('ATLAS-DEV','ATLAS-DEV')
cluster.authenticate(authenticator)
bucket_test = cluster.open_bucket('ATLAS-DEV')
redis_db = redis.StrictRedis(host='--', port=6379, db=10, decode_responses=True)
map_api = osmapi.OsmApi()
N1QLQuery.timeout = 3600


def deep_get(dictionary, keys, default=None):
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary)

query = N1QLQuery("SELECT meta().id, * FROM `ATLAS-DEV` WHERE class_type = 'REGION' AND (codes.osm IS NOT NULL OR codes.osm != '') LIMIT 20")
for k in bucket_test.n1ql_query(query):
    # print("k : ")
    # print(k)
    document_id = k.get("id")
    # print("document_id : " + document_id)
    osm_id = deep_get(k, "ATLAS-DEV.codes.osm")
    polygon = redis_db.hget(name="osm:polygons", key=osm_id)
    if polygon is not None:
        k['ATLAS-DEV'].update({'boundaries': polygon})
        bucket_test.upsert(document_id, k['ATLAS-DEV'])

это моя синхронная работа. это работает хорошо. может опрокинуть диван-кровать.

но вот мой код работы сельдерея.

from app.jobs.task import each_k
query = N1QLQuery("SELECT meta().id, * FROM `ATLAS-DEV` WHERE class_type = 'REGION' AND (codes.osm IS NOT NULL OR codes.osm != '') LIMIT 5")
for k in bucket_test.n1ql_query(query):
    # print("k : ")
    # print(k)
    each_k.delay(k)




# /app/jobs/task.py

def deep_get(dictionary, keys, default=None):
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary)

@app.task
@provide_redis
def each_k(redis_db, k):
    print("레디스 디비 : ")
    print(redis_db)
    document_id = k.get("id")
    print("document_id : " + document_id)
    osm_id = deep_get(k, "ATLAS-DEV.codes.osm")
    polygon = redis_db.hget(name="osm:polygons", key=osm_id)
    print("폴리곤 : ")
    print(polygon)
    if polygon is not None:
        k['ATLAS-DEV'].update({'boundaries': polygon})
        bucket_test.upsert(document_id, k['ATLAS-DEV'])

возвращает

[warn] kevent: Bad file descriptor
python-couchbase: self->nremaining == 0 at src/oputil.c:67. Abort[2019-06-26 15:13:03,675: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT).')
Traceback (most recent call last):
  File "/Users/n18016/Library/Python/3.7/lib/python/site-packages/billiard/pool.py", line 1226, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT).
[2019-06-26 15:13:03,701: ERROR/MainProcess] Process 'ForkPoolWorker-11' pid:7697 exited with 'signal 6 (SIGABRT)'
[2019-06-26 15:13:03,702: ERROR/MainProcess] Process 'ForkPoolWorker-10' pid:7696 exited with 'signal 6 (SIGABRT)'
[2019-06-26 15:13:03,703: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT).')
Traceback (most recent call last):
  File "/Users/n18016/Library/Python/3.7/lib/python/site-packages/billiard/pool.py", line 1226, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT).
[2019-06-26 15:13:03,704: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT).')
Traceback (most recent call last):
  File "/Users/n18016/Library/Python/3.7/lib/python/site-packages/billiard/pool.py", line 1226, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT).

1 Ответ

1 голос
/ 26 июня 2019

https://github.com/celery/celery/issues/4113

celery worker --pool solo ...

эта опция спасет мою жизнь.это работает хорошо.эта команда хорошо выполняет асинхронный однопоточный поток.

Это может быть не ответом для всех ситуаций.Это ответ, если вы хотите сделать это в single thread.

...