Как обрабатывать запросы по одному в flask? - PullRequest
0 голосов
/ 10 марта 2020

Я разрабатываю приложение, которое отправляет данные на сервер flask. Затем flask вставляет полученное в поиск elasti c. Перед вставкой в ​​поиск elasti c он проверит, существует ли идентификатор или нет. Если идентификатор существует, он обновляется или вставляется в индекс.

Пример кода:

from flask import Flask
from flask import  jsonify, request
import jwt
from elasticsearch import Elasticsearch
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():       
    try:         
        id  = request.form['id']
        database = "sample"
        es =Elasticsearch("localhost",port = 9200)
        cols=es.search(index=database,  body={ "query": {  "match": { "id": id}}})
        present =False
        if cols['hits']['hits']:
            x1=cols['hits']['hits'][0]['_source']
            eid = cols['hits']['hits'][0]['_id']
            present =True        
        if present == False:                          
            newvalues = {"url":"hello",'id':id}
            es.index(index=database, doc_type="logs", body=newvalues)      
        else: #if already there append data                        
            newvalues ={}                           
            es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})            
        return jsonify({'status': 'success'})
    except jwt.InvalidTokenError  as e:
        print(e)
        return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if  __name__=="__main__":
    try:
        app.run(host="localhost",port=5005,debug=True,processes =1)
    except Exception as e:
        print("exception in test",e)

Проблема здесь в том, что запросы отправляются каждые 5 секунд с внешнего интерфейса. Поэтому иногда возникает конфликт (ie) всякий раз, когда запрос получен с идентификатором, и в то же время происходит процесс вставки идентификатора. Второй запрос предполагает, что идентификатор не присутствует в базе данных, поэтому он также вставляет, что превращает 2 данных с одинаковым идентификатором в индекс. Что я должен сделать, чтобы вставить по одному, а другой должен подождать?

python - 3,6

Отредактировано: пробовал с использованием семафора:

from flask import Flask
from flask import  jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading
sLock = threading.Semaphore()
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello(): 
    sLock.acquire()  
    try:         
        id  = request.form['id']
        database = "sample"
        es =Elasticsearch("localhost",port = 9200)
        cols=es.search(index=database,  body={ "query": {  "match": { "id": id}}})
        present =False
        if cols['hits']['hits']:
            x1=cols['hits']['hits'][0]['_source']
            eid = cols['hits']['hits'][0]['_id']
            present =True        
        if present == False:                          
            newvalues = {"url":"hello",'id':id}
            es.index(index=database, doc_type="logs", body=newvalues)      
        else: #if already there append data                        
            newvalues ={}                           
            es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
        sLock.release()            
        return jsonify({'status': 'success'})
    except jwt.InvalidTokenError  as e:
        print(e)
        return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if  __name__=="__main__":
    try:
        app.run(host="localhost",port=5005,debug=True,processes =1)
    except Exception as e:
        print("exception in test",e)

Заранее спасибо!

1 Ответ

1 голос
/ 10 марта 2020

Вы можете использовать метод mget и установить временной порог. Таким образом, вы не отправляете один запрос времени, но отправляете запрос со списком идентификаторов - выполните c здесь

from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from elasticsearch import helpers    

idL = [] # create it before the flask route declaration
threshold = 5 #set a threshold in the same way
now = datetime.now()
delta = timedelta(seconds=30) # set a time threshold of 1 minute

def update(result):
    for success, info in helpers.parallel_bulk(client= es, actions=createUpdateElem(result ):
        if not success:
            print(info)
def index(result):
    for success, info in helpers.parallel_bulk(client= es, actions=createIndexElem(result ):
        if not success:
            print(info)

def createIndexElem(result):
     for elem in result:
         yield {
           '_op_type': 'index',
           '_index': 'database',
           '_id': elem,
           '_source': {'question': 'The life, universe and everything.'}
          }


def createUpdateElem(result):
    for elem in result:
         yield {
           '_op_type': 'update',
           '_index': 'database',
           '_id': elem,
           'doc': {'question': 'The life, universe and everything.'}
          }


def checkResponse(response, idL):
    updateL = []
    for elem in response['docs']:
        if elem['_id'] in idL:
            updateL.append(elem['_id'])
    toBeIndexed = list(set(idL) - set(updateL))
    return toBeIndexed,updateL




def multiget(idL):        
     response = es.mget(index = 'database',body = {'ids': idL})
     doc2BeIndicized = checkResponse(response, idL)
     now = datetime.now()
     idL = []
     return doc2BeIndicized


 @app.route('/test',methods=['POST'])
 def hello():       
    try:  

id  = request.form['id']
idL.append(id)
if len(idL) > threshold:
    result = multiget(idL)
    if result:
        indexed, updated = result
        if updated:
            update(updated)
        if indexed:
            index(indexed)
elif (now + delta) > datetime.now():
    result = multiget(idL)
    if result:
        indexed, updated = result
        if updated:
            update(updated)
        if indexed:
            index(indexed)
else:
     continue

Таким же образом, как вы могли бы индексировать или обновите список документов массовым или параллельным пакетом, который лучше в службе, потому что он использует многопоточность. сделать c здесь . Помните, что вам нужно проанализировать ответ вызова mget, потому что в списке возможно, что только некоторые элементы представлены в es, а другие не

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...