import json
import time

from lxml import etree
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

JD_URL_Login = "https://www.jd.com/"


class CustomizeException(Exception):
    """Application-level error carrying a numeric status and a message.

    Attributes are exposed as ``status`` and ``msg``.
    """

    def __init__(self, status, msg):
        super().__init__(status, msg)
        self.status = status
        self.msg = msg


class JD:
    """Headless-Chrome crawler that saves JD.com search results as JSON lines.

    Each crawled product keyword is appended to its own ``jd-<keyword>.json``
    file in the current working directory, one JSON object per line.
    Call :meth:`close` when done to release the browser and the output file.
    """

    def __init__(self):
        # Set every attribute before starting the browser so that close()
        # is safe even when browser start-up fails half-way.
        self.browser = None
        self.wait = None
        self.file = None
        self.__init_browser()

    def __init_browser(self):
        """Start headless Chrome, open the JD home page and create the waiter."""
        options = Options()
        options.add_argument("--headless")
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        # Do not load images.
        options.add_experimental_option(
            "prefs", {"profile.managed_default_content_settings.images": 2}
        )
        self.browser = webdriver.Chrome(options=options)
        # Maximize the browser window.
        self.browser.maximize_window()
        # Implicit wait of 3 seconds for element look-ups.
        self.browser.implicitly_wait(3)
        self.browser.get(JD_URL_Login)
        self.wait = WebDriverWait(self.browser, 10)

    def __close_file(self):
        """Close the current output file, if one is open."""
        if self.file is not None:
            self.file.close()
            self.file = None

    def __search_goods(self, goods):
        """Open the output file for ``goods`` and submit it in the search box."""
        # Release the previous product's file before switching to a new one.
        self.__close_file()
        # The wait returns the located element, so no second look-up is needed.
        search_input = self.wait.until(
            EC.presence_of_element_located((By.ID, "key"))
        )
        self.file = open("jd-{}.json".format(goods), "a", encoding="utf-8")
        search_input.clear()
        search_input.send_keys(goods, Keys.ENTER)

    def __get_goods_info(self, page_source):
        """Yield one dict per product parsed out of ``page_source``.

        Each dict has the keys ``goods_name``, ``goods_price``,
        ``comment_num`` and ``shop_name``.
        """
        selector_html = etree.HTML(page_source)
        # Product name.
        # TODO: read the link's text content instead of its title attribute.
        goods_name = selector_html.xpath(
            "//div[@class='gl-i-wrap']//div[contains(@class,'p-name')]/a/@title"
        )
        # Product price.
        goods_price = selector_html.xpath(
            "//div[@class='gl-i-wrap']//div[@class='p-price']/strong/i/text()"
        )
        # Number of reviews (flattened text of each <strong> node).
        comment_num = [
            node.xpath("string(.)")
            for node in selector_html.xpath("//div[@class='p-commit']/strong")
        ]
        # Shop name.
        shop_name = selector_html.xpath("//a[@class='curr-shop']/text()")
        # NOTE(review): zip() stops at the shortest list, so a product that is
        # missing any one field truncates/misaligns the rows; per-item
        # extraction would be more robust.
        for name, price, comments, shop in zip(
            goods_name, goods_price, comment_num, shop_name
        ):
            yield {
                "goods_name": name,
                "goods_price": price,
                "comment_num": comments,
                "shop_name": shop,
            }

    def __swipe_page(self):
        """Scroll to the bottom until the page stops growing; return its HTML."""
        height = self.browser.execute_script("return document.body.scrollHeight;")
        self.browser.execute_script("window.scrollTo(0, {});".format(height))
        while True:
            # Give lazily loaded content time to extend the page.
            time.sleep(1)
            now_height = self.browser.execute_script(
                "return document.body.scrollHeight;"
            )
            if height == now_height:
                return self.browser.page_source
            # scrollTo takes (x, y): keep x at 0 and move down to the new bottom.
            self.browser.execute_script(
                "window.scrollTo(0, {});".format(now_height)
            )
            height = now_height

    def __is_element_exists(self, xpath):
        """Return True when ``xpath`` matches an element on the current page."""
        try:
            self.browser.find_element(By.XPATH, xpath)
        except NoSuchElementException:
            return False
        return True

    def __click_next_page(self):
        """Click the "next page" link.

        Raises:
            CustomizeException: status 10000 when no ``<a>`` whose class is
                exactly ``pn-next`` exists, i.e. there is no further page.
        """
        self.wait.until(
            EC.presence_of_all_elements_located((By.CLASS_NAME, "pn-next"))
        )
        xpath = "//a[@class='pn-next']"
        if not self.__is_element_exists(xpath):
            raise CustomizeException(10000, "該商品訪問完畢")
        self.browser.find_element(By.XPATH, xpath).click()

    def __write_to_json(self, dic: dict):
        """Append ``dic`` to the current output file as one JSON line."""
        data_json = json.dumps(dic, ensure_ascii=False)
        self.file.write(data_json + "\n")

    def __crawl_goods(self, goods):
        """Search for ``goods`` and save every result page until the last one."""
        self.__search_goods(goods)
        page = 1
        while True:
            print("正在爬取商品 <{}>---第{}頁......".format(goods, page))
            time.sleep(3)
            html = self.__swipe_page()
            for dic in self.__get_goods_info(html):
                self.__write_to_json(dic)
            try:
                self.__click_next_page()
            except CustomizeException:
                # No further page: this product is finished.
                return
            page += 1

    def run(self, goods, pending=None):
        """Crawl ``goods`` and then every keyword in ``pending``.

        Args:
            goods: The first search keyword to crawl.
            pending: Optional iterable of further keywords to crawl in order.
                It is not modified. When omitted, a module-level
                ``goods_list`` is used if one exists (the behaviour of
                earlier versions); otherwise only ``goods`` is crawled.
        """
        if pending is None:
            # Backward compatibility with callers that relied on the global.
            pending = globals().get("goods_list", ())
        # A flat loop instead of recursion: no stack growth per keyword and no
        # re-scraping of the last page when nested calls unwind.
        self.__crawl_goods(goods)
        for next_goods in pending:
            self.__crawl_goods(next_goods)

    def close(self):
        """Quit the browser and close the output file; safe to call twice."""
        browser = getattr(self, "browser", None)
        if browser is not None:
            self.browser = None
            # quit() ends the whole driver session, not just the window.
            browser.quit()
        if getattr(self, "file", None) is not None:
            self.__close_file()

    def __del__(self):
        self.close()


if __name__ == '__main__':
    goods_list = ["純牛奶", "酸奶", "奶茶", "床上用品", "電磁爐", "電視", "小米筆記本", "華碩筆記本", "聯想筆記本", "男士洗面奶",
                  "女士洗面奶", "沐浴露", "洗髮露", "牙刷", "牙膏", "拖鞋", "剃鬚刀", "水手服", "運動服", "紅龍果", "蘋果", "香蕉", "洗衣液", "電飯煲"]
    # Validate the input before paying the cost of starting a browser.
    if not goods_list:
        raise CustomizeException(20000, "goods_list不能為空")
    goods = goods_list.pop(0)
    jd = JD()
    try:
        jd.run(goods, goods_list)
    finally:
        jd.close()