import json
import operator
import re
import time
from operator import itemgetter
from urllib.parse import quote

import requests
import yaml
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service


class Products:
    """Collect, store, sort and print product listings from plati.market.

    Every product is a dict with the keys ``name``, ``link``, ``price``,
    ``rating`` and ``sold``; the current result set lives in ``self.data``.
    """

    # Endpoint queried by parseAPI.
    _API_URL = "https://plati.io/api/search.ashx"
    # Seconds to wait for one API response before giving up.
    _REQUEST_TIMEOUT = 30
    # Price as it appears in a listing title: digits with a space on each side.
    _PRICE_PATTERN = re.compile(r" [0-9]+ ")

    def __init__(self) -> None:
        # Per-instance list of product dicts. A class-level list would be
        # shared (and sorted in place) across every Products instance.
        self.data = []

    def read_yaml_file(self, filename):
        """Load the product list from a YAML file into ``self.data``.

        Args:
            filename: Path of the YAML file to read.

        Returns:
            The loaded list of product dicts (an empty list when the file
            contains no document).
        """
        with open(filename, 'r', encoding='utf-8') as file:
            # NOTE(security): FullLoader is not meant for untrusted input.
            # If these files can come from outside, switch to yaml.safe_load.
            loaded = yaml.load(file, Loader=yaml.FullLoader)
        # yaml.load returns None for an empty file; keep self.data a list so
        # sortProducts/printData keep working.
        self.data = loaded if loaded is not None else []
        return self.data

    def write_yaml_file(self, filename):
        """Write ``self.data`` to a file in YAML format.

        Args:
            filename: Path of the file to create or overwrite.
        """
        with open(filename, 'w', encoding='utf-8') as file:
            yaml.dump(self.data, file)

    def sortProducts(self, criteria: str, order: bool = False):
        """Sort ``self.data`` in place by one product field.

        Args:
            criteria: Dict key to sort by, e.g. ``price`` (price of a
                product), ``rating`` (total rating of a seller) or ``sold``
                (the amount of items sold).
            order: ``False`` for ascending, ``True`` for descending.
        """
        self.data.sort(key=operator.itemgetter(criteria), reverse=order)

    def printData(self):
        """Print every product: name and link, then price, rating and sold."""
        for item in self.data:
            print(
                f"{item['name']}\t{item['link']}\n"
                f"{item['price']} {item['rating']} {item['sold']}\n\n"
            )

    def ParsePage(self, query):
        """Scrape the search result pages with a headless Edge browser.

        Replaces ``self.data`` with the products found. Use parseAPI
        instead if possible.

        Args:
            query: Search text; it is URL-quoted before being put in the URL.
        """
        self.data = []
        options = Options()
        options.add_argument("headless")  # do not show a browser window
        # Selenium 4 takes the driver path through a Service object; the
        # executable_path keyword is rejected by newer releases.
        browser = webdriver.Edge(
            service=Service(executable_path='msedgedriver.exe'),
            options=options,
        )
        try:
            browser.get(f"https://plati.market/search/{quote(query)}")
            time.sleep(1)
            # Walk the result pages until there is no "next page" link.
            while True:
                self._accept_cookies(browser)
                try:
                    # All product blocks on the current page.
                    all_blocks = browser.find_elements(
                        By.CSS_SELECTOR, 'li.shadow')
                    # Number of the current page.
                    page_number = int(browser.find_elements(
                        By.CSS_SELECTOR, 'a.active')[1].text)
                except (IndexError, ValueError, WebDriverException):
                    print("Результаты не найдены")
                    break
                for block in all_blocks:
                    product = self._parse_block(block)
                    if product is not None:
                        self.data.append(product)
                try:
                    # Go to the next page.
                    browser.find_element(
                        By.LINK_TEXT, str(page_number + 1)).click()
                except WebDriverException:
                    break
        finally:
            # Always shut the driver down, otherwise every call leaks a
            # browser/driver process.
            browser.quit()

    @staticmethod
    def _accept_cookies(browser):
        """Click the cookie-consent button if it is present (best effort)."""
        try:
            # Accept cookies if present, DO NOT REMOVE.
            browser.find_element(By.ID, 'gdpr_accept_button').click()
        except WebDriverException:
            # Button absent or not clickable: nothing to accept.
            pass

    @classmethod
    def _parse_price(cls, text):
        """Return the integer price found in *text*, or None if there is none."""
        match = cls._PRICE_PATTERN.search(text)
        # int() ignores the surrounding spaces captured by the pattern.
        return int(match.group(0)) if match else None

    def _parse_block(self, block):
        """Turn one product block element into a product dict.

        Returns None when the block's title carries no recognisable price,
        so the caller can skip it instead of crashing.
        """
        # Part of the block holding the price and the name.
        title = block.find_element(By.TAG_NAME, 'h1')
        name = title.find_element(By.TAG_NAME, 'a')
        link = name.get_attribute('href')
        price = self._parse_price(
            title.find_element(By.TAG_NAME, 'span').text)
        if price is None:
            return None
        # All additional information; the code reads item 0 as the seller's
        # rating and item 1 as the total amount sold.
        info = block.find_elements(By.TAG_NAME, "strong")
        rating = float(info[0].text.replace(',', '.'))
        try:
            sold = int(info[1].text.replace('>', ''))
        except (IndexError, ValueError):
            # The block has no usable "sold" information.
            sold = 0
        return {'name': name.text, 'link': link, 'price': price,
                'rating': rating, 'sold': sold}

    def parseAPI(self, query, pagesize=499):
        """Fetch all search results through the plati.ru API.

        Replaces ``self.data`` with the products returned for *query*,
        requesting every result page in turn.

        Args:
            query: Search text; passed as a request parameter so that
                characters such as ``&``, ``#`` or spaces are encoded.
            pagesize: Results per request; should be less than 500.

        Raises:
            ValueError: If *pagesize* is not between 1 and 499.
            requests.RequestException: On a network failure, timeout or
                HTTP error status.
        """
        if not 0 < pagesize < 500:
            raise ValueError("pagesize must be between 1 and 499")
        self.data = []
        # The first request carries no page number, and its response also
        # tells how many pages there are.
        contents = self._fetch_api_page(query, pagesize)
        total_pages = int(contents['Totalpages'])
        self.data.extend(self._products_from(contents))
        for page in range(2, total_pages + 1):
            contents = self._fetch_api_page(query, pagesize, page)
            self.data.extend(self._products_from(contents))

    def _fetch_api_page(self, query, pagesize, pagenum=None):
        """Request one page of API results and return the decoded JSON."""
        params = {'query': query, 'pagesize': pagesize}
        if pagenum is not None:
            params['pagenum'] = pagenum
        params['visibleOnly'] = 'true'
        params['response'] = 'json'
        response = requests.get(
            self._API_URL, params=params, timeout=self._REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _products_from(contents):
        """Convert the ``items`` of one API response into product dicts."""
        # A missing or null item list is treated as "no products".
        return [
            {'name': entry['name'],
             'link': entry['url'],
             'price': int(entry['price_rur']),
             'rating': float(entry['seller_rating']),
             'sold': int(entry['numsold'])}
            for entry in (contents.get('items') or [])
        ]