Итак, у меня есть этот скрипт на python, который очищает список от определенного URL-адреса craigslist, который создает пользователь (местоположение, максимальная цена, тип элемента и т. Д.). Затем он переходит на URL, очищает информацию о списках (цена, дата размещения и т. Д.) И возвращает три результата. Одним из них является «х» количество предметов вокруг средней цены (пользователь определяет количество товаров и диапазон цен, например, 100 долларов США от средней цены). Далее идут списки шкафов 'x' на основе почтового индекса, предоставленного пользователем в начале (пользователь также определяет количество отображаемых элементов на основе близости к почтовому индексу). Наконец, URL-адрес craigslist выводится пользователю, чтобы он мог зайти на страницу и посмотреть элементы, которые ему отображались ранее. Данные скрапа хранятся в файле data.json и файле data.csv. Содержимое одинаково, только в разных форматах, я хотел бы выгружать эти данные в базу данных каждый раз, когда выполняется скребок. Либо Cloud Firestore, либо AWS DynamoDB, так как я хочу разместить это веб-приложение в будущем
Что я хочу сделать, это разрешить пользователю иметь несколько экземпляров одних и тех же сценариев, все с уникальными URL-адресами Craigslist, работающими на одном и том жевремя. Весь код один и тот же, единственное отличие - это URL-адрес craigslist, который скрипт очищает.
Я создал метод, который повторяет создание атрибутов (местоположение, максимальная цена и т. Д.) И возвращает потерянные URL-адреса, но в моем основном я вызываю конструктор, и ему нужны все эти атрибуты, поэтомуЯ должен ловить это от URL, и это казалось чрезмерным.
Затем я попытался включить цикл в свою главную. пользователь определяет, сколько URL-ссылок он хочет создать, и добавляет завершенные ссылки в список. Снова столкнулся с той же проблемой.
class CraigslistScraper(object):
# Contructor of the URL that is being scraped
def __init__(self, location, postal_code, max_price, query, radius):
self.location = location # Location(i.e. City) being searched
self.postal_code = postal_code # Postal code of location being searched
self.max_price = max_price # Max price of the items that will be searched
self.query = query # Search for the type of items that will be searched
self.radius = radius # Radius of the area searched derived from the postal code given previously
self.url = f"https://{location}.craigslist.org/search/sss?&max_price={max_price}&postal={postal_code}&query={query}&20card&search_distance={radius}"
self.driver = webdriver.Chrome(r"C:\Program Files\chromedriver") # Path of Firefox web driver
self.delay = 7 # The delay the driver gives when loading the web page
# Load up the web page
# Gets all relevant data on the page
# Goes to next page until we are at the last page
def load_craigslist_url(self):
data = []
# url_list = []
self.driver.get(self.url)
while True:
try:
wait = WebDriverWait(self.driver, self.delay)
wait.until(EC.presence_of_element_located((By.ID, "searchform")))
data.append(self.extract_post_titles())
# url_list.append(self.extract_post_urls())
WebDriverWait(self.driver, 2).until(
EC.element_to_be_clickable((By.XPATH, '//*[@id="searchform"]/div[3]/div[3]/span[2]/a[3]'))).click()
except:
break
return data
# Extracts all relevant information from the web-page and returns them as individual lists
def extract_post_titles(self):
all_posts = self.driver.find_elements_by_class_name("result-row")
dates_list = []
titles_list = []
prices_list = []
distance_list = []
for post in all_posts:
title = post.text.split("$")
if title[0] == '':
title = title[1]
else:
title = title[0]
title = title.split("\n")
price = title[0]
title = title[-1]
title = title.split(" ")
month = title[0]
day = title[1]
title = ' '.join(title[2:])
date = month + " " + day
if not price[:1].isdigit():
price = "0"
int(price)
raw_distance = post.find_element_by_class_name(
'maptag').text
distance = raw_distance[:-2]
titles_list.append(title)
prices_list.append(price)
dates_list.append(date)
distance_list.append(distance)
return titles_list, prices_list, dates_list, distance_list
# Gets all of the url links of each listing on the page
# def extract_post_urls(self):
# soup_list = []
# html_page = urllib.request.urlopen(self.driver.current_url)
# soup = BeautifulSoup(html_page, "html.parser")
# for link in soup.findAll("a", {"class": "result-title hdrlnk"}):
# soup_list.append(link["href"])
#
# return soup_list
# Kills browser
def kill(self):
self.driver.close()
# Gets price value from dictionary and computes average
@staticmethod
def get_average(sample_dict):
price = list(map(lambda x: x['Price'], sample_dict))
sum_of_prices = sum(price)
length_of_list = len(price)
average = round(sum_of_prices / length_of_list)
return average
# Displays items around the average price of all the items in prices_list
@staticmethod
def get_items_around_average(avg, sample_dict, counter, give):
print("Items around average price: ")
print("-------------------------------------------")
raw_list = []
for z in range(len(sample_dict)):
current_price = sample_dict[z].get('Price')
if abs(current_price - avg) <= give:
raw_list.append(sample_dict[z])
final_list = raw_list[:counter]
for index in range(len(final_list)):
print('\n')
for key in final_list[index]:
print(key, ':', final_list[index][key])
# Displays nearest items to the zip provided
@staticmethod
def get_items_around_zip(sample_dict, counter):
final_list = []
print('\n')
print("Closest listings: ")
print("-------------------------------------------")
x = 0
while x < counter:
final_list.append(sample_dict[x])
x += 1
for index in range(len(final_list)):
print('\n')
for key in final_list[index]:
print(key, ':', final_list[index][key])
# Converts all_of_the_data list of dictionaries to json file
@staticmethod
def convert_to_json(sample_list):
with open(r"C:\Users\diego\development\WebScraper\data.json", 'w') as file_out:
file_out.write(json.dumps(sample_list, indent=4))
@staticmethod
def convert_to_csv(sample_list):
df = pd.DataFrame(sample_list)
df.to_csv("data.csv", index=False, header=True)
# Main where the big list data is broken down to its individual parts to be converted to a .csv file
также параметры веб-сайта установлены
если name == " main ":
location = input("Enter the location you would like to search: ") # Location Craigslist searches
zip_code = input(
"Enter the zip code you would like to base radius off of: ") # Postal code Craigslist uses as a base for 'MILES FROM ZIP'
type_of_item = input(
"Enter the item you would like to search (ex. furniture, bicycles, cars, etc.): ") # Type of item you are looking for
max_price = input(
"Enter the max price you would like the search to use: ") # Max price Craigslist limits the items too
radius = input(
"Enter the radius you would like the search to use (based off of zip code provided earlier): ") # Radius from postal code Craigslist limits the search to
scraper = CraigslistScraper(location, zip_code, max_price, type_of_item,
radius) # Constructs the URL with the given parameters
results = scraper.load_craigslist_url() # Inserts the result of the scrapping into a large multidimensional list
titles_list = results[0][0]
prices_list = list(map(int, results[0][1]))
dates_list = results[0][2]
distance_list = list(map(float, results[0][3]))
scraper.kill()
# Merge all of the lists into a dictionary
# Dictionary is then sorted by distance from smallest -> largest
list_of_attributes = []
for i in range(len(titles_list)):
content = {'Listing': titles_list[i], 'Price': prices_list[i], 'Date posted': dates_list[i],
'Distance from zip': distance_list[i]}
list_of_attributes.append(content)
list_of_attributes.sort(key=lambda x: x['Distance from zip'])
scraper.convert_to_json(list_of_attributes)
scraper.convert_to_csv(list_of_attributes)
# scraper.export_to_mongodb()
# Below function calls:
# Get average price and prints it
# Gets/prints listings around said average price
# Gets/prints nearest listings
average = scraper.get_average(list_of_attributes)
print(f'Average price of items searched: ${average}')
num_items_around_average = int(input("How many listings around the average price would you like to see?: "))
avg_range = int(input("Range of listings around the average price: "))
scraper.get_items_around_average(average, list_of_attributes, num_items_around_average, avg_range)
print("\n")
num_items = int(input("How many items would you like to display based off of proximity to zip code?: "))
print(f"Items around you: ")
scraper.get_items_around_zip(list_of_attributes, num_items)
print("\n")
print(f"Link of listings : {scraper.url}")
То, что я хочу, это программа, чтобы получить количество URL-адресов, которые пользователь хочет очистить. Этот вход будет определять количество экземпляров этого скрипта, который должен быть запущен.
Затем пользователь будет запускать подсказку каждого скребка, например указывать URL («в каком месте вы хотите искать ?:»). После того, как они закончат создавать URL-адреса, каждый скребок будет запускаться со своим конкретным URL-адресом и отображать три выходных сигнала, описанных выше, для URL-адреса, который был назначен шабером.
В будущем я хотел бы добавить функцию времени, и пользователь определяет, как часто он хочет запускать скрипт (каждый час, каждый день, каждый день и т. Д.). Подключитесь к базе данных и вместо этого просто запросите из базы данных «x» # списков вокруг среднего ценового диапазона и «x» ближайших списков, основанных на близости, основанных на результатах определенного URL.