Поток метода Save () безопасен или нет в django - PullRequest
0 голосов
/ 11 октября 2019

Безопасен ли метод сохранения в django или нет. Это означает, что если у меня несколько потоков, будет ли метод save () мешать данным, отправленным каждому из методов save ()? Столкнулись с этой проблемой при запуске метода save () в нескольких потоках. Я заметил, что некоторые значения полей взаимозаменяемы, что означает, что значение поля метода сохранения одного потока перешло к значению поля другого потока. Есть ли способ справиться с этой ситуацией?

from project.views import start_new_thread
from django.db import connection
@start_new_thread
def site(site,loop):
 try:
 from bs4 import BeautifulSoup
 from django.db.models import Max
 import re
 import pytz
 from datetime import datetime
 from .sentiment_analysis import calculate_sentiment
 import aiohttp
 import asyncio
 import async_timeout
 from project.models import thread,post
 import dateparser
 href=[]
 link=set()
 tthread=[]
 author={}

 def convert_si_to_number(x):
      total_stars = 0
      if 'K' in x:
         if len(x) > 1:
             total_stars = float(x.replace('K', '')) * 1000 # convert k to a thousand
      elif 'M' in x:
          if len(x) > 1:
              total_stars = float(x.replace('M', '')) * 1000000 # convert M to a million
      elif 'B' in x:
          total_stars = float(x.replace('B', '')) * 1000000000 # convert B to a Billion
      else:
          total_stars = int(x) # Less than 1000

      return int(total_stars)

async def fetch(session, url):
    async with async_timeout.timeout(30):
        async with session.get(url) as response:
            return await response.text()

async def forum(response,lockhref,lockthread):
   soup = BeautifulSoup(response,'html5lib')
   table = soup.findAll('a',href=re.compile("forums/"))

   for row in table:
       url ='site.com'+row['href']
       async with lockhref:
            if url not in link:
                 href.append(url)
                 link.add(url)

   Th = soup.findAll('div',{'class':re.compile('structItem structItem--thread')})
   for t in Th:
       json_data={}
       divs=t.findAll("div")
       url="site.com"+(divs[2].div.find('a',href=re.compile('threads/')))['href']
       json_data["id"]=url
       dl=divs[5].findAll("dl")
       json_data["views"]=convert_si_to_number(dl[1].dd.text)
       f=thread.objects.filter(id=url)
       async with lockthread:
            if url not in link:
                 link.add(url)
                 if not len(f):
                      tthread.append(url)
                 else:
                      try:
                           if f[0].posts<convert_si_to_number(dl[0].dd.text)+1:
                                    tthread.append(url)
                      except:
                           if f[0].posts<1:
                                      tthread.append(url)       
       json_data["thread_title"]=divs[2].div.a.text
       json_data["site"]="site.com"
       json_data["posts"]=0
       json_data["timestamp_ms"]=None
       json_data["author"]=None
       json_data["date_created"]=None
       if not len(f):              

    t=thread(id=json_data["id"],thread_title=json_data["thread_title"],posts=json_data["posts"]                  ,views=json_data["views"],site=json_data["site"],timestamp_ms=json_data["timestamp_ms"],              
  author=json_data["author"],date_created=json_data["date_created"])

            t.save()

       elif f[0].views<json_data["views"]:                                                                                                      
thread.objects.filter(id=json_data["id"]).update(views=json_data["views"])           

  async def showthread(url,j,lock):
     async with aiohttp.ClientSession() as session:
      try:
          response = await fetch(session, url)
          await threa(response,lock,url)

      except (aiohttp.ClientConnectionError,aiohttp.ClientConnectorError):
          print("Going to sleep for 5min")
          await asyncio.sleep(300)
          await showthread(url,j,lock)                   

      except asyncio.TimeoutError :
          print("Timeout Retrying")
          await showthread(url,j,lock)   

      except Exception as e:
          if j<2:
              j+=1
              print("error "+url+" "+str(e))
              await showthread(url,j,lock)

