Я разрабатываю приложение, которое отправляет данные на сервер flask. Затем flask вставляет полученное в поиск elasti c. Перед вставкой в поиск elasti c он проверит, существует ли идентификатор или нет. Если идентификатор существует, он обновляется или вставляется в индекс.
Пример кода:
from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)
Проблема здесь в том, что запросы отправляются каждые 5 секунд с внешнего интерфейса. Поэтому иногда возникает конфликт (ie) всякий раз, когда запрос получен с идентификатором, и в то же время происходит процесс вставки идентификатора. Второй запрос предполагает, что идентификатор не присутствует в базе данных, поэтому он также вставляет, что превращает 2 данных с одинаковым идентификатором в индекс. Что я должен сделать, чтобы вставить по одному, а другой должен подождать?
python - 3,6
Отредактировано: пробовал с использованием семафора:
from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading
sLock = threading.Semaphore()
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
sLock.acquire()
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
sLock.release()
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)
Заранее спасибо!