100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > python实现网页爬虫_python实现页面爬虫(selenium pyppeteer)

python实现网页爬虫_python实现页面爬虫(selenium pyppeteer)

时间:2018-08-08 14:19:01

相关推荐

python实现网页爬虫_python实现页面爬虫(selenium pyppeteer)

获取百度搜索的url

方法一:使用selenium

技术点:selenium+requests+pandas

import time

import asyncio

from selenium import webdriver

from selenium.webdriver.chrome.options import Options

from lxml import etree

import time

from mon.keys import Keys

import requests

import pandas as pd

import random

from user_agent import user_agent

headers = {

'user-agent': random.choice(user_agent),

'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',

'Accept-Encoding': 'gzip, deflate',

}

def filter_url(url_list):

new_list = []

for url in url_list:

baidu_url = requests.get(url=url, headers=headers, allow_redirects=False)

real_url = baidu_url.headers['Location'] # 得到网页真正地址

if real_url.startswith('https'):

new_list.append(real_url)

return new_list

def get_url(html):

url_list=html.xpath('//*[@id]/h3/a/@href')

header_url_list=html.xpath('//*[@id]/div/article/section/div/div[2]/a[1]/@href')

return header_url_list,url_list

def get_html(word):

chromedriver = r'D:\SoftWare\chromedriver.exe'

chrome_options = Options()

chrome_options.add_argument('--headless')

driver = webdriver.Chrome(chromedriver, chrome_options=chrome_options)

driver.get("")

time.sleep(2)

driver.find_element_by_id('kw').send_keys(word)

driver.find_element_by_id('kw').send_keys(Keys.ENTER)

time.sleep(2)

page_html = driver.page_source

html = etree.HTML(page_html)

driver.close()

return html

def main(line):

line = line.strip().split('\t')

word = line[0]

html = get_html(word)

header_url_list, url_list = get_url(html)

header_url_list = filter_url(header_url_list)

url_list = filter_url(url_list)

if header_url_list:

header_url_list.extend(url_list)

header_url_list.insert(0, line[0])

return header_url_list

else:

url_list.insert(0, line[0])

return url_list

if __name__ == '__main__':

lines = ["毛不易新歌","火箭少女101 炙热的我们 团王"]

write_list = []

write_columns = ["关键词", "URL1", "URL2", "URL3", "URL4", "URL5", "URL6", "URL7", "URL8", "URL9", "URL10"]

for line in lines:

url_list = main(line)

write_list.append(url_list)

dt = pd.DataFrame(write_list, columns=write_columns)

dt.to_excel('res.xlsx', index=None)

结果:每一条搜索执行时间大约10秒

优化 方法二:使用协程+Pyppeteer

安装

pip install pyppeteer

pyppeteer-install

使用:

import time

import requests

import aiohttp

import asyncio

from pyppeteer import launch

from bs4 import BeautifulSoup

import random

from user_agent import user_agent

headers = {

'user-agent': random.choice(user_agent),

'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',

'Accept-Encoding': 'gzip, deflate',

}

async def get_html():

'''

fun: 生成浏览器对象,独立出来避免每次搜索都打开一次浏览器

'''

browser = await launch({"dumpio":True,"userDataDir": "./userdata",'args': [

'--disable-extensions',

'--hide-scrollbars',

"--disable-infobars",

'--disable-bundled-ppapi-flash',

'--mute-audio',

'--no-sandbox',

'--disable-setuid-sandbox',

'--disable-gpu',

],})

return browser

async def close_page(browser):

# 结束后关闭浏览器

await browser.close()

async def get_url(browser, word):

# 每次搜索打开一个窗口,结束后关闭

page = await browser.newPage()

await page.evaluateOnNewDocument('() =>{ Object.defineProperties(navigator,'

'{ webdriver:{ get: () => false } }) }')

# 使用goto打开网页,timeout默认为30秒

await page.goto("",{'timeout':1000*60})

# 页面等待1秒

