# -*- coding: utf-8 -*-
# Project : tornado_video
# FileName : spider_movie_server.py
# Time : 2020/9/25 10:18
# Author  : Mason101
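"""Scrape video listings and detail pages from zxzj.me and persist videos,
episode resources, directors, actors, categories and tags through the
project's SQLAlchemy models."""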
import time
import requests
from lxml import etree
from commom.MysqlHandler import DBSession, Video, Director, Actor, VideoResource, Category, Tag, TVideoDirector, TVideoActor, TVideoTag
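# NOTE: commom.MysqlHandler is a project-local module (name spelled this way in
# the project); it is assumed to expose a SQLAlchemy sessionmaker (DBSession),
# the ORM models, and the T* association tables used for the bindings below.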


class BaseHandler(object):
    """Persistence helpers shared by the spiders, built on the project's SQLAlchemy session."""

    def __init__(self):
        self.db = DBSession()

    def save_video(self, name, *args, **kwargs):
        video = self.db.query(Video).filter_by(name=name).first()
        if video:
            print(f"video already exists: {video.name}")
        else:
            video = Video(name=name,
                          cover=kwargs.get("cover"),
                          score=kwargs.get("score"),
                          introduction=kwargs.get("introduction"),
                          newest_episode=kwargs.get("newest_episode"),
                          category_id=kwargs.get("category").id if kwargs.get("category") else None)
            self.db.add(video)
            self.db.commit()
            print(f"Video [{name}] added successfully!")
            # Bind episode resources
            if kwargs.get("episodes") and kwargs.get("episodes_url"):
                episodes = kwargs.get("episodes")
                episodes_url = kwargs.get("episodes_url")
                for n in range(len(episodes)):
                    self.save_resource(episodes[n], episodes_url[n], video)
            # Bind the director
            if kwargs.get("director"):
                director = self.get_director(kwargs.get("director"))
                video_director = TVideoDirector(video_id=video.id, director_id=director.id)
                self.db.add(video_director)
                self.db.commit()
                print(f"Video [{name}] director bound successfully!")
            # Bind the leading actors
            if kwargs.get("actors"):
                middles = []
                for n in kwargs.get("actors"):
                    actor = self.get_actor(n)
                    middles.append(TVideoActor(video_id=video.id, actor_id=actor.id))
                self.db.add_all(middles)
                self.db.commit()
                print(f"Video [{name}] leading actors bound successfully!")
            # Bind tags
            if kwargs.get("tags"):
                middles = []
                for n in kwargs.get("tags"):
                    tag = self.get_tag(n)
                    middles.append(TVideoTag(video_id=video.id, tag_id=tag.id))
                self.db.add_all(middles)
                self.db.commit()
                print(f"Video [{name}] tags bound successfully!")
        return video

    def save_resource(self, name, url, video):
        resource = self.db.query(VideoResource).filter_by(name=name, resource=url).first()
        if not resource:
            resource = VideoResource(name=name, resource=url, video_id=video.id)
            self.db.add(resource)
            self.db.commit()
            print(f"Video resource [{name}] added successfully!")
        return resource

    # Get-or-create helpers for the lookup tables
    def get_director(self, name, *args, **kwargs):
        director = self.db.query(Director).filter_by(name=name).first()
        if not director:
            director = Director(name=name)
            self.db.add(director)
            self.db.commit()
            print(f"Director [{name}] added successfully!")
        return director

    def get_actor(self, name, *args, **kwargs):
        actor = self.db.query(Actor).filter_by(name=name).first()
        if not actor:
            actor = Actor(name=name)
            self.db.add(actor)
            self.db.commit()
            print(f"Actor [{name}] added successfully!")
        return actor

    def get_category(self, name, *args, **kwargs):
        category = self.db.query(Category).filter_by(name=name).first()
        if not category:
            category = Category(name=name)
            self.db.add(category)
            self.db.commit()
            print(f"Category [{name}] added successfully!")
        return category

    def get_tag(self, name, *args, **kwargs):
        tag = self.db.query(Tag).filter_by(name=name).first()
        if not tag:
            tag = Tag(name=name)
            self.db.add(tag)
            self.db.commit()
            print(f"Tag [{name}] added successfully!")
        return tag


class ZXZJSpiderHandler(BaseHandler):
    """Spider for the zxzj.me listing and detail pages."""

    def __init__(self, url):
        self.url = url
        self.headers = {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
        }
        super(ZXZJSpiderHandler, self).__init__()

    def get_page(self, i, url):
        """Return the total number of listing pages for menu entry i."""
        response = requests.get(url, headers=self.headers)
        html = etree.HTML(response.text)
        menu = html.xpath("//ul[@class='stui-header__menu']//a/text()")
        data = html.xpath("//li[@class='active num']//a/text()")
        page = data[0].split("/")[1]  # the pager shows "current/total"
        print(f"---------------- [{menu[i]}] total: {page} pages ---------------------------")
        return int(page)

    def get_response(self, url, params=None):
        response = requests.get(url, params=params, headers=self.headers)
        print(response.text)
        return response

    def get_video_detail(self, url):
        """Parse one detail page into a dict of score, year, category, tags, cast and episodes."""
        response = requests.get(url, headers=self.headers)
        html = etree.HTML(response.text)
        data = html.xpath("//div[@class='data']/text()")
        data_more = html.xpath("//div[@class='data-more']//p//text()")
        episodes = html.xpath("//div[@class='play-item cont active']//a/text()")
        episodes_url = html.xpath("//div[@class='play-item cont active']//a/@href")
        data = data[0].split(" / ")  # fields: score / year / category / comma-separated tags
        arr = {
            "score": float("%.2f" % float(data[0].replace("分", ""))),
            "year": data[1],
            "category": data[2],
            "tag": data[3].split(","),
        }
        # data_more is a flat list of text nodes; every node containing ":" is a
        # field label, and the nodes between two labels are that field's values.
        sp_list, s_index = [], 0
        for m in range(len(data_more)):
            if ":" in data_more[m]:
                if m == 0:
                    s_index = 1
                else:
                    sp_list.append(data_more[s_index: m])
                    s_index = m + 1
        arr["actor"] = [ac for ac in sp_list[0] if "\xa0" not in ac]  # drop &nbsp; separators
        arr["director"] = sp_list[1][0]
        arr["info"] = data_more[-1].split(":")[1]  # synopsis follows the last label
        arr["episodes"] = episodes
        arr["episodes_url"] = episodes_url
        print(arr)
        return arr

    def spider_data(self, url):
        """Scrape one listing page and persist every video found on it."""
        print("")
        print(f"url: {url}")
        response = self.get_response(url)
        html = etree.HTML(response.text)
        titles = html.xpath("//a[@class='stui-vodlist__thumb lazyload']//@title")
        hrefs = html.xpath("//a[@class='stui-vodlist__thumb lazyload']//@href")
        cover_img = html.xpath("//a[@class='stui-vodlist__thumb lazyload']//@data-original")
        newest_episodes = html.xpath("//a[@class='stui-vodlist__thumb lazyload']//span[2]/text()")
        print(titles)
        print(hrefs)
        print(cover_img)
        print(newest_episodes)
        for i in range(len(titles)):
            try:
                video_detail = self.get_video_detail(self.url + hrefs[i])
                self.save_video(titles[i],
                                cover=cover_img[i],
                                newest_episode=newest_episodes[i],
                                score=video_detail["score"],
                                category=self.get_category(video_detail["category"]),
                                tags=video_detail["tag"],
                                director=video_detail["director"],
                                actors=video_detail["actor"],
                                introduction=video_detail["info"],
                                episodes=video_detail["episodes"],
                                episodes_url=video_detail["episodes_url"],
                                )
                time.sleep(0.2)  # throttle detail-page requests
            except Exception as e:
                self.db.rollback()  # discard the partial transaction for this video
                print(e)


def run():
    url = "https://www.zxzj.me/"
    # url = "https://www.zxzj.me/list/1-1.html"
    # obj = ZXZJSpiderHandler(url)
    # obj.get_video_detail("https://www.zxzj.me/video/1623-1-1.html")
    # obj.spider_data("https://www.zxzj.me/list/5-1.html")
    for tt in range(6, 7):
        obj = ZXZJSpiderHandler(url)
        pages = obj.get_page(tt, f"https://www.zxzj.me/list/{tt}-1.html")
        for p in range(1, pages + 1):  # listing pages are 1-based
            page_url = f"https://www.zxzj.me/list/{tt}-{p}.html"
            obj.spider_data(page_url)
            time.sleep(0.8)  # be polite between listing pages


if __name__ == '__main__':
    run()
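# Assumed entry point: running `python spider_movie_server.py` crawls listing
# category 6 only (range(6, 7) above); widening the range should cover the
# other categories, assuming the site numbers its listing pages as
# /list/<category>-<page>.html.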