Python爬虫笔记
此文是本人当时学习 Python 爬虫时整理的学习资料总结，
目的是在自己以后写爬虫时能起到回忆和参考的作用。
学习时看了B站上的相关教学
Requests库
小试牛刀
import requests
if __name__ == "__main__":
# 指定url
url = "http://www.sogou.com/"
# 发起请求
r = requests.get(url)
# 获取响应数据, 字符串
# print(r.text)
# 持久化存储
with open('./sogou.html', 'w', encoding="utf-8") as fp:
fp.write(r.text)
print("结束!")
项目:网页采集器
import requests
# UA伪装: 携带 User-Agent 请求头, 把爬虫伪装成正常浏览器
if __name__ == "__main__":
url = "http://www.sogou.com/web?"
# 处理url携带的参数: 封装到字典
kw = input('Enter a word: ')
param = {
'query': kw
}
# 可以多找几个 User-Agent 然后 random 着用(见本段代码后面的示意)
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# 发起请求
r = requests.get(url=url, params=param, headers=header)
print(r.status_code)
page_text = r.text
fileName = kw + '.html'
with open(fileName, 'w', encoding='utf-8') as fp:
fp.write(page_text)
print(fileName, "保存成功!")
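上面注释里说可以多找几个 headers 然后 random 着用, 一个简单的做法示意如下(UA_POOL 里的字符串是假设自己收集的, random_header 这个函数名也是随手起的):
import random
import requests

UA_POOL = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36',
]

def random_header():
    # 每次请求前随机挑一个 User-Agent
    return {'User-Agent': random.choice(UA_POOL)}

r = requests.get('https://www.sogou.com/web', params={'query': 'python'}, headers=random_header())
print(r.status_code)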
项目:百度翻译
import requests
import json
if __name__ == '__main__':
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = 'https://fanyi.baidu.com/sug'
# param
word = input("请输入要查询的单词:")
data = {
'kw': word
}
# 发起请求
r = requests.post(url=url, data=data, headers=header)
# 获取响应数据 json()方法返回的是obj(如果确认响应数据是json类型的才可以使用json())
dic_obj = r.json()
print(dic_obj)
# 持久化存储
fp = open(word + '.json', 'w', encoding='utf-8')
json.dump(dic_obj, fp, ensure_ascii=False)
fp.close()
print("Over!")
项目:豆瓣
import requests
import json
if __name__ == '__main__':
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# 获取url
url = 'https://movie.douban.com/j/chart/top_list'
# param
param = {
'type': '22',
'interval_id': '100:90',
'action': '',
'start': '0',
'limit': '20'
}
# 发起请求
r = requests.get(url=url, params=param, headers=header)
data = r.json()
with open('./豆瓣.json', 'w', encoding='utf-8') as fp:
json.dump(data, fp, ensure_ascii=False)
fp.close()
print("Over!!")
项目:肯德基
import requests
import json
if __name__ == '__main__':
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = 'http://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword'
# param
param = {
'cname': '',
'pid': '',
'keyword': '南京',
'pageIndex': '1',
'pageSize': 10
}
# 发起请求
r = requests.post(url, data=param, headers=header)
text = r.json()
fp = open('./肯德基.json','w',encoding='utf-8')
json.dump(text,fp,ensure_ascii=False)
fp.close()
print('Over!')
项目:药监局
import requests
import json
if __name__ == '__main__':
# header
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsList'
# param
param = {
'on': 'true',
'page': '1', # 页码
'pageSize': '15', # 一页数据
'productName': '',
'conditionType': '1',
'applyname': '',
'applysn': ''
}
r = requests.post(url, data=param, headers=header)
text = r.json()
allData = []
fp = open('./药监局化妆品.json', 'a', encoding='utf-8')
for i in text["list"]:
tmpURL = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsById'
id = {
'id': i["ID"]
}
print(i["ID"])
tmpR = requests.post(tmpURL, params=id, headers=header)
allData.append(tmpR.json())
json.dump(allData, fp, ensure_ascii=False)
fp.close()
数据解析
正则表达式
import requests
import re
import os
# 要求: 爬取所有图片数据
if __name__ == '__main__':
# url
url = 'https://pvp.qq.com/web201605/herolist.shtml'
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# 路径检查
if not os.path.exists("./2.1 爬取图片结果"):
os.mkdir("./2.1 爬取图片结果")
# 发起请求
r = requests.get(url, headers=header)
r.encoding = 'gbk'
# 用正则表达式获取图片链接列表
ex_URL = r'src="(.*?)" width="\d+"\s*height="\d+" alt=".{1,4}"'
ex_Name = r'src=".*?" width="\d+"\s*height="\d+" alt="(.*?)"'
img_list = re.findall(ex_URL, r.text, re.S)
name_list = re.findall(ex_Name, r.text, re.S)
for i in range(len(img_list)):
url = 'http:' + img_list[i]
name = "./2.1 爬取图片结果/" + name_list[i] + ".jpg"
# content返回的是二进制形式的图片数据
# text返回的是字符串形式的图片数据
# json()返回的是对象形式
# 爬取一个图片
img = requests.get(url, headers=header)
# 写入一个图片
img_file = open(name, "wb")
img_file.write(img.content)
print("第 " + str(i+1) + " 个英雄" + name_list[i] + "保存完毕!")
img_file.close()
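上面 re.findall 里反复用到 re.S, 它的作用是让 . 也能匹配换行符, 这样正则可以跨行提取内容。一个独立的小例子(html_snippet 是随手编的):
import re

html_snippet = '<h1 class="video-tt">标题第一行\n标题第二行</h1>'
# 不加 re.S 时 . 不匹配换行, 跨行的内容取不到
print(re.findall(r'<h1 class="video-tt">(.*?)</h1>', html_snippet))        # []
print(re.findall(r'<h1 class="video-tt">(.*?)</h1>', html_snippet, re.S))  # ['标题第一行\n标题第二行']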
bs4
# bs4 使用步骤
# 1. 实例化一个BeautifulSoup对象, 并将页面源码数据加载到该对象中
# 2. 通过调用BeautifulSoup对象中相关的属性或方法进行标签定位和数据提取
import requests
import lxml
from bs4 import BeautifulSoup
if __name__ == "__main__":
# 对象的实例化
# 1. 将本地html文档中的数据加载到该对象中
# fp = open('./test.html', 'r', encoding="utf-8")
# soup = BeautifulSoup(fp, 'lxml')
# 2. 将互联网上获取的页面源码加载到该对象中
# r = requests.get(url)
# soup = BeautifulSoup(r.text, 'lxml')
# header
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = 'https://xy.lymtics.top/'
# 发起请求
r = requests.get(url, headers=header)
soup = BeautifulSoup(r.text, 'lxml')
# BeautifulSoup 属性与方法
# soup.tagName 返回文档中第一次出现的tag标签
print(soup.p)
# soup.find(tagName,[class_/id/attr=string]) 返回第一次出现的tag标签
print(soup.find('p'))
print(soup.find('p', id='lyrics_2'))
# soup.find_all(tagName,[class_/id/attr=string]) 找到符合条件的列表
print(soup.find_all('p'))
# soup.select('selector') 根据选择器返回列表(可以是 id、class、标签等选择器)
print(soup.select('#lyrics_2 > span')[0])
# 获取标签之间的文本数据
# 两个属性, 一个方法
print(soup.p.text)
print(soup.p.string) # 区别: 只能获取直系的文本内容
print(soup.p.get_text())
# 获取标签的属性值
print(soup.audio['src'])
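text / get_text() 和 string 的区别用一小段本地 HTML 就能验证(html_doc 是随手写的例子):
from bs4 import BeautifulSoup

html_doc = '<p id="demo">外层文字<span>内层文字</span></p>'
soup = BeautifulSoup(html_doc, 'lxml')
p = soup.find('p', id='demo')
print(p.text)        # 外层文字内层文字: 取所有后代文本
print(p.get_text())  # 和 text 等价
print(p.string)      # None: p 下面不止一个直系子节点时 string 取不到
print(p['id'])       # demo: 取属性值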
# 项目:爬取小说所有章节
import requests
import os
from bs4 import BeautifulSoup
if __name__ == '__main__':
# header
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = 'https://www.shicimingju.com/book/sanguoyanyi.html'
r = requests.get(url, headers=header)
r.encoding = r.apparent_encoding
soup = BeautifulSoup(r.text, 'lxml')
chapters = soup.find('div', class_='book-mulu').find_all("a")
# 创建目录
if not os.path.exists("./2.3 小说保存结果"):
os.mkdir("./2.3 小说保存结果")
for chapter in chapters:
url = 'http://www.shicimingju.com' + chapter["href"]
r = requests.get(url, headers= header)
r.encoding = r.apparent_encoding
Tmp_soup = BeautifulSoup(r.text, 'lxml')
text = Tmp_soup.select(".chapter_content")[0].text
# 打开文件, 写入内容
fileName = "./2.3 小说保存结果/" + chapter.text + '.txt'
with open(fileName, 'w', encoding="utf-8") as fp:
fp.write(text)
fp.close()
print(chapter.text + " 保存成功!")
print("-----!程序结束!-----")
xpath
import requests
from lxml import etree
if __name__ == '__main__':
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = "http://xy.lymtics.top"
r = requests.get(url, headers=header)
# 实例化一个etree对象
# 1. 将本地的html文档中的源码数据加载到etree对象中:
# etree.parse(filePath)
# 2. 将互联网上的源码加载到该对象
t = etree.HTML(r.text)
# xpath表达式
# - /:表示的是从根节点开始定位; 或表示的是一个层级
# - //:表示多个层级; 或表示从任意位置开始定位
# - tag[@class='className']: 属性定位
# - 取文本: /text()
# - 取属性: /@attriName
print(t.xpath('html/div'))
print(t.xpath('//p[@id="lyrics_1"]'))
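上面几条 xpath 规则可以用一小段本地 HTML 验证一下(html_doc 是随手写的, 和原网页无关):
from lxml import etree

html_doc = '''
<html>
  <body>
    <div class="song">
      <p id="lyrics_1">第一句</p>
      <p id="lyrics_2">第二句</p>
      <a href="https://example.com">链接</a>
    </div>
  </body>
</html>'''
tree = etree.HTML(html_doc)
print(tree.xpath('/html/body/div/p'))               # / 从根开始一层层定位
print(tree.xpath('//div[@class="song"]/p/text()'))  # // 任意位置 + 属性定位 + 取文本
print(tree.xpath('//a/@href'))                      # 取属性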
项目:58同城
import requests
from lxml import etree
if __name__ == '__main__':
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# url
url = "https://nj.58.com/ershoufang/"
r = requests.get(url, headers=header)
tree = etree.HTML(r.text)
# 分别保存名称, 价格, 均价
name_list = tree.xpath("//h3[@class='property-content-title-name']/text()")
price_list = tree.xpath("//span[@class='property-price-total-num']/text()")
perPrice_list = tree.xpath("//p[@class='property-price-average']/text()")
fp = open("./58二手房南京.txt", 'w', encoding="utf-8")
for i in range(len(name_list)):
fp.write("¥" + price_list[i] + '万\t\t\t' + perPrice_list[i] + '\t\t\t' + name_list[i] + '\n')
fp.close()
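三个列表是一一对应的, 也可以用 zip 同时遍历, 省掉下标(下面的数据是编的, 只演示写法):
name_list = ['某小区 两室一厅', '另一小区 三室一厅']
price_list = ['120', '150']
perPrice_list = ['15000元/㎡', '18000元/㎡']
for name, price, per_price in zip(name_list, price_list, perPrice_list):
    print("¥" + price + '万\t' + per_price + '\t' + name)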
项目:百度图片
import requests
import os
import re
from lxml import etree
if __name__ == '__main__':
# url
url = 'https://image.baidu.com/search/acjson'
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# keyword
keyword = '张慧雯'
num = 30 # 需要的图片数量, 对应param中的pn参数, 建议是30的倍数
# param
param = {
'tn': 'resultjson_com',
'logid': '8816883031229968680',
'ipn': 'rj',
'ct': '201326592',
'is': '',
'fp': 'result',
'queryWord': keyword,
'cl': '2',
'lm': '-1',
'ie': 'utf-8',
'oe': 'utf-8',
'adpicid': '',
'st': '',
'z': '',
'ic': '',
'hd': '',
'latest': '',
'copyright': '',
'word': keyword,
's': '',
'se': '',
'tab': '',
'width': '',
'height': '',
'face': '',
'istype': '',
'qc': '',
'nc': '1',
'fr': '',
'expermode': '',
'force': '',
'pn': num, # 偏移量, 翻页时按 30 递增(每次 ajax 请求返回 30 张)
'rn': '30', # 每次请求返回的图片数量
'gsm': '3c',
'1611283749172': ''
}
r = requests.get(url, headers=header, params=param)
with open(keyword+"搜索结果.html", 'w', encoding='utf-8') as fp:
fp.write(r.text)
reg = r'thumbURL":"(.*?)"'
regName = r'"fromPageTitleEnc":"(.*?\s?.*?)"'
urlList = re.findall(reg, r.text, re.S)
nameList = re.findall(regName, r.text, re.S)
path = "./2.6" + keyword + " 搜索结果"
if not os.path.exists(path):
os.mkdir(path)
for i in range(30):
newURL = urlList[i]
newName = str(i+1) + nameList[i].replace('\\', '').replace('?', '').replace('/', '').replace(':', '') + ".jpg"
print("第 " + str(i+1) + " 个")
print("目标: " + newName + "\nURL: " + newURL)
newResponse = requests.get(newURL, headers=header)
with open((path + "/" + newName), 'wb') as fp:
fp.write(newResponse.content)
fp.close()
print(" 保存完毕!")
模拟登陆
项目:古诗文网验证码识别
from chaojiying import Chaojiying_Client
import requests
from lxml import etree
if __name__ == '__main__':
# url
url = 'https://so.gushiwen.cn/user/login.aspx'
loginURL = 'https://so.gushiwen.cn/user/login.aspx?from=http%3a%2f%2fso.gushiwen.cn%2fuser%2fcollect.aspx'
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# 用同一个 Session 保持 cookie, 否则验证码和登录请求对应不上
session = requests.Session()
r = session.get(url, headers=header)
tree = etree.HTML(r.text)
# 找到验证码链接
imgURL = 'https://so.gushiwen.cn' + tree.xpath('//*[@id="imgCode"]/@src')[0]
# 获取图片内容并保存图片
img = session.get(imgURL, headers=header).content
with open('./验证码.png', 'wb') as fp:
fp.write(img)
fp.close()
im = open('./验证码.png', 'rb').read()
# 调用平台提供的方法进行数据识别, 返回str
chaojiying = Chaojiying_Client('XXXX', 'XXXX', 'XXXX')  # 超级鹰的用户名、密码、软件ID
Vcode = chaojiying.PostPic(im, 1004)["pic_str"]
print("Pic_Str : " + Vcode)
# param
param = {
'__VIEWSTATE': 'WTzu914A3wdgjqmODej1C/6wxkP0vcqN6+eQCItA42mkUmkpcBBnTgcMFLhoEBM4dVSmwh8eV+sEpmw8t5nta1oupDDQKYF3HGtoKR17QxsXdmtAwvDdH6HL52Y=',  # 每次打开页面都会变, 更稳的做法见本段代码后面的示意
'__VIEWSTATEGENERATOR': 'C93BE1AE',
'from': 'http://so.gushiwen.cn/user/collect.aspx',
'email': 'DDD',
'pwd': 'XXXX',
'code': Vcode,
'denglu': '登录',
}
response = session.post(loginURL, headers=header, data=param)  # 表单字段要放在 data 里, 并沿用同一个 session
print("Status: " + str(response.status_code))
with open("古诗文网.html", 'w', encoding='utf-8') as fp2:
fp2.write(response.text)
fp2.close()
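param 里的 __VIEWSTATE / __VIEWSTATEGENERATOR 是直接写死的, 它们其实是登录页里的隐藏 input, 每次打开都会变。更稳的做法是从页面里动态取, 思路示意如下(xpath 按 ASP.NET 隐藏域的常见写法假设):
import requests
from lxml import etree

header = {'User-Agent': 'Mozilla/5.0'}
session = requests.Session()
page = session.get('https://so.gushiwen.cn/user/login.aspx', headers=header)
tree = etree.HTML(page.text)
# ASP.NET 页面一般把这两个值放在同名的隐藏 input 里
viewstate = tree.xpath('//input[@id="__VIEWSTATE"]/@value')[0]
generator = tree.xpath('//input[@id="__VIEWSTATEGENERATOR"]/@value')[0]
print(viewstate[:20], generator)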
项目:校园网登录
import requests
if __name__ == '__main__':
# url
url = 'http://jwk.njfu.edu.cn/'
loginURL = 'http://jwk.njfu.edu.cn/_data/login_home.aspx'
# headers
header = {
'Host': 'jwk.njfu.edu.cn',
'Origin': 'http://jwk.njfu.edu.cn',
'Referer': 'http://jwk.njfu.edu.cn/_data/login_home.aspx',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# param
data = {
'__VIEWSTATE': '/wEPDwUKLTMzNjY4NzgxOWRkvUsU20I2vYDlxpA1sjoWhQit5wI71Yw2NIm9hDi0zws=',
'__VIEWSTATEGENERATOR': '56911C19',
'__EVENTVALIDATION': '/wEdAAIRYxBzHPv4zphuJg7oAk9kZ5IuKWa4Qm28BhxLxh2oFLftNW2DMo/ERJBF+XkFQfVqp4AMzvkUCbvPwTpUGfFr',
'pcInfo': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36 Edg/88.0.705.50undefined5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36 Edg/88.0.705.50 SN:NULL',
'txt_mm_expression': '',
'txt_mm_length': '',
'txt_mm_userzh': '',
'typeName': '%D1%A7%C9%FA', # 解密后文字: 学生
'dsdsdsdsdxcxdfgfg': '537711E150923B6EAD19B2B6550EC3', # md5 混合加密1次
'fgfggfdgtyuuyyuuckjg': '',
'validcodestate': '0',
'Sel_Type': 'STU',
'txt_asmcdefsddsd': '19xx',
'txt_pewerwedsdfsdff': '',
'txt_psasas': '%C7%EB%CA%E4%C8%EB%C3%DC%C2%EB' # 解密后文字: 请输入密码
}
session = requests.Session()
print("Start")
r = session.post(loginURL, headers=header, data=data)
print(r.status_code)
with open("./校园网.html", 'w', encoding='utf-8') as fp:
fp.write(r.text)
fp.close()
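data 里那个 'dsdsdsdsdxcxdfgfg' 字段注释写的是 md5 混合加密, 具体怎么拼要看该网站登录页的 JS; 这里只放一个用 hashlib 算 md5 的最小示意(假设是直接对明文密码做一次 md5, 实际规则不一定是这样):
import hashlib

password = '123456'  # 假设的明文密码
digest = hashlib.md5(password.encode('utf-8')).hexdigest()
print(digest)        # 32 位十六进制摘要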
项目:批改网登录 cookie
import requests
if __name__ == '__main__':
# url
url = 'http://www.pigai.org/index.php?a=login'
# headers
header = {
'Host': 'www.pigai.org',
'Origin': 'http://www.pigai.org',
'Referer': 'http://www.pigai.org/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# param
data = {
'username': 'XXX',
'password': '',
'remember': '1',
'checkhash': '',
'password_encrypt': 'jMpxr5R5Vn1SbCxasdfhpIJPQbvNMykglZ0ASfq6CNYDs0P60u3SzNS/2osdMqw/nrGPruboiVX4PeQp0clQnFGte2zxfRve42+8AUkOL2ow06gkc6qJXy+ChRL6Vyp0ZFo0KSIC5fXV0uRHbfteUsgFqRNkxmBKZ/zPshAqepBI=',
}
session = requests.Session()
r = session.post(url, headers=header, data=data)
with open("./批改网登录.html", 'w', encoding="utf-8") as fp:
fp.write(r.text)
fp.close()
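用 Session 登录的好处是: 服务器通过 Set-Cookie 下发的登录态会存在 session 里, 之后用同一个 session 访问其他页面会自动带上。示意如下(账号密码用占位符代替):
import requests

session = requests.Session()
header = {'User-Agent': 'Mozilla/5.0'}
session.post('http://www.pigai.org/index.php?a=login', headers=header,
             data={'username': 'XXX', 'password': 'XXX'})
print(session.cookies.get_dict())   # 登录后保存下来的 cookie
r = session.get('http://www.pigai.org/', headers=header)  # 自动携带上面的 cookie
print(r.status_code)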
异步爬取
前置知识
协程
# 协程:
# 也被称为微线程, 是一种用户态内的上下文切换技术,简而言之, 就是通过一个线程实现代码块相互切换执行
# 实现协程有这么几种方法
# - greenlet 早期模块
# - yield 关键字
# - asyncio装饰器 (Python3.4)
# - 遇到 IO耗时操作 会自动切换到其他任务
# - async, await 关键字 (Python3.5)[推荐]
import asyncio
if __name__ == '__main__':
'''低情商做法:(Python3.8-) '''
# @asyncio.coroutine
# def func1():
# print(1)
# yield from asyncio.sleep(2) # 遇到IO耗时操作, 自动化切换到其他任务
# print(2)
'''高情商做法:(Python3.8+) '''
async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO耗时操作, 自动化切换到其他任务
print(2)
async def func2():
print(3)
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
协程各功能介绍
import asyncio
if __name__ == '__main__':
'''协程函数async与协程对象'''
# 协程函数: 定义函数的时候async def 函数名
async def func():
pass
# 协程对象: 执行协程函数得到的协程对象
result = func() # 内部代码不会执行, 只会得到一个协程对象
# 任务列表
tasks = [
]
'''await关键字: 等后面的东西有结果了再继续往下走(此时可能会切换到其他任务继续执行)'''
# await + 可等待的对象(协程对象, Future, Task对象等)
# done, pending = await asyncio.wait(task_list, timeout = 2)
async def func2():
print("Hello")
response = await asyncio.sleep(2) # 在这儿会跳到其他函数, 不会继续执行
print("结束: " + response)
'''Task对象'''
# asyncio.create_task(协程对象) 创建task对象
# 更底层, 不建议用:
# loop.create_task()
# ensure_future()
'''Future对象'''
'''Python3.7-'''
# 生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务列表交给事件循环执行(run_until_complete 只接受一个可等待对象, 所以用 asyncio.wait 把任务列表包一层; tasks 为空时会报错, 实际使用前先放入任务)
loop.run_until_complete(asyncio.wait(tasks))
'''Python3.7+(一步完成上面两件事, 注意 asyncio.run 接收的是协程对象, 一般传入一个 main() 协程)'''
# asyncio.run(main())
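把上面这些概念串起来, 一个可以直接运行的最小例子大致如下(函数名和睡眠时间都是随便取的):
import asyncio

async def job(name, seconds):
    print(name, "开始")
    await asyncio.sleep(seconds)   # 模拟 IO 耗时, 期间事件循环去跑其他任务
    print(name, "结束")
    return name

async def main():
    # create_task 把协程对象包装成 Task 并排入事件循环
    t1 = asyncio.create_task(job("任务1", 2))
    t2 = asyncio.create_task(job("任务2", 1))
    results = await asyncio.gather(t1, t2)  # 等两个任务都结束, 拿到返回值
    print(results)

asyncio.run(main())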
线程池的使用
from multiprocessing.dummy import Pool
import time
if __name__ == '__main__':
stime = time.time()
# 模拟阻塞任务
def A_Process(id):
print("开始执行 : " + str(id))
time.sleep(1)
print("执行结束 : " + str(id))
# 模拟任务列表
tasks = {1, 2, 3, 4, 5}
# 实例化线程池(multiprocessing.dummy 里的 Pool 实际是线程池, 参数为线程数)
pool = Pool(4)
# 将任务列表中的每一个元素依次传给第一个参数对应的函数处理(阻塞到全部完成)
pool.map(A_Process, tasks)
etime = time.time()
print("执行时间: " + str(etime - stime))
项目:爬取梨视频
常规方法:
import requests
from lxml import etree
import random
import re
import time
# 不使用多线程:
# 下载完成! 统计下载用时 : 35.81399393081665 秒
# 下载完成! 统计下载用时 : 18.53400182723999 秒
if __name__ == '__main__':
stime = time.time()
# url
url = 'https://www.pearvideo.com/popular_loading.jsp'
videoIdURL = 'https://www.pearvideo.com/'
getVideoURL = 'https://www.pearvideo.com/videoStatus.jsp'
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# 保存id 之后还会将id替换为url
IdList = []
'''
获取视频页面
'''
# param
mrd = str(random.random())
param = {
'reqType': '1',
'categoryId': '',
'start': '10',
'sort': '0',
'mrd': mrd
}
r = requests.get(url, headers=header, params=param)
tree = etree.HTML(r.text)
videoList = tree.xpath('/html/body/li/a/@href')
'''
获取视频Id码
'''
for video in videoList:
video = videoIdURL + video
r = requests.get(video, headers=header)
# 获取contId
contId = re.findall(r'contId = "(.*?)"', r.text, re.S)[0]
name = re.findall(r'<h1 class="video-tt">(.*?)</h1>', r.text, re.S)[0]
dic = {
'id': contId,
'name': name
}
IdList.append(dic)
'''
获取视频链接
'''
for contId in IdList:
# header加固
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75',
'Host': 'www.pearvideo.com',
'Referer': 'https://www.pearvideo.com/video_' + str(contId["id"]),
}
session = requests.Session()
mrd = str(random.random())
param = {
'contId': contId["id"],
'mrd': mrd
}
r = session.get(getVideoURL, headers=header, params=param)
contId["link"] = r.json()["videoInfo"]["videos"]["srcUrl"]
contId["time"] = r.json()["systemTime"]
'''
处理url
'''
for i in IdList:
i["link"] = i["link"].replace(i['time'], 'cont-' + i['id'])
print(i)
'''
下载视频
'''
for link in IdList:
fileName = "./" + link["name"] + ".mp4"
print(link["link"])
r = requests.get(link["link"])
with open(fileName, 'wb') as fp:
fp.write(r.content)
etime = time.time()
print("下载完成! 统计下载用时 : {} 秒".format(etime - stime))
线程池:
from multiprocessing.dummy import Pool
import requests
from lxml import etree
import random
import re
import time
import os
# 使用多线程:
# 下载完成! 统计下载用时 : 4.989999771118164 秒
# 下载完成! 统计下载用时 : 5.061960697174072 秒
if __name__ == '__main__':
stime = time.time()
# url
url = 'https://www.pearvideo.com/popular_loading.jsp'
videoIdURL = 'https://www.pearvideo.com/'
getVideoURL = 'https://www.pearvideo.com/videoStatus.jsp'
# headers
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
}
# 保存id 之后还会将id替换为url
IdList = []
'''
获取视频页面
'''
# param
mrd = str(random.random())
param = {
'reqType': '1',
'categoryId': '',
'start': '10',
'sort': '0',
'mrd': mrd
}
r = requests.get(url, headers=header, params=param)
tree = etree.HTML(r.text)
videoList = tree.xpath('/html/body/li/a/@href')
'''
获取视频Id码
'''
for video in videoList:
video = videoIdURL + video
r = requests.get(video, headers=header)
# 获取contId
contId = re.findall(r'contId = "(.*?)"', r.text, re.S)[0]
name = re.findall(r'<h1 class="video-tt">(.*?)</h1>', r.text, re.S)[0]
dic = {
'id': contId,
'name': name
}
IdList.append(dic)
'''
获取视频链接
'''
for contId in IdList:
# header加固
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75',
'Host': 'www.pearvideo.com',
'Referer': 'https://www.pearvideo.com/video_' + str(contId["id"]),
}
session = requests.Session()
mrd = str(random.random())
param = {
'contId': contId["id"],
'mrd': mrd
}
r = session.get(getVideoURL, headers=header, params=param)
contId["link"] = r.json()["videoInfo"]["videos"]["srcUrl"]
contId["time"] = r.json()["systemTime"]
'''
处理url
'''
for i in IdList:
i["link"] = i["link"].replace(i['time'], 'cont-' + i['id'])
print(i)
'''
下载视频
'''
# 检查目录
if not os.path.exists("./4.3 梨视频结果"):
os.mkdir("./4.3 梨视频结果")
# for link in IdList:
def downLoadVideo(link):
fileName = "./4.3 梨视频结果/" + link["name"] + ".mp4"
print("开始下载 : " + link["name"])
r = requests.get(link["link"])
with open(fileName, 'wb') as fp:
fp.write(r.content)
print("下载完成 : " + link["name"])
pool = Pool(5)
pool.map(downLoadVideo, IdList)
etime = time.time()
print("下载完成! 统计下载用时 : {} 秒".format(etime - stime))
Selenium
项目:再见药监局
from selenium import webdriver
from lxml import etree
from time import sleep
# 实例化一个浏览器对象(传入浏览器驱动)
bro = webdriver.Edge(executable_path="./msedgedriver.exe")
# 打开一个网页
bro.get("http://scxk.nmpa.gov.cn:81/xk/")
# 获取当前页面源码数据
page_text = bro.page_source
# 解析企业名称
tree = etree.HTML(page_text)
list = tree.xpath(r"//*[@id='gzlist']/li/dl/a/text()")
for li in list:
print(li)
sleep(2)
bro.quit()
项目:淘宝
from selenium import webdriver
from time import sleep
bro = webdriver.Edge("./msedgedriver.exe")
bro.get("https://www.taobao.com/")
# 标签定位
search_input = bro.find_element_by_id('q')
# 标签交互
search_input.send_keys('Iphone')
# 执行一组JS代码
bro.execute_script('window.scrollTo(0, document.body.scrollHeight)')
sleep(2)
btn = bro.find_element_by_css_selector('.btn-search')
btn.click()
# 前进后退
bro.get('http://www.baidu.com')
sleep(2)
bro.back()
sleep(2)
bro.forward()
sleep(2)
sleep(2)
bro.quit()
处理iframe和动作链
from selenium import webdriver
from selenium.webdriver import ActionChains
from time import sleep
bro = webdriver.Edge("./msedgedriver.exe")
bro.get("https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable")
# 如果定位的标签是存在于iframe标签中的则必须使用如下操作
bro.switch_to.frame('iframeResult') # 切换作用域
div = bro.find_element_by_id('draggable')
# 动作链
action = ActionChains(bro)
# 点击长按指定的标签
action.click_and_hold(div)
for i in range(4):
# perform()立即执行动作连操作
action.move_by_offset(17, 0).perform()
# 释放动作链
action.release().perform()
项目:QQ空间
from selenium import webdriver
from time import sleep
if __name__ == '__main__':
# 准备工作
bro = webdriver.Edge("./msedgedriver.exe")
bro.get("https://i.qq.com/")
# 切换到账号密码登录
bro.switch_to.frame("login_frame")
switch = bro.find_element_by_id("switcher_plogin")
switch.click()
# 完成账号密码登录
userName = bro.find_element_by_id('u')
passWord = bro.find_element_by_id('p')
userName.send_keys('XXXXX')
passWord.send_keys('')
btn = bro.find_element_by_id('login_button')
btn.click()
sleep(2)
# 切换到留言板
bro.get("https://user.qzone.qq.com/1665650743/334")
# 按页面爬取全部
项目:12306
# 主要是解决验证码的问题
from selenium import webdriver
from selenium.webdriver import ActionChains
from time import sleep
from PIL import Image
from chaojiying import Chaojiying_Client
if __name__ == '__main__':
# 准备工作
bro = webdriver.Edge("./msedgedriver.exe")
bro.get("https://kyfw.12306.cn/otn/resources/login.html")
sleep(1)
bro.find_element_by_xpath(r'/html/body/div[2]/div[2]/ul/li[2]/a').click()
sleep(1)
'''截图'''
# 截图
bro.save_screenshot("./12306.png")
# 获取验证码坐标
codeImgElem = bro.find_element_by_xpath(r'//*[@id="J-loginImg"]')
codeImgLT = codeImgElem.location # 左上角坐标
print(codeImgLT)
size = codeImgElem.size # 验证码的大小
print(size)
rangle = (
codeImgLT['x'], codeImgLT['y'],
codeImgLT['x'] + size['width'],
codeImgLT['y'] + size['height']
)
# 剪切图片
img = Image.open('./12306.png')
imgName = './12306Code.png'
frame = img.crop(rangle)
frame.save(imgName)
'''上传到超级鹰并获取图片信息'''
chaojiying = Chaojiying_Client('XXXX', ';XXXX`', 'XXXX')
im = open('D:\\CodeLib\\Py\\ScratchTheWeb\\5. Selenium模块\\12306Code.png', 'rb').read()
Result = chaojiying.PostPic(im, 9004)
print(Result)
Result = Result["pic_str"].split('|')
allLocate = []
for i in Result:
print("(" + i + ")")
x = i.split(',')[0]
y = i.split(',')[1]
allLocate.append({
'x': x,
'y': y
})
'''在网页上点击'''
for locate in allLocate:
ActionChains(bro).move_to_element_with_offset(codeImgElem, int(locate['x']), int(locate['y'])).click().perform()
sleep(2)
Scrapy
太过专业，暂时用不上，故略
一些小项目
单词爬虫
刚学爬虫编写的,很菜很菜
import requests
import lxml
from bs4 import BeautifulSoup
import time
kv = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36 Edg/83.0.478.56'}
t = time.gmtime()
all_file = open("WordList.txt", "r", encoding="utf-8")
Date_file = open("DateTemp.txt", "r+", encoding="utf-8")
times = int(Date_file.read().strip())
Date_file.seek(0)
Date_file.write(str(times+1))
Date_file.close()
new_file = open("EveryDayWord-{}.md".format(time.strftime("%Y.%m.%d", t)), "w", encoding="utf-8")
def initial():
global t
new_file.write("---\n")
new_file.write("title: EveryDayWord-" + time.strftime("%Y.%m.%d", t)+"\n")
new_file.write("tag: [English, 每日必做]\n")
new_file.write("categories: 英语每日单词\n")
new_file.write("hide: index\n")
new_file.write("---\n\n")
new_file.write("<br>\n\n")
new_file.write("<center style=\"font-size:200% \"><b>加油! 这是坚持的第<span style=\"color:#B5D46E\"> {} </span>天</b></center>\n\n".format(times))
new_file.write("<br>\n")
class Word:
def __init__(self, value):
self.value = value.strip('\n')
def OnVocabulary(self):
url = "https://www.vocabulary.com/dictionary/" + self.value
content = requests.get(url, headers = kv)
soup = BeautifulSoup(content.text, 'lxml')
txt = str(soup.select('.short')[0])
new_file.write(">\n")
new_file.write(">" + txt+"\n")
new_file.write("\n"+"<br>"+"\n")
new_file.write("\n")
def OnNetEase(self):
url = "http://dict.youdao.com/w/" + self.value
content = requests.get(url, headers=kv)
soup = BeautifulSoup(content.text, 'lxml')
# 这里是给他加音标的
txt_list = soup.select(".pronounce")
for i in txt_list:
new_file.write(" {}: ".format(i.contents[0][0]))
new_file.write(i.span.string + ' ')
new_file.write("\n")
# 这里是加汉语意思的
txt_list = soup.select('.trans-container')[0].ul.contents
for i in txt_list:
if i != '\n':
txt = i.string.split('.')
new_file.write(">\n")
new_file.write("><span style=\"color:red\"><b>"+txt[0]+". </b></span>"+txt[1]+"\n")
new_file.write(">\n")
# 初始化开头
print("开始初始化文章结构")
initial()
# 处理Word部分
new_file.write("\n")
new_file.write("## Some Words\n\n")
All_Words = all_file.readlines()
for i in All_Words:
if i == '\n':
break
print("正在处理单词: "+i.strip('\n'))
new_file.write(">**"+i.strip('\n')+"**")
a = Word(i)
print("开始从有道词典获取资源")
try:
a.OnNetEase()
except:
print("无法查到单词: "+i)
print("开始从Vocabulay.com获取资源")
try:
a.OnVocabulary()
except:
print("无法从Vocabulay.com查到单词: "+i)
new_file.write("\n<br>\n\n")
# print("开始从有道词典获取资源(例句)")
# a.OnNetEase()
new_file.write("<br>\n")
new_file.write("<center><i>释义, 例句等可能参考包括但不限于有道翻译,百度翻译等工具</i></center>\n")
new_file.write("<br>\n")
new_file.write("<center><i>特别推荐: <a href=\"http://www.vocabulary.com\">Vocabulary.com</a> 本文的英文释义选自于该网站</i></center>\n")
all_file.close()
new_file.close()
自动发邮件
似乎没用上爬虫。。。不过是那会儿写的
# coding=utf-8
import smtplib
from email.mime.text import MIMEText
import time
class SendQQEmail:
# 创建一个类, 需要用户名和密码
def __init__(self, send_email, send_pwd):
self.send_email = send_email
self.send_pwd = send_pwd
# 该方法用于向目标发送邮件(需要: 收信地址, 主题, 内容)
def sendToEmail(self, re_email, subject, content):
msg = MIMEText(content)
msg['Subject'] = subject
msg['From'] = self.send_email
msg['To'] = re_email
try:
server = smtplib.SMTP_SSL("smtp.qq.com", 465)
server.login(self.send_email, self.send_pwd)
server.sendmail(self.send_email, re_email, msg.as_string())
self.LOG("Successfully Sent!", subject, content, e=None)
except Exception as e:
self.LOG("Failed while Sending!", subject, content, e)
pass
# 日志文件, 收看发送的内容, 状况以及报错信息等
def LOG(self, logMsg, subject, content, e):
with open('./msg.log', 'a', encoding='utf-8') as fp:
fp.write("[{}]\t{}\n".format(time.asctime(time.localtime(time.time())), logMsg))
if e:
fp.write("<ERROR> {}\n".format(e))
fp.write("#{}#\n".format(subject))
fp.write("{}\n\n".format(content))
if __name__ == '__main__':
user = SendQQEmail('XXX@qq.com', 'Password')
subject = "Python默认邮件"
content = "我使用Python发送了一封默认邮件"
user.sendToEmail('XXXX@qq.com', subject, content)
抢座位
有日志。为了隐私把学校网址和配置文件去掉了!
# Preference 需要自行填写座位ID
import requests
import json
import datetime
import time
import logging
LOGIN_API = "http://XXX.edu.cn/ClientWeb/pro/ajax/login.aspx"
ORDER_API = "http://XXX.edu.cn/ClientWeb/pro/ajax/reserve.aspx"
CANCEL_API = ""
HEADER = {
"User-Agent": "Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55\
Mobile Safari/537.36 Edg/96.0.1054.34"
}
# CONFIG = {}
# 登录,获取携带了Cookie的Session
def login():
LOGIN_DATA = {
"act" : "login",
"id" : CONFIG["userInfo"]["id"],
"pwd": CONFIG["userInfo"]["password"],
"role": "512",
}
try:
session = requests.Session()
result = session.get(LOGIN_API, headers=HEADER, data=LOGIN_DATA)
jsonData = json.loads(result.text)
if jsonData["msg"] != "ok":
raise Exception(jsonData["msg"])
logging.info(">>>>>LOGIN: 登录成功!")
logging.info("name: " + jsonData["data"]["name"] + "\t" + "id: " + jsonData["data"]["id"])
except Exception as err:
logging.error(">>>>>登录失败!\t msg: " + str(err))
exit()
return session
# 预订指定的位置
def order(session, start, end, seat):
ORDER_DATA = {
"act": "set_resv",
"dev_id": seat,
"start": start,
"end": end,
}
try:
result = session.get(ORDER_API, headers=HEADER, data=ORDER_DATA)
jsonData = json.loads(result.text)
if jsonData["msg"] != "操作成功!":
raise Exception(jsonData["msg"])
logging.info(">>>>>ORDER: 操作成功! Congratulations!")
logging.info(" seatId: " + seat)
except Exception as err:
logging.debug(">>>>>操作失败!\tseatId: " + seatId + " msg: " + str(err))
return False
return True
# 初始化
def init():
with open('./settings.json', 'r', encoding='utf-8') as fp:
jsonData = json.load(fp)
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
# logging.basicConfig(filename="log.txt", level=logging.DEBUG, format=LOG_FORMAT, filemode="a")
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
return jsonData
if __name__ == '__main__':
try:
CONFIG = init()
session = login()
seatList = CONFIG["limit"]["preference"]
tomorrow = datetime.datetime.now() + datetime.timedelta(days = 1)
startTime = str(tomorrow).split(" ")[0] + " " + CONFIG["limit"]["startTime"]
endTime = str(tomorrow).split(" ")[0] + " " + CONFIG["limit"]["endTime"]
timeInfo = time.localtime(time.time())
todayTime = time.mktime((timeInfo.tm_year, timeInfo.tm_mon, timeInfo.tm_mday, 7, 0, 0, 0, 0, 0))
while time.time() < todayTime:
logging.info(str(datetime.datetime.now()) + "\n>>>>>WAITING")
time.sleep(0.75)
# 遍历preference级别的预选座位
result = False;
for i in range(2):
logging.info(">>>>>工作中\t第" + str(1 + i) + "次...")
for seatId in seatList:
result = order(session, startTime, endTime, seatId)
if result == True:
break;
time.sleep(0.3)
if result == True:
break;
# 遍历acceptable级别预选的范围(TODO)
logging.info(">>>>>Finished")
except Exception as e:
logging.error(">>>>>未知异常!可能是配置文件的格式错误!\t msg: " + str(e))