Python 多线程并行下载代码模板
这个脚本是一个方便复用的并行下载模板,适合从目录页里批量把文件拉到本地。
以下是完整的代码:
import os
import queue
import requests
import threading
from bs4 import BeautifulSoup
def parse_html(html):
datas = {}
soup = BeautifulSoup(html, 'html.parser')
trs = soup.find_all('tr')
for tr in trs:
a_tag = tr.find('a')
href = a_tag.get('href')
label = a_tag.get_text()
datas.setdefault(label, href)
return datas
def make_request(url):
r = requests.get(url)
if r.status_code != requests.codes.OK:
raise RuntimeError(f"http code with error: {r.status_code}")
html = r.text
hrefs = parse_html(html)
return hrefs
def make_download(jobs: queue.Queue):
while (
not jobs.empty()
):
item = jobs.get()
print('download ... ', item)
url = item.get('url')
save_to = item.get('path')
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(save_to, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
def main(nbThread = 6):
tiff_urls = make_request('<http://data.ess.tsinghua.edu.cn/fromglc10_2017v01.html>')
out_dir = os.path.join(os.path.dirname(__file__), 'data.ess.tsinghua')
jobs = queue.Queue()
for key, val in tiff_urls.items():
jobs.put({'path': os.path.join(out_dir, key), 'url': val})
threads = []
for i in range(nbThread):
t = threading.Thread(
target=make_download, args=(jobs, )
)
t.setDaemon(True)
threads.append(t)
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()
主要功能
- 从网页目录页里解析出所有文件链接(
parse_html+make_request) - 把「下载任务」塞进一个
queue.Queue - 用
threading.Thread启动多个线程,并行下载文件到本地目录
关键步骤
- 解析目录页
def parse_html(html):
soup = BeautifulSoup(html, 'html.parser')
datas = {}
for tr in soup.find_all('tr'):
a_tag = tr.find('a')
href = a_tag.get('href')
label = a_tag.get_text()
datas.setdefault(label, href)
return datas
- 默认假设一行一个
<tr>,里面有<a>。 label当作本地文件名,href当作下载 URL。
复用时要记得:如果目标网页结构不一样,这里是最可能需要改的地方。
- 构造下载任务队列
jobs = queue.Queue()
for key, val in tiff_urls.items():
jobs.put({'path': os.path.join(out_dir, key), 'url': val})
- 队列元素是一个 dict:
{"path": 本地文件路径, "url": 下载地址}。 - 后面线程都从这个队列里消费任务。
- 多线程下载核心
def make_download(jobs: queue.Queue):
while not jobs.empty():
item = jobs.get()
url = item.get('url')
save_to = item.get('path')
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(save_to, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
stream=True+iter_content,避免一次性把大文件读进内存。- 逻辑很直白:从队列取任务 → 请求 → 按块写入文件。
- 启动线程 & 等待完成
threads = []
for i in range(nbThread):
t = threading.Thread(target=make_download, args=(jobs,))
t.setDaemon(True)
threads.append(t)
t.start()
for t in threads:
t.join()
nbThread控制并发度,网络和服务器扛得住的情况下可以适当调大。join()确保主线程等所有下载线程跑完再退出。
如何复用
- 目标目录页 URL
tiff_urls = make_request('http://data.ess.tsinghua.edu.cn/fromglc10_2017v01.html')
- 本地输出目录
out_dir = os.path.join(os.path.dirname(__file__), 'data.ess.tsinghua')
- 解析逻辑(HTML 结构变了就要调)
parse_html(html)
- 并发线程数
main(nbThread=6)