diff --git a/page_content_handler.py b/page_content_handler.py
new file mode 100644
index 0000000000000000000000000000000000000000..dadf02b18a53a861562594da0cc674f5ab1f34ae
--- /dev/null
+++ b/page_content_handler.py
@@ -0,0 +1,128 @@
+import concurrent.futures
+import multiprocessing
+import os
+import re
+
+import requests
+from bs4 import BeautifulSoup as bs
+
+# Project-local helpers used below; their module names are assumed here.
+from content_page_parser import ContentPageParser
+from pic_downloader import PicDownloader
+
+
+class PageContentProducer(multiprocessing.Process):
+    def __init__(self, page_list: list, output_queue: multiprocessing.JoinableQueue):
+        multiprocessing.Process.__init__(self)
+        self.daemon = True
+        self.page_list = page_list
+        self.content_list = []
+        self.output_queue = output_queue
+
+    def run(self):
+        '''
+        Put every article onto the queue.
+        '''
+        self.visit_all_page_to_get_content()
+
+        for content in self.content_list:
+            print(f"Enqueued: {content['title']}")
+            self.output_queue.put(content)
+
+    def visit_all_page_to_get_content(self):
+        '''
+        Process every page with a thread pool and extract all article content from each page.
+        '''
+        # Changed in Python 3.8: the default value of max_workers is min(32, os.cpu_count() + 4).
+        # This default keeps at least 5 workers for I/O-bound tasks and uses at most 32 CPU cores
+        # for CPU-bound tasks that release the GIL, which avoids silently consuming large amounts
+        # of resources on many-core machines. ThreadPoolExecutor now also reuses idle worker
+        # threads before starting max_workers new ones.
+
+        # We can use a with statement to ensure threads are cleaned up promptly
+        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
+            # Submit one task per page to the thread pool
+            future_to_page = {executor.submit(self.get_page_content, page_url): page_url for page_url in self.page_list}
+            for future in concurrent.futures.as_completed(future_to_page):
+                page = future_to_page[future]
+                try:
+                    # Collect the result of the finished task
+                    result_list = future.result()
+                    self.content_list += result_list
+                except Exception as e:
+                    print(f'{page} generated an exception: {e}')
+        print(f'Extracted {len(self.content_list)} content records in total')
+
+    def get_page_content(self, page_url) -> list:
+        '''
+        Thread worker: visit page_url, extract every article on that page and return them as a list.
+        '''
+        content_list = []
+        try:
+            res = requests.get(url=page_url)
+            if res.status_code == 200:
+                page_html = res.content.decode('utf-8')
+                soup = bs(page_html, 'lxml')
+                items = soup.find_all('li', onclick=re.compile('.*content_[0-9]*.*'))
+                print(f'Extracted [{len(items)}] content items from page: {page_url}')
+
+                for item in items:
+                    content = {}
+                    # Extract the title
+                    item_title = item.find('a', href='#')
+                    content['title'] = item_title.text
+                    # Extract the number of images
+                    item_num = item.find('span')
+                    content['num'] = item_num.text
+                    # Extract the URL; the attribute looks like location.href='content_48388.html';
+                    href = item['onclick']
+                    item_url = href.split("'")[1]
+                    content['url'] = 'https://xxxx.xyz/' + item_url
+                    content_list.append(content)
+        except Exception as e:
+            print(f'Failed to extract content from page: {page_url}')
+            print(repr(e))
+        return content_list
+
+
+class PageContentConsumer(multiprocessing.Process):
+    def __init__(self, dir, input_queue: multiprocessing.JoinableQueue):
+        multiprocessing.Process.__init__(self)
+        self.daemon = True
+        self.input_queue = input_queue
+        self.dir = dir
+
+    def run(self):
+        while True:
+            try:
+                content = self.input_queue.get()
+                if content is None:
+                    # A None sentinel means there is no more work: leave the loop
+                    break
+                self.content_worker_func(self.dir, content)
+                print(f"Processed: {content['title']}")
+                # Signal that this queue item has been fully handled
+                self.input_queue.task_done()
+            except Exception as e:
+                print(repr(e))
+
+    def content_worker_func(self, dir, content):
+        title = content['title']
+        num = content['num']
+        content_url = content['url']
+        count = 0
+        img_url_list = []
+        pid = os.getpid()
+
+        try:
+            folder_name = title + '_' + num
+            path = os.path.join(dir, folder_name)
+            # Create the download folder if it does not exist yet
+            os.makedirs(path, exist_ok=True)
+
+            content_parser = ContentPageParser(content_url)
+            content_parser.visit_content_page_with_firefox()
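+            # ContentPageParser and PicDownloader are existing project-local helpers that are
+            # not part of this diff; they are assumed to render the content page in Firefox,
+            # expose the image URLs it contains, and download a single image, respectively.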
+            img_url_list = content_parser.get_img_src()
+        except Exception as e:
+            print(repr(e))
+
+        # Download every image of this article in its own thread
+        pic_download_threads = []
+        for img_url in img_url_list:
+            count += 1
+            file_name = 'img_' + str(count)
+            thread_name = str(pid) + ':' + str(count)
+            pic_download_threads.append(PicDownloader(thread_name, img_url, file_name, path))
+
+        for working_thread in pic_download_threads:
+            working_thread.start()
+
+        # Wait at most 3 seconds for each download thread
+        for working_thread in pic_download_threads:
+            working_thread.join(3)
diff --git a/main.py b/main.py
new file mode 100644
index 0000000000000000000000000000000000000000..351ead4a97436079d8463d8235f98b33d5674b52
--- /dev/null
+++ b/main.py
@@ -0,0 +1,47 @@
+import multiprocessing
+import os
+
+from home_page_parser import HomePageParser
+from page_content_handler import PageContentConsumer, PageContentProducer
+
+
+def main(dir):
+    # First parse the URL of every page and build the page_list
+    home_page_parser = HomePageParser()
+    home_page_parser.parse_home_page()
+    page_list = home_page_parser.get_page_list()
+
+    # Create a queue used by the producer and the consumers for inter-process synchronisation
+    queue = multiprocessing.JoinableQueue()
+
+    # Create several consumers; their number depends on the number of CPUs
+    consumer_num = os.cpu_count()
+    consumers = []
+
+    for i in range(consumer_num):
+        consumers.append(PageContentConsumer(dir, queue))
+
+    print(f'total {consumer_num} consumers')
+
+    # Start the consumer processes
+    for i in range(consumer_num):
+        consumers[i].start()
+
+    # Start the producer process and wait until it has finished
+    producer = PageContentProducer(page_list, queue)
+    producer.start()
+    producer.join()
+
+    # Put sentinels onto the queue; None is used as the completion signal.
+    # Note: one sentinel is required for every consumer.
+    for i in range(consumer_num):
+        queue.put(None)
+
+    # Wait for all consumer processes to shut down
+    for i in range(consumer_num):
+        consumers[i].join()
+
+
+if __name__ == '__main__':
+    multiprocessing.freeze_support()
+
+    dir = 'd:test/consumer2'
+    main(dir)
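Reviewer note: the shutdown handshake above (daemon consumer processes, a JoinableQueue, and one None sentinel per consumer) is the core of the patch. A minimal, self-contained sketch of that pattern, with purely illustrative names and no dependency on this project's classes, looks like this:

    import multiprocessing


    def consumer(queue):
        # Keep taking items until the None sentinel arrives
        while True:
            item = queue.get()
            if item is None:
                break
            print(f'processed {item}')
            queue.task_done()  # only real items are marked as done


    def main():
        queue = multiprocessing.JoinableQueue()
        workers = [multiprocessing.Process(target=consumer, args=(queue,), daemon=True)
                   for _ in range(2)]
        for worker in workers:
            worker.start()

        for item in range(5):    # the producer side
            queue.put(item)
        for _ in workers:        # one sentinel per consumer
            queue.put(None)

        for worker in workers:   # wait for every consumer to exit
            worker.join()


    if __name__ == '__main__':
        main()

As in the patch, each consumer calls task_done() only for real items and simply breaks on the sentinel, so joining the consumer processes (rather than the queue) is what guarantees an orderly shutdown.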