Разработал сайт django, на котором запущен некоторый очищающий код. Я заметил проблему, заключающуюся в том, что когда два очищающих кода выполняются параллельно (в разных потоках), запросы на вставку в модель django смешиваются, например, в одном потоке, если у меня есть значение a = 1, b = 1, а в другом -имеют значение a = 2, b = 2, значение, сохраненное в базе данных, иногда (1 из 1000 и ухудшается при увеличении потоков) a = 1, b = 2. Проверили значения перед отправкой в метод model.save (), значения в порядке, но после этого искажаются. Я использую django 2.2 и для интеграции mongodb использовал djongo. Также для поддержки индекса эластичного поиска я использую djangoasticsearch dsl. Не знаю, на каком этапе запрос искажается. Также в каждом потоке используется асинхронный поток. Пример моего сценария:
from project.views import start_new_thread
@start_new_thread
def site(site,loop):
try:
import pytz
from urllib import parse
from bs4 import BeautifulSoup
import re
from project.models import thread,post
from datetime import datetime
from .sentiment_analysis import calculate_sentiment
import aiohttp
import asyncio
import async_timeout
href=[]
tthread=[]
link=set()
author={}
async def fetch(session, url):
async with async_timeout.timeout(30):
async with session.get(url) as response:
return await response.text()
async def forum(response,lockhref,lockthread):
soup = BeautifulSoup(response,'html5lib')
table = soup.findAll('a',href=re.compile("^forumdisplay.php"))
for row in table:
url ='http://somesite.com'
async with lockhref:
if url not in link:
link.add(url)
href.append(url)
Th = soup.findAll('tbody',id=re.compile("^threadbits_forum_"))
for t in Th:
Trs=t.findAll('tr')
for tr in Trs:
td = tr.findAll('td')
if len(td)>5:
param=parse.parse_qs(parse.urlsplit(td[2].div.a["href"]).query)
if 't' in param:
f=thread.objects.filter(id='somesite.com'
url ='http://somesite.com'
async with lockthread:
if url not in link:
link.add(url)
if not len(f):
tthread.append(url)
else:
try:
if
f[0].posts<int((td[4].text).replace(',', ''))+1:
tthread.append(url)
except:
if f[0].posts<1:
json_data={}
json_data["thread_title"]=td[2].div.a.text
json_data["id"]='http://somesite.com'
json_data["posts"]=0
json_data["site"]="site.com"
try:
json_data["views"]=int((td[5].text).replace(',', ''))
except:
json_data["views"]=0
json_data["timestamp_ms"]=None
json_data["author"]=None
json_data["date_created"]=None
try:
if not len(f): t=thread(id=json_data["id"],thread_title=json_data["thread_title"],posts=json_data["posts"] ,views=json_data["views"],site=json_data["site"],timestamp_ms=json_data["timestamp_ms"], author=json_data["author"],date_created=json_data["date_created"])
t.save()
elif f[0].views<json_data["views"]:
thread.objects.filter(id=json_data["id"]).update(views=json_data["views"])
except Exception as e:
print(str(e))
async def showthread(url,j,lock):
async with aiohttp.ClientSession() as session:
try:
response = await fetch(session, url)
await threa(response,lock,url)
except asyncio.TimeoutError :
print("Timeout Retrying")
await showthread(url,j,lock)
except Exception as e:
if j<2:
j+=1
print("error "+url+" "+str(e))
await showthread(url,j,lock)
async def threa(response,lock,url):
soup = BeautifulSoup(response, 'html5lib')
posts = soup.findAll('div',attrs={'class':'component'})
count=0
for pst in posts:
count+=1
json_data={}
tds = pst.findAll('td')
date=tds[0].text
date=' '.join(date.split())
try:
json_data['created_at']=datetime.strptime(date, '%m-%d-%Y')
json_data["created_at"]=pytz.utc.localize(json_data["created_at"])
except:
json_data['created_at']=datetime.datetime(1970, 1, 1, 1, 1, 1, 0,pytz.UTC)
json_data['timestamp_ms']= datetime.timestamp(json_data['created_at'])
no=(tds[1].a['name'])
full_text=' '.join((tds[3].find('div',id=re.compile("post_message")).text).split())
text,sentiment=calculate_sentiment('ar',full_text)
json_data['sentiment_analysis']=sentiment
json_data["text"]=full_text
json_data["cleaned_text"]=text.split()
try:
name=(tds[2].fieldset.div.a.text)
json_data["screen_name"]=name
except:
name=(' '.join(tds[2].fieldset.div.text.split()))
json_data["screen_name"]=name
json_data["id"]=id
if int(no)==1:
json_data["quoted_screen_name"]=''
json_data["is_quote"]=False
author[id]=name
thread_existing,created=thread.objects.get_or_create(id=json_data["id"])
if created:
thread_existing.date_created=json_data["created_at"]
thread_existing.timestamp=json_data["timestamp_ms"]
thread_existing.author=json_data["screen_name"]
thread_existing.thread_title=(tds[3].find('div',{'class':re.compile('smallfont')})).text
thread_existing.posts=0
thread_existing.views=-1,
thread_existing.site="site.com"
thread_existing.save()
else:
thread_existing.date_created=json_data["created_at"]
thread_existing.timestamp=json_data["timestamp_ms"]
thread_existing.author=json_data["screen_name"]
thread_existing.save()
else:
json_data["quoted_screen_name"]=author[id]
json_data["is_quote"]=True
json_data["no"]=int(no)
json_data["site"]="site.com"
try:
p=post(link=json_data["id"],no=json_data["no"],created_at=json_data["created_at"],hashtags=[],text=json_data["text"],cleaned_text=json_data["cleaned_text"]
,sentiment_analysis=json_data["sentiment_analysis"],quoted_screen_name=json_data["quoted_screen_name"],is_quote=json_data["is_quote"],site=json_data["site"],
timestamp_ms=json_data["timestamp_ms"],screen_name=json_data["screen_name"])
p.save()
except Exception as e:
print(str(e))
if count>0:
t=thread.objects.get(id=id)
t.posts=t.posts+count
t.save()
async def scrapping(url,j,lockhref,lockthread):
async with aiohttp.ClientSession() as session:
try:
response = await fetch(session, url)
await forum(response,lockhref,lockthread)
except asyncio.TimeoutError :
print("Timeout Retrying")
await scrapping(url,j,lockhref,lockthread)
except Exception as e:
if j<2:
j+=1
print("error "+url+" "+str(e))
await scrapping(url,j,lockhref,lockthread)
href.append("http://somesite.com")
link.add("http://somesite.com")
asyncio.set_event_loop(loop)
lockhref = asyncio.Lock()
lockthread=asyncio.Lock()
no_of_concurrent_connections=100
i=0
while i<len(href):
if i+no_of_concurrent_connections<len(href):
tasks= [loop.create_task(scrapping(href[j],0,lockhref,lockthread)) for j in range(i,i+no_of_concurrent_connections)]
i+=no_of_concurrent_connections
else:
tasks=[loop.create_task(scrapping(href[j],0,lockhref,lockthread)) for j in range(i,len(href))]
i=len(href)
loop.run_until_complete(asyncio.gather(*tasks))
i=0
while i<len(tthread):
if i+no_of_concurrent_connections<len(tthread):
tasks=[loop.create_task(showthread(tthread[j],0,lockthread)) for j in range(i,i+no_of_concurrent_connections)]
i+=no_of_concurrent_connections
else:
tasks=[loop.create_task(showthread(tthread[j],0,lockthread)) for j in range(i,len(tthread))]
i=len(tthread)
loop.run_until_complete(asyncio.gather(*tasks))
finally:
print('ended')