20210340640杨浩然.txt
import multiprocessing
import os
from home_page_parser import HomePageParser
from page_content_handler import PageContentConsumer, PageContentProducer
def main(dir):
    # First parse the URL of every page and build a page_list.
    home_page_parser = HomePageParser()
    home_page_parser.parse_home_page()
    page_list = home_page_parser.get_page_list()
    # Create a queue used to synchronise the producer and consumer processes.
    queue = multiprocessing.JoinableQueue()
    # Create several consumers; the number of consumers matches the number of CPUs.
    consumer_num = os.cpu_count()
    consumers = []
    for i in range(consumer_num):
        consumers.append(PageContentConsumer(dir, queue))
    print(f'total {consumer_num} consumers')
    # Start the consumer processes.
    for i in range(consumer_num):
        consumers[i].start()
    # Start the producer process and wait for it to finish.
    producer = PageContentProducer(page_list, queue)
    producer.start()
    producer.join()
    # Put sentinel values on the queue; None is used here as the "finished" signal.
    # Note: one sentinel is needed for every consumer.
    for i in range(consumer_num):
        queue.put(None)
    # Wait for all consumer processes to exit.
    for i in range(consumer_num):
        consumers[i].join()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    dir = 'd:test/consumer2'
    main(dir)
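
# --------------------------------------------------------------------------
# home_page_parser.py is not included in this listing. The class below is only
# a minimal sketch, assuming that HomePageParser exposes the parse_home_page()
# and get_page_list() methods used by main() above; the base URL and the
# pagination link pattern are hypothetical placeholders, not the author's
# actual values.
# --------------------------------------------------------------------------
import re

import requests
from bs4 import BeautifulSoup as bs


class HomePageParser:
    def __init__(self, home_url='https://xxxx.xyz/'):
        self.home_url = home_url
        self.page_list = []

    def parse_home_page(self):
        # Fetch the home page and collect pagination links (hypothetical pattern).
        res = requests.get(self.home_url)
        if res.status_code == 200:
            soup = bs(res.content.decode('utf-8'), 'lxml')
            for a in soup.find_all('a', href=re.compile(r'page_[0-9]+\.html')):
                self.page_list.append(self.home_url + a['href'])

    def get_page_list(self) -> list:
        return self.page_list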
# --------------------------------------------------------------------------
# The two classes below correspond to the page_content_handler module imported
# at the top of this listing. They additionally rely on ContentPageParser and
# PicDownloader, which live in other project files not shown here.
# --------------------------------------------------------------------------
import concurrent.futures
import re

import requests
from bs4 import BeautifulSoup as bs


class PageContentProducer(multiprocessing.Process):
    def __init__(self, page_list: list, output_queue: multiprocessing.JoinableQueue):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.page_list = page_list
        self.content_list = []
        self.output_queue = output_queue

    def run(self):
        '''
        Put every article onto the output queue.
        '''
        self.visit_all_page_to_get_content()
        for content in self.content_list:
            print(f"enqueued: {content['title']}")
            self.output_queue.put(content)
    def visit_all_page_to_get_content(self):
        '''
        Process all pages with a thread pool and extract every article (content)
        from each page.
        '''
        # Changed in Python 3.8: the default of max_workers is
        # min(32, os.cpu_count() + 4). That default keeps at least 5 worker
        # threads for I/O-bound tasks and uses at most 32 CPU cores for
        # CPU-bound tasks that release the GIL, avoiding silently consuming
        # large amounts of resources on many-core machines. ThreadPoolExecutor
        # now also reuses idle worker threads before starting max_workers of them.
        # We can use a with statement to ensure threads are cleaned up promptly.
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            # Submit the tasks to the thread pool.
            future_to_page = {executor.submit(self.get_page_content, page_url): page_url
                              for page_url in self.page_list}
            for future in concurrent.futures.as_completed(future_to_page):
                page = future_to_page[future]
                try:
                    # Collect the result of the finished task.
                    result_list = future.result()
                    self.content_list += result_list
                except Exception as e:
                    print(f'{page} generated an exception: {e}')
        print(f'extracted {len(self.content_list)} content records in total')
    def get_page_content(self, page_url) -> list:
        '''
        Thread worker: visit page_url, extract every article on that page and
        return them as a list.
        '''
        content_list = []
        try:
            res = requests.get(url=page_url)
            if 200 == res.status_code:
                page_html = res.content.decode('utf-8')
                soup = bs(page_html, 'lxml')
                items = soup.find_all('li', onclick=re.compile('.*content_[0-9]*.*'))
                print(f'extracted [{len(items)}] content items from page: {page_url}')
                for item in items:
                    content = {}
                    # Extract the title.
                    item_title = item.find('a', href='#')
                    content['title'] = item_title.text
                    # Extract the number of images.
                    item_num = item.find('span')
                    content['num'] = item_num.text
                    # Extract the URL; the onclick attribute looks like
                    # location.href='content_48388.html';
                    href = item['onclick']
                    item_url = href.split("'")[1]
                    content['url'] = 'https://xxxx.xyz/' + item_url
                    content_list.append(content)
        except Exception as e:
            print(f'failed to collect content from page: {page_url}')
            print(repr(e))
        return content_list
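
# For reference, the extraction in get_page_content assumes list items shaped
# roughly like the fragment below. This is a hypothetical reconstruction based
# on the selectors used above, not markup copied from the real site:
#
#   <li onclick="location.href='content_48388.html';">
#       <a href="#">Some article title</a>
#       <span>30P</span>
#   </li>
#
# item['onclick'].split("'")[1] then yields 'content_48388.html', which is
# appended to the site root to form content['url'].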
class PageContentConsumer(multiprocessing.Process):
    def __init__(self, dir, input_queue: multiprocessing.JoinableQueue):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.input_queue = input_queue
        self.dir = dir

    def run(self):
        while True:
            try:
                content = self.input_queue.get()
                if content is None:
                    # Exit the loop once the termination sentinel is received.
                    break
                self.content_worker_func(self.dir, content)
                print(f"processed: {content['title']}")
                # Signal that this queue item has been handled.
                self.input_queue.task_done()
            except Exception as e:
                print(repr(e))
    def content_worker_func(self, dir, content):
        title = content['title']
        num = content['num']
        content_url = content['url']
        count = 0
        img_url_list = []
        pid = os.getpid()
        try:
            # One folder per article, named after the title and the image count.
            folder_name = title + '_' + num
            path = os.path.join(dir, folder_name)
            if not os.path.isdir(path):
                os.makedirs(path)
            content_parser = ContentPageParser(content_url)
            content_parser.visit_content_page_with_firefox()
            img_url_list = content_parser.get_img_src()
        except Exception as e:
            print(repr(e))
        # Download every image of the article with one thread per image.
        pic_download_threads = []
        for img_url in img_url_list:
            count += 1
            file_name = 'img_' + str(count)
            thread_name = str(pid) + ':' + str(count)
            pic_download_threads.append(PicDownloader(thread_name, img_url, file_name, path))
        for working_thread in pic_download_threads:
            working_thread.start()
        for working_thread in pic_download_threads:
            working_thread.join(3)
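
# --------------------------------------------------------------------------
# ContentPageParser and PicDownloader are defined in other project files that
# are not part of this listing. The sketches below are only assumptions about
# the interfaces used by content_worker_func above (visit_content_page_with_firefox,
# get_img_src, and a thread that downloads one image). The Selenium usage and
# the '.jpg' extension are hypothetical placeholders, not the author's code.
# --------------------------------------------------------------------------
import threading


class ContentPageParser:
    def __init__(self, content_url):
        self.content_url = content_url
        self.img_src_list = []

    def visit_content_page_with_firefox(self):
        # Hypothetical sketch: open the article page in headless Firefox so that
        # dynamically loaded images are rendered, then collect their src attributes.
        from selenium import webdriver
        options = webdriver.FirefoxOptions()
        options.add_argument('--headless')
        driver = webdriver.Firefox(options=options)
        try:
            driver.get(self.content_url)
            for img in driver.find_elements('tag name', 'img'):
                src = img.get_attribute('src')
                if src:
                    self.img_src_list.append(src)
        finally:
            driver.quit()

    def get_img_src(self) -> list:
        return self.img_src_list


class PicDownloader(threading.Thread):
    def __init__(self, thread_name, img_url, file_name, dir):
        threading.Thread.__init__(self, name=thread_name)
        self.img_url = img_url
        self.file_name = file_name
        self.dir = dir

    def run(self):
        # Download one image and write it into the article folder; the '.jpg'
        # extension is an assumption.
        try:
            res = requests.get(self.img_url, timeout=3)
            if res.status_code == 200:
                with open(os.path.join(self.dir, self.file_name + '.jpg'), 'wb') as f:
                    f.write(res.content)
        except Exception as e:
            print(repr(e))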