await page.waitFor(1000)

# type方法输入框输入值,第一个参数为选择器,第二个参数为输入值

await page.type('#kw', word)

# 点击搜索按钮

await page.click('#form > span.bg.s_btn_wr')

await page.waitFor(3000)

page_html = await page.content()

html = etree.HTML(page_html)

await page.close()

return html

def get_html_url(html):

url_list=html.xpath('//*[@id]/h3/a/@href')

header_url_list=html.xpath('//*[@id]/div/article/section/div/div[2]/a[1]/@href')

return header_url_list,url_list

async def filter_url(url_list):

'''

fun: 获取真正的url,而不是link url

'''

new_list = []

try:

for url in url_list:

baidu_url = requests.get(url=url, headers=headers, allow_redirects=False)

real_url = baidu_url.headers['Location'] # 得到网页原始地址

if real_url.startswith('https'):

new_list.append(real_url)

except Exception as e:

print(e.args)

return new_list

def get_hanzi_count(word):

'''

fun: 判断字符串中汉字的个数,一个汉字占两个字节

'''

count = 0

for s in word:

if '\u4e00' <= s <= '\u9fff':

count += 1

return count

async def main(browser, line):

line = line.strip().split('\t')

word = line[0]

# 如果搜索文字超过百度最长搜索限制,进行切割

if len(word.encode("gbk")) > 76:

c = get_hanzi_count(word)

word = word[:76-c]

html = await get_url(browser,word)

header_url_list, url_list = get_html_url(html)

header_url_list = await filter_url(header_url_list)

url_list = await filter_url(url_list)

if header_url_list:

header_url_list.extend(url_list)

header_url_list.insert(0, word)

return "\t".join(header_url_list)

else:

url_list.insert(0, word)

return "\t".join(url_list)

def write_excel(path,words):

with open(path, "a", encoding="utf-8") as file:

file.write("\n".join(words))

file.write("\n")

def read_text(path):

with open(path, encoding="utf-8") as file:

infos = file.readlines()

return [info.split("\t")[0] for info in infos if info!="" and info!=" " and info!="\n"]

if __name__ == '__main__':

start = time.time()

lines = read_text(r"D:\document\test")

path = r"D:\document\test.csv"

new_loop = asyncio.new_event_loop()

asyncio.set_event_loop(new_loop)

loop = asyncio.get_event_loop()

browser = loop.run_until_complete(get_html())

current = 0

max_worker = 20 # 设置一次执行多少条

while True:

cors = []

count = 0

new_current = current + max_worker

for line in lines:

cors.append(main(browser, line))

count += 1

if count > max_worker:

# result

result = loop.run_until_complete(asyncio.gather(*cors))

write_excel(path,result)

count = 0

cors = []

else:

result = loop.run_until_complete(asyncio.gather(*cors))

write_excel(path,result)

break

loop.run_until_complete(close_page(browser))

loop.close()

print("finished>>>>",time.time()-start)

结果:搜索三百条耗时三百多秒

注意事项:

launch函数要加{"dumpio":True},否则页面打开太多会卡

launch({"dumpio":True,"userDataDir": "./userdata",'args': [

'--disable-extensions',

'--hide-scrollbars',

"--disable-infobars", # 不显示浏览器正在受到自动软件控制

'--disable-bundled-ppapi-flash',

'--mute-audio',

'--no-sandbox',

'--disable-setuid-sandbox',

'--disable-gpu',

],})

初始化生成一个browser对象,后面使用这个对象的newPage方法创建新建窗口,操作完成后关闭窗口,避免出现页面卡住情况

等待不能用time.sleep(),使用time.sleep()会报超时错误

按钮点击页面跳转后必须要page.waitFor(1000), 1000代表1秒, 使用waitForXPath,waitForSelector以及waitForNavigation都会出现一直等待直到超时的情况,后面会继续再探究为什么,有懂的大牛可以指点迷津

来源:/Victor-ZH/p/13306647.html

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。