async def threa(response,lock,url):
   soup = BeautifulSoup(response, 'html5lib')
   table = soup.findAll('a',href=re.compile(url+"page-\d+"))
   for row in table:
        async with lockthread:
            if "site.com"+row["href"] not in link:
                tthread.append("site.com"+row["href"])
                link.add("site.com"+row["href"]) 

   table = soup.findAll('div',{'class':'message-inner'})
   match=re.finditer("page",url)
   index=-1
   for m in match:
         index=m.start()
   if index==-1:
         id=url
   else:
         id=url[:index]

   count=0
   for t in table:
        count=count+1
        json_data={}
        h4=t.find("h4",{'class':'message-name'})
        try:
            json_data["screen_name"]=h4.text
        except:
            json_data["screen_name"]="None"
        div=t.find('div',{'class':'message-attribution-main'})
        try:
           json_data["created_at"]=dateparser.parse(' '.join(div.text.split()))
           json_data["created_at"]=pytz.utc.localize(json_data["created_at"])
        except Exception as e:
           print(str(e)) 
           json_data['created_at']=datetime(1970, 1, 1, 1, 1, 1, 0,pytz.UTC)
        json_data['timestamp_ms']= datetime.timestamp(json_data['created_at'])

        div=t.find('div',{'class':'bbWrapper'})
        try:
            full_text=''.join((div.text).split())
        except:
            full_text=''
        text,sentiment=calculate_sentiment('ar',full_text)
        json_data['sentiment_analysis']=sentiment
        json_data["text"]=full_text
        json_data["cleaned_text"]=text.split()
        json_data["hashtags"]=''
        json_data["id"]=id
        try:
            ul=t.find('ul',{'class':re.compile('message-attribution-opposite')})
            li=ul.find('a',{'class':'qimahostma'})
            no=int((li.text).replace("#",''))
        except:
            f=post.objects.filter(link=id)
            if not len(f):
                 no=1
            else:
                 max=f.aggregate(Max('no'))
                 no=max['no__max']+1

        if int(no)==1:
               json_data["quoted_screen_name"]=''
               json_data["is_quote"]=False
               author[url]=json_data["screen_name"]
               thread_existing,created=thread.objects.get_or_create(id=json_data["id"])
               if created:
                   thread_existing.date_created=json_data["created_at"]
                   thread_existing.timestamp=json_data["timestamp_ms"]
                   thread_existing.author=json_data["screen_name"]
                   thread_existing.thread_title=(soup.find('h1',{'class':'p-title-value'})).text
                   thread_existing.posts=0
                   thread_existing.views=-1
                   thread_existing.site="site.com"
                   thread_existing.save()
               else:
                   thread_existing.thread_title=thread_existing.thread_title
                   thread_existing.posts=thread_existing.posts
                   thread_existing.views=thread_existing.views
                   thread_existing.site="site.com"
                   thread_existing.date_created=json_data["created_at"]
                   thread_existing.timestamp=json_data["timestamp_ms"]
                   thread_existing.author=json_data["screen_name"]
                   thread_existing.save()
        else:
               json_data["quoted_screen_name"]=author[url]
               json_data["is_quote"]=True

        json_data["no"]=int(no)
        json_data["site"]="site.com"
        try:
              p=post(link=json_data["id"],no=json_data["no"],created_at=json_data["created_at"],hashtags=[],text=json_data["text"],cleaned_text=json_data["cleaned_text"]
              ,sentiment_analysis=json_data["sentiment_analysis"],quoted_screen_name=json_data["quoted_screen_name"],is_quote=json_data["is_quote"],site=json_data["site"],
              timestamp_ms=json_data["timestamp_ms"],screen_name=json_data["screen_name"])
              p.save()
        except Exception as e:
              print(e)

   if count>0:  
           t=thread.objects.get(id=id)
           t.posts=t.posts+count
           t.save()


async def scrapping(url,j,lockhref,lockthread):
   async with aiohttp.ClientSession() as session:
      try:
          response = await fetch(session, url)
          await forum(response,lockhref,lockthread)
      except (aiohttp.ClientConnectionError,aiohttp.ClientConnectorError):
          print("Going to sleep for 5min")
          await asyncio.sleep(300)
          await scrapping(url,j,lockhref,lockthread)        
      except asyncio.TimeoutError :
          print("Timeout Retrying")
          await scrapping(url,j,lockhref,lockthread)           
      except Exception as e:
           if j<2:
              j+=1
              print("error "+url+" "+str(e))
              await scrapping(url,j,lockhref,lockthread)

href.append("site.com/index.php")
link.add("site.com/index.php")
asyncio.set_event_loop(loop)
lockhref = asyncio.Lock()
lockthread=asyncio.Lock()
no_of_concurrent_connections=50
i=0
while i<len(href):
  if i+no_of_concurrent_connections<len(href):
       tasks=[loop.create_task(scrapping(href[j],0,lockhref,lockthread)) for j in range(i,i+no_of_concurrent_connections)]
       i+=no_of_concurrent_connections
  else:
     tasks=[loop.create_task(scrapping(href[j],0,lockhref,lockthread)) for j in range(i,len(href))]
       i=len(href)    
  loop.run_until_complete(asyncio.gather(*tasks))

i=0
while i<len(tthread):
  if i+no_of_concurrent_connections<len(tthread):
       tasks=[loop.create_task(showthread(tthread[j],0,lockthread)) for j in range(i,i+no_of_concurrent_connections)]
       i+=no_of_concurrent_connections

  else:
       tasks=[loop.create_task(showthread(tthread[j],0,lockthread)) for j in range(i,len(tthread))]
       i=len(tthread)    
  loop.run_until_complete(asyncio.gather(*tasks))

 finally:
      print('ended')
      connection.close()

Эти темы создаются с использованием декоратора

def start_new_thread(function):
    def decorator(*args, **kwargs):
        name=str(args[0])
        t = Thread(name=name,target = function, args=args, kwargs=kwargs,daemon=True)
        list[name]=t
        t.start()
    return decorator

1 Ответ

0 голосов
/ 23 октября 2019

Исправлена ​​проблема с закрытием соединения всякий раз, когда я создавал новый процесс или поток, а затем в новом процессе django автоматически создавал новое соединение всякий раз, когда это необходимо

...