Python Web Scraping Notes

These notes summarize the material I worked through while learning Python web scraping.

They are meant to jog my memory whenever I write a crawler of my own.

While studying I followed the related tutorial series on Bilibili.

The Requests Library

Getting Started

import requests

if __name__ == "__main__":
    # specify the URL
    url = "http://www.sogou.com/"
    # send the request
    r = requests.get(url)
    # get the response data (a string)
    # print(r.text)
    # persist it to disk
    with open('./sogou.html', 'w', encoding="utf-8") as fp:
        fp.write(r.text)
    print("结束!")
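The example above writes r.text straight to disk. In practice it often helps to check the status code and the encoding first; a minimal sketch of that (same Sogou homepage):

import requests

url = "http://www.sogou.com/"
r = requests.get(url, timeout=10)
r.raise_for_status()                  # raise an HTTPError for 4xx/5xx responses
r.encoding = r.apparent_encoding      # guess the encoding from the body rather than the headers alone
print(r.status_code, r.encoding, len(r.text))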

Project: Web Page Collector

import requests

# UA spoofing: send a real browser's User-Agent header
if __name__ == "__main__":
    url = "http://www.sogou.com/web?"
    # wrap the URL query parameters in a dict
    kw = input('Enter a word: ')
    param = {
        'query': kw
    }
    # tip: collect several User-Agent strings and rotate through them at random
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # send the request
    r = requests.get(url=url, params=param, headers=header)
    print(r.status_code)
    page_text = r.text

    fileName = kw + '.html'
    with open(fileName, 'w', encoding='utf-8') as fp:
        fp.write(page_text)
    print(fileName, "保存成功!")

Project: Baidu Translate

import requests
import json

if __name__ == '__main__':
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = 'https://fanyi.baidu.com/sug'
    # param
    word = input("请输入要查询的单词:")
    data = {
        'kw': word
    }

    # send the request
    r = requests.post(url=url, data=data, headers=header)
    # get the response data; json() returns a Python object (only call it when the response really is JSON)
    dic_obj = r.json()
    print(dic_obj)

    # persist to disk
    fp = open(word + '.json', 'w', encoding='utf-8')
    json.dump(dic_obj, fp, ensure_ascii=False)

    fp.close()
    print("Over!")
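As the comment above notes, r.json() should only be called when the response really is JSON; one way to guard against surprises is to check the Content-Type header first. A small sketch:

import requests

r = requests.post('https://fanyi.baidu.com/sug', data={'kw': 'dog'})
if 'application/json' in r.headers.get('Content-Type', ''):
    print(r.json())
else:
    # fall back to the raw text if the server did not return JSON
    print(r.status_code, r.text[:200])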

Project: Douban

import requests
import json

if __name__ == '__main__':
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # URL of the ranking Ajax endpoint
    url = 'https://movie.douban.com/j/chart/top_list'
    # param
    param = {
        'type': '22',
        'interval_id': '100:90',
        'action': '',
        'start': '0',
        'limit': '20'
    }
    # send the request
    r = requests.get(url=url, params=param, headers=header)
    data = r.json()
    with open('./豆瓣.json', 'w', encoding='utf-8') as fp:
        json.dump(data, fp, ensure_ascii=False)
    print("Over!!")

Project: KFC Store Locator

import requests
import json

if __name__ == '__main__':
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = 'http://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword'
    # param
    param = {
        'cname': '',
        'pid': '',
        'keyword': '南京',
        'pageIndex': '1',
        'pageSize': 10
    }
    # send the request (this endpoint expects the parameters as POST form data)
    r = requests.post(url, data=param, headers=header)
    text = r.json()

    fp = open('./肯德基.json', 'w', encoding='utf-8')
    json.dump(text, fp, ensure_ascii=False)
    fp.close()
    print('Over!')

Project: NMPA (National Medical Products Administration)

import requests
import json

if __name__ == '__main__':
    # header
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsList'
    # param
    param = {
        'on': 'true',
        'page': '1',  # page number
        'pageSize': '15',  # items per page
        'productName': '',
        'conditionType': '1',
        'applyname': '',
        'applysn': ''
    }

    r = requests.post(url, data=param, headers=header)
    text = r.json()
    allData = []
    fp = open('./药监局化妆品.json', 'a', encoding='utf-8')
    for i in text["list"]:
        tmpURL = 'http://scxk.nmpa.gov.cn:81/xk/itownet/portalAction.do?method=getXkzsById'
        id = {
            'id': i["ID"]
        }
        print(i["ID"])
        tmpR = requests.post(tmpURL, params=id, headers=header)
        allData.append(tmpR.json())
    json.dump(allData, fp, ensure_ascii=False)
    fp.close()

Data Parsing

Regular Expressions

import requests
import re
import os

# Goal: scrape all of the hero images

if __name__ == '__main__':
    # url
    url = 'https://pvp.qq.com/web201605/herolist.shtml'
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # make sure the output directory exists
    if not os.path.exists("./2.1 爬取图片结果"):
        os.mkdir("./2.1 爬取图片结果")

    # send the request
    r = requests.get(url, headers=header)
    r.encoding = 'gbk'

    # use regular expressions to pull out the image URLs and hero names
    ex_URL = r'src="(.*?)" width="\d+"\s*height="\d+" alt=".{1,4}"'
    ex_Name = r'src=".*?" width="\d+"\s*height="\d+" alt="(.*?)"'
    img_list = re.findall(ex_URL, r.text, re.S)
    name_list = re.findall(ex_Name, r.text, re.S)
    for i in range(len(img_list)):
        url = 'http:' + img_list[i]
        name = "./2.1 爬取图片结果/" + name_list[i] + ".jpg"
        # .content returns the response body as bytes (what we want for an image)
        # .text returns it as a string
        # .json() returns a parsed object

        # download one image
        img = requests.get(url, headers=header)
        # write the image to disk
        img_file = open(name, "wb")
        img_file.write(img.content)
        print("第 " + str(i+1) + " 个英雄" + name_list[i] + "保存完毕!")
        img_file.close()
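The comments above distinguish .content from .text. For larger binary files it can also help to stream the download instead of holding the whole body in memory; a minimal sketch (the URL is a placeholder):

import requests

def download_file(url, path, headers=None):
    # stream=True fetches the body in chunks instead of all at once
    with requests.get(url, headers=headers, stream=True) as r:
        r.raise_for_status()
        with open(path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=8192):
                fp.write(chunk)

# usage (placeholder URL):
# download_file("https://example.com/big.jpg", "./big.jpg")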

bs4

# How to use bs4
# 1. Instantiate a BeautifulSoup object and load the page source into it
# 2. Call the object's attributes and methods to locate tags and extract data

import requests
import lxml
from bs4 import BeautifulSoup

if __name__ == "__main__":
    # instantiating the object
    # 1. load a local HTML document into it
    # fp = open('./test.html', 'r', encoding="utf-8")
    # soup = BeautifulSoup(fp, 'lxml')
    # 2. load page source fetched from the internet into it
    # r = requests.get(url)
    # soup = BeautifulSoup(r.text, 'lxml')

    # header
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = 'https://xy.lymtics.top/'
    # send the request
    r = requests.get(url, headers=header)
    soup = BeautifulSoup(r.text, 'lxml')

    # BeautifulSoup attributes and methods
    # soup.tagName returns the first occurrence of that tag in the document
    print(soup.p)
    # soup.find(tagName, [class_/id/attr=...]) returns the first matching tag
    print(soup.find('p'))
    print(soup.find('p', id='lyrics_2'))
    # soup.find_all(tagName, [class_/id/attr=...]) returns a list of every matching tag
    print(soup.find_all('p'))
    # soup.select('selector') returns a list for a CSS selector (id, class, tag, ...)
    print(soup.select('#lyrics_2 > span')[0])

    # getting the text between tags
    # two attributes and one method
    print(soup.p.text)
    print(soup.p.string)  # difference: .string only returns the tag's direct text
    print(soup.p.get_text())

    # getting a tag's attribute value
    print(soup.audio['src'])
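The demo above runs against a personal site; the same attributes and methods work on any markup, for example a literal HTML string. A self-contained sketch:

from bs4 import BeautifulSoup

html = """
<div id="songs">
  <p class="lyric">line one <span>part a</span></p>
  <p class="lyric" id="lyrics_2">line two <span>part b</span></p>
</div>
"""
soup = BeautifulSoup(html, 'lxml')

print(soup.p)                                   # first <p> in the document
print(soup.find('p', id='lyrics_2'))            # first <p> with that id
print(len(soup.find_all('p')))                  # every <p> tag -> 2
print(soup.select('#lyrics_2 > span')[0].text)  # CSS selector -> "part b"
print(soup.p['class'])                          # attribute access -> ['lyric']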

Project: Scrape All Chapters of a Novel

import requests
import os
from bs4 import BeautifulSoup

if __name__ == '__main__':
    # header
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = 'https://www.shicimingju.com/book/sanguoyanyi.html'

    r = requests.get(url, headers=header)
    r.encoding = r.apparent_encoding
    soup = BeautifulSoup(r.text, 'lxml')
    chapters = soup.find('div', class_='book-mulu').find_all("a")

    # create the output directory
    if not os.path.exists("./2.3 小说保存结果"):
        os.mkdir("./2.3 小说保存结果")

    for chapter in chapters:
        url = 'http://www.shicimingju.com' + chapter["href"]
        r = requests.get(url, headers= header)
        r.encoding = r.apparent_encoding
        Tmp_soup = BeautifulSoup(r.text, 'lxml')
        text = Tmp_soup.select(".chapter_content")[0].text
        # open the file and write the chapter text
        fileName = "./2.3 小说保存结果/" + chapter.text + '.txt'
        with open(fileName, 'w', encoding="utf-8") as fp:
            fp.write(text)
        print(chapter.text + " 保存成功!")
    print("-----!程序结束!-----")

xpath

import requests
from lxml import etree

if __name__ == '__main__':
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = "http://xy.lymtics.top"

    r = requests.get(url, headers=header)

    # instantiate an etree object
    # 1. load a local HTML document into an etree object:
    # etree.parse(filePath)
    # 2. load page source fetched from the internet into the object
    t = etree.HTML(r.text)
    # xpath expression syntax
    # - /  : start from the root node; also separates one level from the next
    # - // : any number of levels; can also mean "start from anywhere"
    # - tag[@class='className'] : filter by attribute
    # - /text()  : take the text content
    # - /@attrName : take an attribute value
    print(t.xpath('html/div'))
    print(t.xpath('//p[@id="lyrics_1"]'))
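The first option in the comments above, parsing a local file, looks roughly like this (a minimal sketch; './test.html' and the selectors are placeholders):

from lxml import etree

# HTMLParser is forgiving about malformed markup
parser = etree.HTMLParser(encoding='utf-8')
tree = etree.parse('./test.html', parser)

# /text() takes text content, /@attr takes an attribute value
titles = tree.xpath('//p[@class="title"]/text()')
links = tree.xpath('//a/@href')
print(titles, links)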

Project: 58.com Second-hand Housing

import requests
from lxml import etree

if __name__ == '__main__':
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # url
    url = "https://nj.58.com/ershoufang/"
    r = requests.get(url, headers=header)
    tree = etree.HTML(r.text)

    # extract the listing titles, total prices and average prices
    name_list = tree.xpath("//h3[@class='property-content-title-name']/text()")
    price_list = tree.xpath("//span[@class='property-price-total-num']/text()")
    perPrice_list = tree.xpath("//p[@class='property-price-average']/text()")

    fp = open("./58二手房南京.txt", 'w', encoding="utf-8")
    for i in range(len(name_list)):
        fp.write("¥" + price_list[i] + '万\t\t\t' + perPrice_list[i] + '\t\t\t' + name_list[i] + '\n')

    fp.close()

Project: Baidu Images

import requests
import os
import re
from lxml import etree

if __name__ == '__main__':
    # url
    url = 'https://image.baidu.com/search/acjson'
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # keyword
    keyword = '张慧雯'
    num = 30  # number of images wanted; feeds the 'pn' parameter below, ideally a multiple of 30
    # param
    param = {
        'tn': 'resultjson_com',
        'logid': '8816883031229968680',
        'ipn': 'rj',
        'ct': '201326592',
        'is': '',
        'fp': 'result',
        'queryWord': keyword,
        'cl': '2',
        'lm': '-1',
        'ie': 'utf-8',
        'oe': 'utf-8',
        'adpicid': '',
        'st': '',
        'z': '',
        'ic': '',
        'hd': '',
        'latest': '',
        'copyright': '',
        'word': keyword,
        's': '',
        'se': '',
        'tab': '',
        'width': '',
        'height': '',
        'face': '',
        'istype': '',
        'qc': '',
        'nc': '1',
        'fr': '',
        'expermode': '',
        'force': '',
        'pn': num,  # offset of the first result; each Ajax request returns 30 images
        'rn': '30',  # number of results per request
        'gsm': '3c',
        '1611283749172': ''
    }

    r = requests.get(url, headers=header, params=param)

    with open(keyword+"搜索结果.html", 'w', encoding='utf-8') as fp:
        fp.write(r.text)

    reg = r'thumbURL":"(.*?)"'
    regName = r'"fromPageTitleEnc":"(.*?\s?.*?)"'
    urlList = re.findall(reg, r.text, re.S)
    nameList = re.findall(regName, r.text, re.S)

    path = "./2.6" + keyword + " 搜索结果"
    if not os.path.exists(path):
        os.mkdir(path)

    for i in range(30):
        newURL = urlList[i]
        newName = str(i+1) + nameList[i].replace('\\', '').replace('?', '').replace('/', '').replace(':', '') + ".jpg"
        print("第 " + str(i+1) + " 个")
        print("目标: " + newName + "\nURL: " + newURL)

        newResponse = requests.get(newURL, headers=header)
        with open((path + "/" + newName), 'wb') as fp:
            fp.write(newResponse.content)
        print(" 保存完毕!")

Simulated Login

Project: CAPTCHA Recognition on gushiwen.cn

from chaojiying import Chaojiying_Client
import requests
from lxml import etree

if __name__ == '__main__':
    # url
    url = 'https://so.gushiwen.cn/user/login.aspx'
    loginURL = 'https://so.gushiwen.cn/user/login.aspx?from=http%3a%2f%2fso.gushiwen.cn%2fuser%2fcollect.aspx'
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    r = requests.get(url, headers=header)

    tree = etree.HTML(r.text)
    # locate the CAPTCHA image URL
    imgURL = 'https://so.gushiwen.cn' + tree.xpath('//*[@id="imgCode"]/@src')[0]
    # download and save the CAPTCHA image
    img = requests.get(imgURL, headers=header).content
    with open('./验证码.png', 'wb') as fp:
        fp.write(img)
    im = open('./验证码.png', 'rb').read()
    # call the coding platform's recognition API; it returns a str
    chaojiying = Chaojiying_Client('LymticS', ';cd{6VsX*UGBG#&2WDB`', '912043')
    Vcode = chaojiying.PostPic(im, 1004)["pic_str"]
    print("Pic_Str : " + Vcode)
    # param
    param = {
        '__VIEWSTATE': 'WTzu914A3wdgjqmODej1C/6wxkP0vcqN6+eQCItA42mkUmkpcBBnTgcMFLhoEBM4dVSmwh8eV+sEpmw8t5nta1oupDDQKYF3HGtoKR17QxsXdmtAwvDdH6HL52Y=',
        '__VIEWSTATEGENERATOR': 'C93BE1AE',
        'from': 'http://so.gushiwen.cn/user/collect.aspx',
        'email': 'DDD',
        'pwd': '{JCM5g}Bb4l?"n>K',
        'code': Vcode,
        'denglu': '登录',
    }

    # the login form must be sent as the POST body (data=), not as query parameters
    response = requests.post(loginURL, headers=header, data=param)
    print("Status: " + str(response.status_code))
    with open("古诗文网.html", 'w', encoding='utf-8') as fp2:
        fp2.write(response.text)

Project: Campus Portal Login

import requests

if __name__ == '__main__':
    # url
    url = 'http://jwk.njfu.edu.cn/'
    loginURL = 'http://jwk.njfu.edu.cn/_data/login_home.aspx'
    # headers
    header = {
        'Host': 'jwk.njfu.edu.cn',
        'Origin': 'http://jwk.njfu.edu.cn',
        'Referer': 'http://jwk.njfu.edu.cn/_data/login_home.aspx',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # param
    data = {
        '__VIEWSTATE': '/wEPDwUKLTMzNjY4NzgxOWRkvUsU20I2vYDlxpA1sjoWhQit5wI71Yw2NIm9hDi0zws=',
        '__VIEWSTATEGENERATOR': '56911C19',
        '__EVENTVALIDATION': '/wEdAAIRYxBzHPv4zphuJg7oAk9kZ5IuKWa4Qm28BhxLxh2oFLftNW2DMo/ERJBF+XkFQfVqp4AMzvkUCbvPwTpUGfFr',
        'pcInfo': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36 Edg/88.0.705.50undefined5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36 Edg/88.0.705.50 SN:NULL',
        'txt_mm_expression': '',
        'txt_mm_length': '',
        'txt_mm_userzh': '',
        'typeName': '%D1%A7%C9%FA',  # URL-encoded GBK for "学生" (student)
        'dsdsdsdsdxcxdfgfg': '537711E150923B6EAD19B2B6550EC3',  # password, MD5-hashed once
        'fgfggfdgtyuuyyuuckjg': '',
        'validcodestate': '0',
        'Sel_Type': 'STU',
        'txt_asmcdefsddsd': '19xx',
        'txt_pewerwedsdfsdff': '',
        'txt_psasas': '%C7%EB%CA%E4%C8%EB%C3%DC%C2%EB'  # URL-encoded GBK for "请输入密码" (please enter the password)
    }
    session = requests.Session()
    print("Start")
    r = session.post(loginURL, headers=header, data=data)
    print(r.status_code)
    with open("./校园网.html", 'w', encoding='utf-8') as fp:
        fp.write(r.text)

pigai.org (批改网) login:

import requests

if __name__ == '__main__':
    # url
    url = 'http://www.pigai.org/index.php?a=login'
    # headers
    header = {
        'Host': 'www.pigai.org',
        'Origin': 'http://www.pigai.org',
        'Referer': 'http://www.pigai.org/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # param
    data = {
        'username': 'XXX',
        'password': '',
        'remember': '1',
        'checkhash': '',
        'password_encrypt': 'jMpxr5R5Vn1SbCxasdfhpIJPQbvNMykglZ0ASfq6CNYDs0P60u3SzNS/2osdMqw/nrGPruboiVX4PeQp0clQnFGte2zxfRve42+8AUkOL2ow06gkc6qJXy+ChRL6Vyp0ZFo0KSIC5fXV0uRHbfteUsgFqRNkxmBKZ/zPshAqepBI=',
    }
    session = requests.Session()
    r = session.post(url, headers=header, data=data)
    with open("./批改网登录.html", 'w', encoding="utf-8") as fp:
        fp.write(r.text)

Asynchronous Scraping

Prerequisites

Coroutines

# Coroutines:
# also called micro-threads, a user-space context-switching technique; in short,
# a single thread switches execution between blocks of code

# Ways to implement coroutines:
# - greenlet: an early third-party module
# - the yield keyword (generators)
# - the @asyncio.coroutine decorator (Python 3.4)
#       - automatically switches to another task when it hits blocking I/O
# - the async / await keywords (Python 3.5+) [recommended]

import asyncio

if __name__ == '__main__':
    '''The old, generator-based approach (deprecated)'''
    # @asyncio.coroutine
    # def func1():
    #     print(1)
    #     yield from asyncio.sleep(2)  # on blocking I/O, automatically switch to another task
    #     print(2)

    '''The recommended approach (async / await)'''


    async def func1():
        print(1)
        await asyncio.sleep(2)  # on blocking I/O, control switches to another task
        print(2)


    async def func2():
        print(3)
        print(4)


    tasks = [
        asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
    ]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

Coroutine Features in Detail

import asyncio

if __name__ == '__main__':
    '''Coroutine functions (async def) and coroutine objects'''
    # coroutine function: a function defined with `async def`
    async def func():
        pass
    # coroutine object: what calling a coroutine function returns
    result = func()  # the body does not run yet; you only get a coroutine object
    # task list
    tasks = [
        result
    ]

    '''The await keyword: wait until the awaited thing has a result before moving on
       (meanwhile the loop may switch to another task)'''
    # await + an awaitable object (coroutine object, Future, Task, ...)
    # done, pending = await asyncio.wait(task_list, timeout=2)
    async def func2():
        print("Hello")
        response = await asyncio.sleep(2)   # execution jumps to other tasks here instead of blocking
        print("结束: " + str(response))

    '''Task objects'''
    # asyncio.create_task(coroutine_object) creates a Task (requires a running event loop)
    # lower-level alternatives, not recommended:
    # loop.create_task()
    # ensure_future()

    '''Future objects: the lower-level awaitable that Task is built on'''

    '''Python 3.7 and earlier'''
    # create an event loop
    # loop = asyncio.get_event_loop()
    # run the task list on it
    # loop.run_until_complete(asyncio.gather(*tasks))

    '''Python 3.7+ (asyncio.run creates the loop and runs the coroutine in one call)'''
    async def main():
        # gather schedules everything in the task list plus func2 and waits for all of it
        await asyncio.gather(*tasks, func2())

    asyncio.run(main())
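
The Task-object comments above never get exercised; a minimal sketch of asyncio.create_task, which schedules coroutines so they run concurrently:

import asyncio

async def fetch(n):
    # stand-in for an I/O-bound job such as an HTTP request
    await asyncio.sleep(1)
    return "result {}".format(n)

async def main():
    # create_task schedules each coroutine immediately, so they overlap
    task_list = [asyncio.create_task(fetch(i)) for i in range(3)]
    for task in task_list:
        print(await task)

asyncio.run(main())   # finishes in about 1 second, not 3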

Using a Thread Pool

from multiprocessing.dummy import Pool
import time

if __name__ == '__main__':
    stime = time.time()


    # simulate a blocking task
    def A_Process(id):
        print("开始执行 : " + str(id))
        time.sleep(1)
        print("执行结束 : " + str(id))


    # a mock task list
    tasks = {1, 2, 3, 4, 5}

    # instantiate a pool of 4 workers (multiprocessing.dummy is thread-based despite the name)
    pool = Pool(4)
    # hand each element of the task list to the worker function
    pool.map(A_Process, tasks)

    etime = time.time()

    print("执行时间: " + str(etime - stime))
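
The standard library's concurrent.futures module offers the same pattern; a minimal equivalent sketch:

from concurrent.futures import ThreadPoolExecutor
import time

def a_process(task_id):
    print("start:", task_id)
    time.sleep(1)           # simulated blocking work
    print("done:", task_id)

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    pool.map(a_process, [1, 2, 3, 4, 5])
print("elapsed: {:.2f} s".format(time.time() - start))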

Project: Scraping Pearvideo

The plain approach:

import requests
from lxml import etree
import random
import re
import time

# Without a thread pool:
# Download finished! Total time: 35.81 s
# Download finished! Total time: 18.53 s

if __name__ == '__main__':
    stime = time.time()
    # url
    url = 'https://www.pearvideo.com/popular_loading.jsp'
    videoIdURL = 'https://www.pearvideo.com/'
    getVideoURL = 'https://www.pearvideo.com/videoStatus.jsp'
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # store each video's id; the real download link will be added to the dict later
    IdList = []

    '''
    Fetch the "popular videos" page
    '''

    # param
    mrd = str(random.random())
    param = {
        'reqType': '1',
        'categoryId': '',
        'start': '10',
        'sort': '0',
        'mrd': mrd
    }

    r = requests.get(url, headers=header, params=param)

    tree = etree.HTML(r.text)
    videoList = tree.xpath('/html/body/li/a/@href')

    '''
    Extract each video's contId and title
    '''

    for video in videoList:
        video = videoIdURL + video
        r = requests.get(video, headers=header)
        # extract contId
        contId = re.findall(r'contId = "(.*?)"', r.text, re.S)[0]
        name = re.findall(r'<h1 class="video-tt">(.*?)</h1>', r.text, re.S)[0]
        dic = {
            'id': contId,
            'name': name
        }
        IdList.append(dic)

    '''
    Get the real video URL
    '''
    for contId in IdList:
        # tighten the headers (this endpoint checks Referer)
        header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75',
            'Host': 'www.pearvideo.com',
            'Referer': 'https://www.pearvideo.com/video_' + str(contId["id"]),
        }
        session = requests.Session()
        mrd = str(random.random())
        param = {
            'contId': contId["id"],
            'mrd': mrd
        }
        r = session.get(getVideoURL, headers=header, params=param)
        contId["link"] = r.json()["videoInfo"]["videos"]["srcUrl"]
        contId["time"] = r.json()["systemTime"]

    '''
    Fix up the URL (replace the timestamp with cont-<id>)
    '''
    for i in IdList:
        i["link"] = i["link"].replace(i['time'], 'cont-' + i['id'])
        print(i)
    '''
    Download the videos
    '''
    for link in IdList:
        fileName = "./" + link["name"] + ".mp4"
        print(link["link"])
        r = requests.get(link["link"])

        with open(fileName, 'wb') as fp:
            fp.write(r.content)

    etime = time.time()

    print("下载完成! 统计下载用时 : {} 秒".format(etime - stime))

With a thread pool:

from multiprocessing.dummy import Pool
import requests
from lxml import etree
import random
import re
import time
import os

# With a thread pool:
# Download finished! Total time: 4.99 s
# Download finished! Total time: 5.06 s

if __name__ == '__main__':
    stime = time.time()
    # url
    url = 'https://www.pearvideo.com/popular_loading.jsp'
    videoIdURL = 'https://www.pearvideo.com/'
    getVideoURL = 'https://www.pearvideo.com/videoStatus.jsp'
    # headers
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75'
    }
    # store each video's id; the real download link will be added to the dict later
    IdList = []

    '''
    Fetch the "popular videos" page
    '''

    # param
    mrd = str(random.random())
    param = {
        'reqType': '1',
        'categoryId': '',
        'start': '10',
        'sort': '0',
        'mrd': mrd
    }

    r = requests.get(url, headers=header, params=param)

    tree = etree.HTML(r.text)
    videoList = tree.xpath('/html/body/li/a/@href')

    '''
    Extract each video's contId and title
    '''

    for video in videoList:
        video = videoIdURL + video
        r = requests.get(video, headers=header)
        # extract contId
        contId = re.findall(r'contId = "(.*?)"', r.text, re.S)[0]
        name = re.findall(r'<h1 class="video-tt">(.*?)</h1>', r.text, re.S)[0]
        dic = {
            'id': contId,
            'name': name
        }
        IdList.append(dic)

    '''
    Get the real video URL
    '''
    for contId in IdList:
        # tighten the headers (this endpoint checks Referer)
        header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/87.0.4280.141 Safari/537.36 Edg/87.0.664.75',
            'Host': 'www.pearvideo.com',
            'Referer': 'https://www.pearvideo.com/video_' + str(contId["id"]),
        }
        session = requests.Session()
        mrd = str(random.random())
        param = {
            'contId': contId["id"],
            'mrd': mrd
        }
        r = session.get(getVideoURL, headers=header, params=param)
        contId["link"] = r.json()["videoInfo"]["videos"]["srcUrl"]
        contId["time"] = r.json()["systemTime"]

    '''
    Fix up the URL (replace the timestamp with cont-<id>)
    '''
    for i in IdList:
        i["link"] = i["link"].replace(i['time'], 'cont-' + i['id'])
        print(i)
    '''
    Download the videos
    '''
    # make sure the output directory exists
    if not os.path.exists("./4.3 梨视频结果"):
        os.mkdir("./4.3 梨视频结果")
    # for link in IdList:
    def downLoadVideo(link):
        fileName = "./4.3 梨视频结果/" + link["name"] + ".mp4"
        print("开始下载 : " + link["name"])
        r = requests.get(link["link"])

        with open(fileName, 'wb') as fp:
            fp.write(r.content)
        print("下载完成 : " + link["name"])

    pool = Pool(5)
    pool.map(downLoadVideo, IdList)

    etime = time.time()

    print("下载完成! 统计下载用时 : {} 秒".format(etime - stime))

Selenium

Project: The NMPA Again

from selenium import webdriver
from lxml import etree
from time import sleep

# instantiate a browser object (pass in the browser driver)
bro = webdriver.Edge(executable_path="./msedgedriver.exe")
# open a page
bro.get("http://scxk.nmpa.gov.cn:81/xk/")
# grab the current page source
page_text = bro.page_source
# parse out the company names
tree = etree.HTML(page_text)
name_list = tree.xpath(r"//*[@id='gzlist']/li/dl/a/text()")
for name in name_list:
    print(name)
sleep(2)
bro.quit()
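
The licence list on this page is rendered by JavaScript, so reading page_source immediately after get() can race the render. A minimal sketch of an explicit wait (Selenium's WebDriverWait) that could sit before the page_source read above:

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# wait up to 10 seconds for the list element to appear before reading page_source
WebDriverWait(bro, 10).until(
    EC.presence_of_element_located((By.ID, "gzlist"))
)
page_text = bro.page_source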

Project: Taobao

from selenium import webdriver
from time import sleep

bro = webdriver.Edge("./msedgedriver.exe")
bro.get("https://www.taobao.com/")

# locate an element
search_input = bro.find_element_by_id('q')

# interact with the element
search_input.send_keys('Iphone')

# execute a snippet of JavaScript
bro.execute_script('window.scrollTo(0, document.body.scrollHeight)')
sleep(2)

btn = bro.find_element_by_css_selector('.btn-search')
btn.click()

# back and forward navigation
bro.get('http://www.baidu.com')
sleep(2)
bro.back()
sleep(2)
bro.forward()
sleep(2)


sleep(2)
bro.quit()

Handling iframes and Action Chains

from selenium import webdriver
from selenium.webdriver import ActionChains
from time import sleep

bro = webdriver.Edge("./msedgedriver.exe")
bro.get("https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable")

# if the element you need lives inside an <iframe>, you must switch into it first
bro.switch_to.frame('iframeResult')  # switch scope into the iframe
div = bro.find_element_by_id('draggable')

# action chain
action = ActionChains(bro)
# click and hold the target element
action.click_and_hold(div)

for i in range(4):
    # perform() executes the queued actions immediately
    action.move_by_offset(17, 0).perform()

# release the action chain
action.release().perform()

Project: Qzone

from selenium import webdriver
from time import sleep

if __name__ == '__main__':
    # setup
    bro = webdriver.Edge("./msedgedriver.exe")
    bro.get("https://i.qq.com/")
    # switch to the username/password login tab
    bro.switch_to.frame("login_frame")
    switch = bro.find_element_by_id("switcher_plogin")
    switch.click()
    # fill in the credentials and log in
    userName = bro.find_element_by_id('u')
    passWord = bro.find_element_by_id('p')
    userName.send_keys('XXXXX')
    passWord.send_keys('')
    btn = bro.find_element_by_id('login_button')
    btn.click()
    sleep(2)
    # go to the message board
    bro.get("https://user.qzone.qq.com/1665650743/334")
    # crawl everything page by page (left unfinished)

Project: 12306

# the main problem here is solving the click CAPTCHA
from selenium import webdriver
from selenium.webdriver import ActionChains
from time import sleep
from PIL import Image
from chaojiying import Chaojiying_Client

if __name__ == '__main__':
    # setup
    bro = webdriver.Edge("./msedgedriver.exe")
    bro.get("https://kyfw.12306.cn/otn/resources/login.html")
    sleep(1)
    bro.find_element_by_xpath(r'/html/body/div[2]/div[2]/ul/li[2]/a').click()
    sleep(1)

    '''Screenshot'''
    # take a screenshot of the whole page
    bro.save_screenshot("./12306.png")
    # get the CAPTCHA image's position
    codeImgElem = bro.find_element_by_xpath(r'//*[@id="J-loginImg"]')
    codeImgLT = codeImgElem.location  # top-left corner
    print(codeImgLT)
    size = codeImgElem.size  # size of the CAPTCHA
    print(size)
    rangle = (
        codeImgLT['x'], codeImgLT['y'],
        codeImgLT['x'] + size['width'],
        codeImgLT['y'] + size['height']
    )
    # crop the CAPTCHA out of the screenshot
    img = Image.open('./12306.png')
    imgName = './12306Code.png'
    frame = img.crop(rangle)
    frame.save(imgName)

    '''Upload to Chaojiying and get the coordinates to click'''
    chaojiying = Chaojiying_Client('XXXX', ';XXXX`', 'XXXX')
    im = open('D:\\CodeLib\\Py\\ScratchTheWeb\\5. Selenium模块\\12306Code.png', 'rb').read()
    Result = chaojiying.PostPic(im, 9004)
    print(Result)
    Result = Result["pic_str"].split('|')
    allLocate = []
    for i in Result:
        print("(" + i + ")")
        x = i.split(',')[0]
        y = i.split(',')[1]
        allLocate.append({
            'x': x,
            'y': y
        })

    '''Click the returned coordinates on the page'''
    for locate in allLocate:
        ActionChains(bro).move_to_element_with_offset(codeImgElem, int(locate['x']), int(locate['y'])).click().perform()
        sleep(2)

Scrapy

Too heavyweight for these notes, so it is skipped here.
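
For completeness, a minimal taste of what a Scrapy spider looks like (a sketch; the target is the public scraping sandbox quotes.toscrape.com):

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com/"]

    def parse(self, response):
        # CSS selectors work much like bs4's select()
        for quote in response.css("div.quote"):
            yield {"text": quote.css("span.text::text").get()}

# run it with:  scrapy runspider quotes_spider.py -o quotes.json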

A Few Small Projects

Word Crawler

Written when I had only just started learning scraping, so it is very rough.

import requests
import lxml
from bs4 import BeautifulSoup
import time

kv = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36 Edg/83.0.478.56'}
t = time.gmtime()

all_file = open("WordList.txt", "r", encoding="utf-8")
Date_file = open("DateTemp.txt", "r+", encoding="utf-8")
times = eval(Date_file.read().strip("\n"))
Date_file.seek(0)
Date_file.write(str(times+1))
Date_file.close()
new_file = open("EveryDayWord-{}.md".format(time.strftime("%Y.%m.%d", t)), "w", encoding="utf-8")

def initial():
    global t
    new_file.write("---\n")
    new_file.write("title: EveryDayWord-" + time.strftime("%Y.%m.%d", t)+"\n")
    new_file.write("tag: [English, 每日必做]\n")
    new_file.write("categories: 英语每日单词\n")
    new_file.write("hide: index\n")
    new_file.write("---\n\n")
    new_file.write("<br>\n\n")
    new_file.write("<center style=\"font-size:200% \"><b>加油! 这是坚持的第<span style=\"color:#B5D46E\"> {} </span>天</b></center>\n\n".format(times))
    new_file.write("<br>\n")

class Word:
    def __init__(self, value):
        self.value = value.strip('\n')

    def OnVocabulary(self):
        url = "https://www.vocabulary.com/dictionary/" + self.value
        content = requests.get(url, headers = kv)
        soup = BeautifulSoup(content.text, 'lxml')
        txt = str(soup.select('.short')[0])
        new_file.write(">\n")
        new_file.write(">" + txt+"\n")
        new_file.write("\n"+"<br>"+"\n")
        new_file.write("\n")

    def OnNetEase(self):
        url = "http://dict.youdao.com/w/" + self.value
        content = requests.get(url, headers=kv)
        soup = BeautifulSoup(content.text, 'lxml')
        # add the phonetic transcriptions
        txt_list = soup.select(".pronounce")
        for i in txt_list:
            new_file.write("  {}: ".format(i.contents[0][0]))
            new_file.write(i.span.string + ' ')
        new_file.write("\n")
        # add the Chinese definitions
        txt_list = soup.select('.trans-container')[0].ul.contents
        for i in txt_list:
            if i != '\n':
                txt = i.string.split('.')
                new_file.write(">\n")
                new_file.write("><span style=\"color:red\"><b>"+txt[0]+".&nbsp;</b></span>"+txt[1]+"\n")
                new_file.write(">\n")

# write the front matter
print("开始初始化文章结构")
initial()


# process the words section
new_file.write("\n")
new_file.write("## Some Words\n\n")

All_Words = all_file.readlines()
for i in All_Words:
    if i == '\n':
        break
    print("正在处理单词: "+i.strip('\n'))
    new_file.write(">**"+i.strip('\n')+"**")
    a = Word(i)
    print("开始从有道词典获取资源")
    try:
        a.OnNetEase()
    except:
        print("无法查到单词: "+i)
    print("开始从Vocabulary.com获取资源")
    try:
        a.OnVocabulary()
    except:
        print("无法从Vocabulary.com查到单词: "+i)
        new_file.write("\n<br>\n\n")
    # print("开始从有道词典获取资源(例句)")
    # a.OnNetEase()

new_file.write("<br>\n")
new_file.write("<center><i>释义, 例句等可能参考包括但不限于有道翻译,百度翻译等工具</i></center>\n")
new_file.write("<br>\n")
new_file.write("<center><i>特别推荐: <a href=\"http://www.vocabulary.com\">Vocabulary.com</a> 本文的英文释义选自于该网站</i></center>\n")

all_file.close()
new_file.close()

Automated Email Sender

This one does not really use any scraping... but it was written around the same time.

# coding=utf-8
import smtplib
from email.mime.text import MIMEText
import time


class SendQQEmail:
    # a class that needs the sender's address and password (SMTP authorization code)
    def __init__(self, send_email, send_pwd):
        self.send_email = send_email
        self.send_pwd = send_pwd

    # send an email to the target (needs: recipient address, subject, body)
    def sendToEmail(self, re_email, subject, content):
        msg = MIMEText(content)
        msg['Subject'] = subject
        msg['From'] = self.send_email
        msg['To'] = re_email
        try:
            server = smtplib.SMTP_SSL("smtp.qq.com", 465)
            server.login(self.send_email, self.send_pwd)
            server.sendmail(self.send_email, re_email, msg.as_string())
            self.LOG("Successfully Sent!", subject, content, e=None)
        except Exception as e:
            self.LOG("Failed while Sending!", subject, content, e)
            pass

    # log file: records what was sent, the status, and any error messages
    def LOG(self, logMsg, subject, content, e):
        with open('./msg.log', 'a', encoding='utf-8') as fp:
            fp.write("[{}]\t{}\n".format(time.asctime(time.localtime(time.time())), logMsg))
            if e:
                fp.write("<ERROR> {}\n".format(e))
            fp.write("#{}#\n".format(subject))
            fp.write("{}\n\n".format(content))


if __name__ == '__main__':
    user = SendQQEmail('XXX@qq.com', 'Password')
    subject = "Python默认邮件"
    content = "我使用Python发送了一封默认邮件"
    user.sendToEmail('XXXX@qq.com', subject, content)

Seat Grabber

It writes a log. The school URLs and the config file have been removed for privacy!

# Preference: fill in your own seat IDs in the config file

import requests
import json
import datetime
import time
import logging

LOGIN_API = "http://XXX.edu.cn/ClientWeb/pro/ajax/login.aspx"
ORDER_API = "http://XXX.edu.cn/ClientWeb/pro/ajax/reserve.aspx"
CANCEL_API = ""

HEADER = {
    "User-Agent": "Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) \
    AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55\
    Mobile Safari/537.36 Edg/96.0.1054.34"
}

# CONFIG = {}


# log in and get a Session that carries the cookies
def login():
    LOGIN_DATA = {
        "act" : "login",
        "id" : CONFIG["userInfo"]["id"],
        "pwd": CONFIG["userInfo"]["password"],
        "role": "512",
    }
    try:
        session = requests.Session()
        result = session.get(LOGIN_API, headers=HEADER, data=LOGIN_DATA)
        jsonData = json.loads(result.text)
        if  jsonData["msg"] != "ok":
            raise Exception(jsonData["msg"])
        logging.info(">>>>>LOGIN: 登录成功!")
        logging.info("name: " + jsonData["data"]["name"] + "\t" + "id: " + jsonData["data"]["id"])
    except Exception as err:
        logging.error(">>>>>登录失败!\t msg: " + str(err))
        exit()
    return session

# reserve the specified seat
def order(session, start, end, seat):
    ORDER_DATA = {
        "act": "set_resv",
        "dev_id": seat,
        "start": start,
        "end": end,
    }
    try:
        result = session.get(ORDER_API, headers=HEADER, data=ORDER_DATA)
        jsonData = json.loads(result.text)
        if  jsonData["msg"] != "操作成功!":
            raise Exception(jsonData["msg"])
        logging.info(">>>>>ORDER: 操作成功! Congratulations!")
        logging.info("  seatId: " + seat)
    except Exception as err:
        logging.debug(">>>>>操作失败!\tseatId: " + seat + " msg: " + str(err))
        return False
    return True


# initialization: load the config file and set up logging
def init():
    with open('./settings.json', 'r', encoding='utf-8') as fp:
        jsonData = json.load(fp)
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    # logging.basicConfig(filename="log.txt", level=logging.DEBUG, format=LOG_FORMAT, filemode="a")
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    return jsonData

if __name__ == '__main__':
    try:
        CONFIG = init()
        session = login()
        seatList = CONFIG["limit"]["preference"]
        tomorrow = datetime.datetime.now() + datetime.timedelta(days = 1)
        startTime = str(tomorrow).split(" ")[0] + " " + CONFIG["limit"]["startTime"]
        endTime = str(tomorrow).split(" ")[0] + " " + CONFIG["limit"]["endTime"]

        timeInfo = time.localtime(time.time())
        todayTime = time.mktime((timeInfo.tm_year, timeInfo.tm_mon, timeInfo.tm_mday, 7, 0, 0, 0, 0, 0))

        while time.time() < todayTime:
            logging.info(str(datetime.datetime.now()) + "\n>>>>>WAITING")
            time.sleep(0.75)
        # iterate over the preference-level seat choices
        result = False
        for i in range(2):
            logging.info(">>>>>工作中\t第" + str(1 + i) + "次...")
            for seatId in seatList:
                result = order(session, startTime, endTime, seatId)
                if result == True:
                    break
                time.sleep(0.3)
            if result == True:
                break
            # iterate over the acceptable-level seat range (TODO)
            
            logging.info(">>>>>Finished")
    except Exception as e:
        logging.error(">>>>>未知异常!可能是配置文件的格式错误!\t msg: " + str(e))