Смешивание запросов на вставку в потоках в Django - PullRequest
0 голосов
/ 06 октября 2019

Разработал сайт django, на котором запущен некоторый очищающий код. Я заметил проблему, заключающуюся в том, что когда два очищающих кода выполняются параллельно (в разных потоках), запросы на вставку в модель django смешиваются, например, в одном потоке, если у меня есть значение a = 1, b = 1, а в другом -имеют значение a = 2, b = 2, значение, сохраненное в базе данных, иногда (1 из 1000 и ухудшается при увеличении потоков) a = 1, b = 2. Проверили значения перед отправкой в ​​метод model.save (), значения в порядке, но после этого искажаются. Я использую django 2.2 и для интеграции mongodb использовал djongo. Также для поддержки индекса эластичного поиска я использую djangoasticsearch dsl. Не знаю, на каком этапе запрос искажается. Также в каждом потоке используется асинхронный поток. Пример моего сценария:

from project.views import start_new_thread
@start_new_thread
def site(site,loop):
 try:
  import pytz
  from urllib import parse
  from bs4 import BeautifulSoup
  import re
  from project.models import thread,post
  from datetime import datetime
  from .sentiment_analysis import calculate_sentiment
  import aiohttp
  import asyncio
  import async_timeout

  href=[]
  tthread=[]
  link=set()
  author={}

  async def fetch(session, url):
      async with async_timeout.timeout(30):
          async with session.get(url) as response:
              return await response.text()

  async def forum(response,lockhref,lockthread):
     soup = BeautifulSoup(response,'html5lib')
     table = soup.findAll('a',href=re.compile("^forumdisplay.php"))
     for row in table:
           url ='http://somesite.com' 
           async with lockhref:
                 if url not in link:
                       link.add(url)
                       href.append(url)


 Th = soup.findAll('tbody',id=re.compile("^threadbits_forum_"))
 for t in Th:
     Trs=t.findAll('tr')
     for tr in  Trs:
         td = tr.findAll('td')
         if len(td)>5:
          param=parse.parse_qs(parse.urlsplit(td[2].div.a["href"]).query)
          if 't' in param:
             f=thread.objects.filter(id='somesite.com'
                 url ='http://somesite.com'
                 async with lockthread:
                      if url not in link:
                           link.add(url)
                           if not len(f):
                               tthread.append(url)
                           else:
                              try:
                                  if 
                          f[0].posts<int((td[4].text).replace(',', ''))+1:
                                      tthread.append(url)
                              except:
                                  if f[0].posts<1:

             json_data={}
             json_data["thread_title"]=td[2].div.a.text
             json_data["id"]='http://somesite.com'                 
             json_data["posts"]=0
             json_data["site"]="site.com"
             try:
                json_data["views"]=int((td[5].text).replace(',', ''))
             except:
                json_data["views"]=0
             json_data["timestamp_ms"]=None
             json_data["author"]=None
             json_data["date_created"]=None
             try:
                 if not len(f):       t=thread(id=json_data["id"],thread_title=json_data["thread_title"],posts=json_data["posts"]   ,views=json_data["views"],site=json_data["site"],timestamp_ms=json_data["timestamp_ms"],              author=json_data["author"],date_created=json_data["date_created"])
                     t.save()
                 elif f[0].views<json_data["views"]:
                      thread.objects.filter(id=json_data["id"]).update(views=json_data["views"])
             except Exception as e:
                 print(str(e))


  async def showthread(url,j,lock):
     async with aiohttp.ClientSession() as session:
          try:
              response = await fetch(session, url)
              await threa(response,lock,url)
          except asyncio.TimeoutError :
              print("Timeout Retrying")
              await showthread(url,j,lock)

          except Exception as e:
              if j<2:
                  j+=1
                  print("error "+url+" "+str(e))
                  await showthread(url,j,lock)

  async def threa(response,lock,url):
          soup = BeautifulSoup(response, 'html5lib')
          posts = soup.findAll('div',attrs={'class':'component'})
          count=0
          for pst in posts:
            count+=1
            json_data={}
            tds = pst.findAll('td')
            date=tds[0].text
            date=' '.join(date.split())
            try:
               json_data['created_at']=datetime.strptime(date, '%m-%d-%Y')                    
json_data["created_at"]=pytz.utc.localize(json_data["created_at"])
            except:
               json_data['created_at']=datetime.datetime(1970, 1, 1, 1, 1, 1, 0,pytz.UTC)

            json_data['timestamp_ms']= datetime.timestamp(json_data['created_at'])
            no=(tds[1].a['name'])
            full_text=' '.join((tds[3].find('div',id=re.compile("post_message")).text).split())
            text,sentiment=calculate_sentiment('ar',full_text)
            json_data['sentiment_analysis']=sentiment
            json_data["text"]=full_text
            json_data["cleaned_text"]=text.split()
            try:
               name=(tds[2].fieldset.div.a.text)
               json_data["screen_name"]=name
            except:
               name=(' '.join(tds[2].fieldset.div.text.split()))
               json_data["screen_name"]=name
            json_data["id"]=id
            if int(no)==1:
               json_data["quoted_screen_name"]=''
               json_data["is_quote"]=False
               author[id]=name
               thread_existing,created=thread.objects.get_or_create(id=json_data["id"])
               if created:
                   thread_existing.date_created=json_data["created_at"]
                   thread_existing.timestamp=json_data["timestamp_ms"]
                   thread_existing.author=json_data["screen_name"]
                   thread_existing.thread_title=(tds[3].find('div',{'class':re.compile('smallfont')})).text
                   thread_existing.posts=0
                   thread_existing.views=-1,
                   thread_existing.site="site.com"
                   thread_existing.save()
               else:
                   thread_existing.date_created=json_data["created_at"]
                   thread_existing.timestamp=json_data["timestamp_ms"]
                   thread_existing.author=json_data["screen_name"]
                   thread_existing.save()
            else:
               json_data["quoted_screen_name"]=author[id]
               json_data["is_quote"]=True

            json_data["no"]=int(no)
            json_data["site"]="site.com"
            try:
               p=post(link=json_data["id"],no=json_data["no"],created_at=json_data["created_at"],hashtags=[],text=json_data["text"],cleaned_text=json_data["cleaned_text"]
        ,sentiment_analysis=json_data["sentiment_analysis"],quoted_screen_name=json_data["quoted_screen_name"],is_quote=json_data["is_quote"],site=json_data["site"],
        timestamp_ms=json_data["timestamp_ms"],screen_name=json_data["screen_name"])
               p.save()
            except Exception as e:
                print(str(e))
      if count>0:
           t=thread.objects.get(id=id)
           t.posts=t.posts+count
           t.save()


  async def scrapping(url,j,lockhref,lockthread):
     async with aiohttp.ClientSession() as session:
          try:
              response = await fetch(session, url)
              await forum(response,lockhref,lockthread)

          except asyncio.TimeoutError :
              print("Timeout Retrying")
              await scrapping(url,j,lockhref,lockthread)

          except Exception as e:
               if j<2:
                  j+=1
                  print("error "+url+" "+str(e))
                  await scrapping(url,j,lockhref,lockthread)
  href.append("http://somesite.com")
  link.add("http://somesite.com")
  asyncio.set_event_loop(loop)
  lockhref = asyncio.Lock()
  lockthread=asyncio.Lock()
  no_of_concurrent_connections=100
  i=0
  while i<len(href):
      if i+no_of_concurrent_connections<len(href):
       tasks= [loop.create_task(scrapping(href[j],0,lockhref,lockthread)) for j in range(i,i+no_of_concurrent_connections)]
       i+=no_of_concurrent_connections
      else:
       tasks=[loop.create_task(scrapping(href[j],0,lockhref,lockthread)) for j in range(i,len(href))]
       i=len(href)
  loop.run_until_complete(asyncio.gather(*tasks))
i=0
while i<len(tthread):
  if i+no_of_concurrent_connections<len(tthread):
       tasks=[loop.create_task(showthread(tthread[j],0,lockthread)) for j in range(i,i+no_of_concurrent_connections)]
       i+=no_of_concurrent_connections

  else:
       tasks=[loop.create_task(showthread(tthread[j],0,lockthread)) for j in range(i,len(tthread))]
       i=len(tthread)
  loop.run_until_complete(asyncio.gather(*tasks))

finally:
    print('ended')
...