#!/usr/bin/env python
# -*- coding: utf-8 -*-
import codecs
import copy
import csv
import json
import logging
import logging.config
import math
import os
import random
import re
import sqlite3
import sys
import warnings
from collections import OrderedDict
from datetime import date, datetime, timedelta
from pathlib import Path
from queue import Queue
from time import sleep
import requests
from lxml import etree
from requests.adapters import HTTPAdapter
from tqdm import tqdm
import pymysql
import const
from util import csvutil
from util.dateutil import convert_to_days_ago
from util.notify import push_deer
from util.notify import push_msg
from concurrent.futures import ThreadPoolExecutor, as_completed
import twitterMedia
warnings.filterwarnings("ignore")
# Create the log directory if it does not exist
if not os.path.isdir("log/"):
    os.makedirs("log/")
logging_path = os.path.split(os.path.realpath(__file__))[0] + os.sep + "logging.conf"
logging.config.fileConfig(logging_path)
logger = logging.getLogger("twitter")
class TwitterCrawler(object):
def __init__(self, config):
"""Weibo类初始化"""
self.validate_config(config)
        self.filter = config["filter"]  # 0 (default) crawls all of a user's tweets, 1 crawls only original tweets
since_date = config["since_date"]
if isinstance(since_date, int):
since_date = date.today() - timedelta(since_date)
since_date = str(since_date)
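        # since_date accepts either a yyyy-mm-dd string or an integer meaning
        # "this many days ago"; per-user values in the id file are normalized the
        # same way (see get_user_config_list). For example, if today is 2022-09-10:
        #   since_date = 7            -> "2022-09-03"
        #   since_date = "2022-01-01" -> "2022-01-01"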
        self.since_date = since_date  # Start date: crawl posts published from this date onward, format yyyy-mm-dd
        self.write_mode = config["write_mode"]  # Output formats: a list that may contain csv, mongo, and mysql
        self.pic_download = config["pic_download"]  # 0 = do not download image media, 1 = download
        self.video_download = config["video_download"]  # 0 = do not download video media, 1 = download
        self.cookie = config["cookie"]  # cookie
        self.authorization = config["authorization"]  # authorization
        self.x_csrf_token = config["x_csrf_token"]  # x_csrf_token
        self.update_user_follow = config["update_user_follow"]  # sync the accounts followed by configured users
        self.update_user_follow_update = config["update_user_follow_update"]  # refresh the followed-accounts list
        self.update_user_follow_day = config["update_user_follow_day"]  # refresh followed accounts older than this many days
        self.update_user_info = config["update_user_info"]  # refresh user profiles
        self.update_my_follow_user_info = config["update_my_follow_user_info"]  # refresh profiles of my followed users
        self.update_user_media = config["update_user_media"]  # sync user media (update_user_media)
        self.update_my_follow_user_media = config["update_my_follow_user_media"]  # sync media of my followed users
        self.update_day = config["update_day"]  # skip users whose media was updated within this many days
        self.down_dir = config["down_dir"]  # download directory
        self.media_exist_one_return = config["media_exist_one_return"]  # stop paging a user once an already-stored media item is seen
        self.min_following = config["min_following"]  # minimum followers_count required before crawling a user
        self.save_at_user = config["save_at_user"]  # save users @-mentioned in media tweets
        self.update_database_following_user = config["update_database_following_user"]  # crawl the following lists of users in the database
        self.update_database_following_day = config["update_database_following_day"]  # refresh database users' following lists older than this many days
        self.database_user_following_min_following = config["database_user_following_min_following"]  # minimum followers_count when crawling database users' following lists
        self.re_down_file = config["re_down_file"]  # re-download broken files
        # Request headers
self.headers = {
"Host": "twitter.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:104.0) Gecko/20100101 Firefox/104.0",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"content-type": "application/json",
"x-twitter-auth-type": "OAuth2Session",
"x-twitter-client-language": "en",
"x-twitter-active-user": "yes",
"x-csrf-token": self.x_csrf_token,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"authorization": self.authorization,
"Referer": "https://twitter.com/",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"TE": "trailers",
"Cookie": self.cookie
}
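        # NOTE: the cookie, x-csrf-token and authorization values come from the
        # config and are assumed to be copied from a logged-in browser session;
        # Twitter's private web API rejects requests whose csrf token does not
        # match the session cookie.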
self.follow_headers = {
"Host": "twitter.com",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:103.0) Gecko/20100101 Firefox/103.0",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"content-type": "application/x-www-form-urlencoded",
"x-twitter-auth-type": "OAuth2Session",
"x-twitter-client-language": "en",
"x-twitter-active-user": "no",
"x-csrf-token": self.x_csrf_token,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"authorization": self.authorization,
"Connection": "keep-alive",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"TE": "trailers",
"Cookie": self.cookie
}
self.headers_bk = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:103.0) Gecko/20100101 Firefox/103.0",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"content-type": "application/json",
"x-twitter-auth-type": "OAuth2Session",
"x-csrf-token": self.x_csrf_token,
"authorization": self.authorization,
"Cookie": self.cookie,
}
        # Headers for downloading media files
self.down_headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:103.0) Gecko/20100101 Firefox/103.0",
}
        # Create the MySQL connection
        self.mysql_config = config.get("mysql_config")  # MySQL connection settings (required: dereferenced unconditionally below)
self.con = pymysql.connect(host=self.mysql_config["host"],
user=self.mysql_config["user"],
passwd=self.mysql_config["password"],
db=self.mysql_config["db"],
port=self.mysql_config["port"],
charset=self.mysql_config["charset"])
        # Create a cursor object
self.cur = self.con.cursor()
user_id_list = config["user_id_list"]
query_list = config.get("query_list") or []
if isinstance(query_list, str):
query_list = query_list.split(",")
self.query_list = query_list
if not isinstance(user_id_list, list):
if not os.path.isabs(user_id_list):
user_id_list = (
os.path.split(os.path.realpath(__file__))[0] + os.sep + user_id_list
)
            self.user_config_file_path = user_id_list  # path to the user config file
user_config_list = self.get_user_config_list(user_id_list)
else:
self.user_config_file_path = ""
user_config_list = [
{
"user_id": user_id,
"since_date": self.since_date,
"query_list": query_list,
}
for user_id in user_id_list
]
self.user_config_list = user_config_list
self.cursor = ""
self.userid = ""
self.threadPool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="down_")
self.conQueue = Queue(15)
for num in range(0, 10):
self.conQueue.put(pymysql.connect(host=self.mysql_config["host"],
user=self.mysql_config["user"],
passwd=self.mysql_config["password"],
db=self.mysql_config["db"],
port=self.mysql_config["port"],
charset=self.mysql_config["charset"]))
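        # One pymysql connection per worker thread: pymysql connections are not
        # thread-safe, so each download task should check a dedicated connection
        # out of conQueue and return it when done (how twitterMedia uses the
        # queue is assumed here, based on the pool being sized to the executor).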
def get_user_config_list(self, file_path):
"""获取文件中的id信息"""
with open(file_path, "rb") as f:
try:
lines = f.read().splitlines()
lines = [line.decode("utf-8-sig") for line in lines]
except UnicodeDecodeError:
logger.error("%s文件应为utf-8编码,请先将文件编码转为utf-8再运行程序", file_path)
sys.exit()
user_config_list = []
for line in lines:
info = line.split(" ")
if len(info) > 0 and info[0].isdigit():
user_config = {}
user_config["user_id"] = info[0]
if len(info) > 2:
if self.is_date(info[2]):
user_config["since_date"] = info[2]
elif info[2].isdigit():
since_date = date.today() - timedelta(int(info[2]))
user_config["since_date"] = str(since_date)
else:
user_config["since_date"] = self.since_date
if len(info) > 3:
user_config["query_list"] = info[3].split(",")
else:
user_config["query_list"] = self.query_list
if user_config not in user_config_list:
user_config_list.append(user_config)
return user_config_list
    def validate_config(self, config):
        """Validate the config"""
        # Flags expected in the config:
        # filter - crawl type
        # pic_download - whether to download images
        # video_download - whether to download videos
        argument_list = [
            "filter",
            "pic_download",
            "video_download",
        ]
        # Validate user_id_list
user_id_list = config["user_id_list"]
if (not isinstance(user_id_list, list)) and (not user_id_list.endswith(".txt")):
logger.warning("user_id_list值应为list类型或txt文件路径")
sys.exit()
if not isinstance(user_id_list, list):
if not os.path.isabs(user_id_list):
user_id_list = (
os.path.split(os.path.realpath(__file__))[0] + os.sep + user_id_list
)
if not os.path.isfile(user_id_list):
logger.warning("不存在%s文件", user_id_list)
sys.exit()
def is_date(self, since_date):
"""判断日期格式是否正确"""
try:
datetime.strptime(since_date, "%Y-%m-%d")
return True
except ValueError:
return False
    def start(self):
        """Run the crawler"""
        try:
            logger.info("Fetching following lists of database users")
            if self.update_database_following_user:
                self.get_datebase_user_following()
            # Users listed in the config file
            logger.info("Syncing accounts followed by configured users")
            if self.update_user_follow_update:
                for user_config in self.user_config_list:
                    self.initialize_info(user_config)
                    self.get_live_user_info()
            logger.info("Syncing media of my followed users")
            if self.update_my_follow_user_media:
                self.get_my_follow_media()
            # Optional steps, currently disabled:
            # if self.update_my_follow_user_info:
            #     self.get_my_follow_update()
            # if self.update_user_info:
            #     self.get_user_update()
            # if self.update_user_media:
            #     self.get_user_media()
            # if self.re_down_file:
            #     self.re_down_files(self.down_dir)
        except Exception as e:
            logger.exception(e)
except Exception as e:
logger.exception(e)
def stop(self):
"""运行停止"""
try:
self.cur.close()
self.con.close()
except Exception as e:
logger.exception(e)
    # Fetch one page of the accounts the current user follows (GraphQL Following)
    def get_follow_json(self):
        proxies = {'http': 'http://127.0.0.1:10887', 'https': 'http://127.0.0.1:10887'}
        url = 'https://twitter.com/i/api/graphql/oVFqtmOJQqJCWADpG3RUYQ/Following?variables={"userId":"' + self.userid + '",'
        # self.cursor carries the next-page cursor scraped from the previous response
        if len(self.cursor) > 3:
            url = url + '"cursor":"' + self.cursor + '",'
        url = url + '"count":20,"includePromotedContent":false,"withSuperFollowsUserFields":true,"withDownvotePerspective":false,"withReactionsMetadata":false,"withReactionsPerspective":false,"withSuperFollowsTweetFields":true}&features={"dont_mention_me_view_api_enabled":true,"interactive_text_enabled":true,"responsive_web_uc_gql_enabled":true,"vibe_api_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":false,"responsive_web_enhance_cards_enabled":false}'
        r = requests.get(url, params="", headers=self.headers, proxies=proxies, verify=False)
        self.cursor = ''  # the cursor is one-shot: reset it after each request
        return r.json(), r.status_code
    # Walk the current user's following list and save each followed user
    def get_live_user_info(self):
        try:
            js, _ = self.get_follow_json()
            instructions = js["data"]["user"]["result"]["timeline"]["timeline"]["instructions"]
            for item in instructions:
                if "entries" not in item:
                    logger.error("No entries node in instruction")
                    continue
                self.get_live_user_info_save(item, js)
        except Exception as e:
            logger.error("Failed to parse data!")
            logger.exception(e)
    # Save one page of followed users, then recurse into the next page
    def get_live_user_info_save(self, instruction, js):
        try:
            user_id_list = instruction["entries"]
            # User info
            for item in user_id_list:
                try:
                    item["content"]["itemContent"]["user_results"]["result"]["rest_id"]
                except Exception:
                    logger.error("No rest_id node in entry")
                    continue
                result = item["content"]["itemContent"]["user_results"]["result"]
                user_id = result["rest_id"]
                user_name = result["legacy"]["name"]
                user_account = result["legacy"]["screen_name"]
                user_location = result["legacy"]["location"]
                followers_count = result["legacy"]["normal_followers_count"]
                friends_count = result["legacy"]["friends_count"]
                description = result["legacy"]["description"]
                self.save_my_follow(1, user_id, user_name, user_account, user_location, followers_count, description, friends_count=friends_count, my_follow=1)
            # Random sleep (disabled)
            # sleep(random.randint(1, 5))
            # Scrape the next-page cursor out of the response's repr; brittle,
            # but cursor entries always carry 'value'/'cursorType' pairs
            jsString = str(js)
            value_list = re.findall(r"'value': '(.*?)'", jsString)
            cursorType_list = re.findall(r"'cursorType': '(.*?)'", jsString)
            # Fetch the next page if a real cursor is present
            if len(cursorType_list) > 1 and len(value_list) > 0 and len(value_list[0]) > 32:
                self.cursor = value_list[0]
                self.get_live_user_info()
        except Exception as e:
            logger.error("Failed to parse data!")
            logger.exception(e)
    # Insert or update a followed user
    def save_my_follow(self, update, user_id, user_name, user_account, user_location, followers_count, description,
                       friends_count=0, my_follow=0):
        self.cur.execute('SELECT * FROM tw_my_follow WHERE user_id = %s', (user_id,))
        rowcount = self.cur.rowcount
        logger.info(user_id)
        # Insert if the user is not stored yet
        if rowcount < 1:
            logger.info("===============================save user_id====================")
            # Backdate modify_time by 100 days so the next update pass picks this user up
            someDay = (datetime.now() + timedelta(days=-100)).strftime("%Y-%m-%d %H:%M:%S")
            self.cur.execute(
                """
                INSERT INTO tw_my_follow (user_id,user_name,user_account,user_location,followers_count,
                friends_count,user_describe,modify_time,my_follow)
                VALUES (%s,%s,%s,%s,%s,
                %s,%s,%s,%s)
                """,
                (user_id, user_name, user_account, user_location, followers_count,
                 friends_count, description, someDay, my_follow))
        else:
            if update:
                self.cur.execute(
                    '''
                    UPDATE tw_my_follow SET user_name= %s , user_account= %s,user_location= %s,
                    followers_count= %s,friends_count= %s,user_describe= %s,my_follow= %s
                    WHERE user_id = %s
                    ''',
                    (user_name, user_account, user_location,
                     followers_count, friends_count, description, my_follow,
                     user_id))
        self.con.commit()
    # Sync media posted by followed users
    def get_my_follow_media(self):
        try:
            # Users I follow directly come first (order by my_follow desc)
            self.cur.execute("""
                SELECT
                    id, user_id, my_follow
                FROM
                    tw_my_follow u
                WHERE
                    (modify_time < date_add(now(), interval -%s day)
                    or not exists (
                        select 1 from tw_user_media e where e.user_id = u.user_id)
                    )
                    and not exists (
                        select 1 from tw_user_exclude e where e.user_id = u.user_id)
                order by my_follow desc, id
                """ % (self.update_day))
            res = self.cur.fetchall()
            obj_list = []
            for line in res:
                media = twitterMedia.TwitterMeadia(self)
                obj = self.threadPool.submit(media.fun_findMeadia, line)
                obj_list.append(obj)
            for future in as_completed(obj_list):
                # result() surfaces any exception raised inside a worker
                data = future.result()
        except Exception as e:
            logger.error("Failed to sync user data!")
            logger.exception(e)
    def initialize_info(self, user_config):
        """Set the crawler's current user"""
        self.userid = user_config["user_id"]

    # Insert or update a user profile
    def save_user_info(self, update, user_id, user_name, user_account, user_location, followers_count, description, friends_count=0):
        self.cur.execute('SELECT * FROM tw_user WHERE user_id = %s', (user_id,))
        rowcount = self.cur.rowcount
        logger.info(user_id)
        # Insert if the user is not stored yet
        if rowcount < 1:
            logger.info("===============================save user_id====================")
            # Backdate modify_time by 100 days so the next update pass picks this user up
            someDay = (datetime.now() + timedelta(days=-100)).strftime("%Y-%m-%d %H:%M:%S")
            self.cur.execute(
                """
                INSERT INTO tw_user (user_id,user_name,user_account,user_location,followers_count,friends_count,user_describe,modify_time)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
                """,
                (user_id, user_name, user_account, user_location, followers_count, friends_count, description, someDay))
        else:
            if update:
                if followers_count:
                    self.cur.execute(
                        ' UPDATE tw_user SET user_name= %s , user_account= %s,user_location= %s,followers_count= %s,friends_count= %s,user_describe= %s'
                        ' WHERE user_id = %s',
                        (user_name, user_account, user_location, followers_count, friends_count, description, user_id))
                else:
                    # Leave followers_count untouched when it is unknown (0/None)
                    self.cur.execute(
                        ' UPDATE tw_user SET user_name= %s , user_account= %s,user_location= %s,friends_count= %s,user_describe= %s'
                        ' WHERE user_id = %s',
                        (user_name, user_account, user_location, friends_count, description, user_id))
        self.con.commit()
    # Sync media for users stored in tw_user
    def get_user_media(self):
        try:
self.cur.execute("""
SELECT
*
FROM
tw_user u
WHERE
followers_count > %s
and u.status =1
and length(user_id) > 11
and modify_time < date_add(now(), interval -%s day)
and
not EXISTS (
SELECT
1
from
tw_user_media me
WHERE
u.user_id = me.user_id)
and
not EXISTS (
SELECT
1
from
tw_user_exclude me
WHERE
u.user_id = me.user_id)
"""
% (self.min_following, self.update_day))
            res = self.cur.fetchall()
            for line in res:
                # TODO: profile refresh is not implemented here
                # Fetch this user's media
                self.get_live_user_media_info(line)
                self.cur.execute(
                    ' UPDATE tw_user SET modify_time=now() WHERE user_id = %s', line[1])
                self.con.commit()
                logger.info("===============================user media sync done====================")
                logger.info(line[1])
        except Exception as e:
            logger.error("Failed to sync user data!")
            logger.exception(e)
    # Fetch the media timeline of one user
    def get_live_user_media_info(self, line):
        try:
            js, _ = self.get_user_media_json(line)
            try:
                js["data"]["user"]["result"]["timeline_v2"]["timeline"]["instructions"]
            except Exception as e:
                try:
                    # A protected account we do not follow yet comes back as UserUnavailable
                    UserUnavailable = js["data"]["user"]["result"]["__typename"]
                    if UserUnavailable == 'UserUnavailable':
                        # Mark as needing a follow request
                        self.cur.execute(
                            ' UPDATE tw_my_follow SET need_follow=2 WHERE user_id = %s', line[1])
                        self.con.commit()
                        # self.follow_user_post_create(line)
                        return 1
                except Exception as e:
                    logger.error("Failed to detect the not-followed case!")
                    logger.exception(e)
                logger.error("No instructions/entries node in response")
                logger.exception(e)
                return 1
            instructions = js["data"]["user"]["result"]["timeline_v2"]["timeline"]["instructions"]
            for item in instructions:
                if "entries" in item:
                    self.get_live_user_media_info_save(line, js, item)
        except Exception as e:
            logger.error("Failed to parse data!")
            logger.exception(e)
    # Save one page of a user's media tweets, then recurse into the next page
    def get_live_user_media_info_save(self, line, js, instructions):
        try:
            user_id = line[1]
            media_list = instructions["entries"]
            for item in media_list:
                try:
                    item["content"]["itemContent"]["tweet_results"]["result"]["rest_id"]
                except Exception:
                    logger.error("No rest_id node in entry")
                    continue
                result = item["content"]["itemContent"]["tweet_results"]["result"]
                rest_id = result["rest_id"]
                # Already stored: either stop paging or skip this tweet
                self.cur.execute('SELECT * FROM tw_user_media WHERE rest_id = %s', (rest_id,))
                rowcount = self.cur.rowcount
                if rowcount > 0 and self.media_exist_one_return:
                    return
                if rowcount > 0 and not self.media_exist_one_return:
                    continue
                # Tweets without media carry no extended_entities; skip them
                if "extended_entities" not in result["legacy"]:
                    continue
                extended_entities = result["legacy"]["extended_entities"]
                entities = result["legacy"]["entities"]
                logger.info("fetching media rest_id:" + rest_id)
                # full_text
                full_text = result["legacy"]["full_text"]
                for media in extended_entities["media"]:
                    # Store the media record
                    try:
                        date_url_https = media["media_url_https"]
                        media_type = media["type"]
                        if media_type == "video":
                            date_url_https = str(media["video_info"])
                            self.cur.execute(
                                ' INSERT INTO tw_user_media_v (user_id,rest_id,full_text,media_url_https,type) VALUES (%s,%s,%s,%s,%s)',
                                (user_id, rest_id, full_text, date_url_https, media_type))
                            self.con.commit()
                        else:
                            self.cur.execute(
                                ' INSERT INTO tw_user_media (user_id,rest_id,full_text,media_url_https,type) VALUES (%s,%s,%s,%s,%s)',
                                (user_id, rest_id, full_text, date_url_https, media_type))
                            self.con.commit()
                    except Exception as e:
                        logger.error(e)
                        logger.error("Failed to insert media record!")
                # Store @-mentioned users
                if self.save_at_user and "user_mentions" in entities:
                    for user_mentions in entities["user_mentions"]:
                        try:
                            self.save_user_info(0, user_mentions["id_str"], user_mentions["name"],
                                                user_mentions["screen_name"], "", 0, "")
                        except Exception:
                            logger.error("Failed to save mentioned user!")
            # Scrape the next-page cursor out of the response's repr and recurse
            jsString = str(js)
            value_list = re.findall(r"'value': '(.*?)'", jsString)
            cursorType_list = re.findall(r"'cursorType': '(.*?)'", jsString)
            if len(cursorType_list) > 1 and len(media_list) > 2:
                self.cursor = value_list[len(value_list) - 1]
                self.get_live_user_media_info(line)
        except Exception as e:
            logger.error("Failed to parse data!")
            logger.exception(e)
    # Request one page of a user's media timeline (GraphQL UserMedia)
    def get_user_media_json(self, line):
        try:
            # NOTE: the GraphQL query id in the URL (_vFDgkWOKL_U64Y2VmnvJw) and the
            # feature flags are tied to a particular Twitter web-client build and may
            # need refreshing when the endpoint starts returning errors.
            proxies = {'http': 'http://127.0.0.1:10887', 'https': 'http://127.0.0.1:10887'}
            url = 'https://twitter.com/i/api/graphql/_vFDgkWOKL_U64Y2VmnvJw/UserMedia?variables={"userId":"' + line[1] + '",'
            if len(self.cursor) > 3:
                url = url + '"cursor":"' + self.cursor + '",'
            url = url + '"count":20,"includePromotedContent":false,"withSuperFollowsUserFields":true,"withDownvotePerspective":false,"withReactionsMetadata":false,"withReactionsPerspective":false,"withSuperFollowsTweetFields":true,"withClientEventToken":false,"withBirdwatchNotes":false,"withVoice":true,"withV2Timeline":true}&features={"responsive_web_graphql_timeline_navigation_enabled":false,"unified_cards_ad_metadata_container_dynamic_card_content_query_enabled":false,"dont_mention_me_view_api_enabled":true,"responsive_web_uc_gql_enabled":true,"vibe_api_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":false,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":false,"interactive_text_enabled":true,"responsive_web_text_conversations_enabled":false,"responsive_web_enhance_cards_enabled":true}'
            r = requests.get(url, params="", headers=self.headers, proxies=proxies, verify=False)
            self.cursor = ''
            return r.json(), r.status_code
        except Exception as e:
            logger.error("UserMedia request failed!")
            logger.exception(e)
    # Refresh profiles of high-follower tw_user accounts
def get_user_update(self):
self.cur.execute("""
SELECT * FROM tw_user tu
WHERE (tu.modify_time < date_add(now(), interval -%s day )
or tu.up_followers_time is null
)
and tu.followers_count > 20000
and tu.status =1
and length(user_id) > 11
and not exists (
select
1
from
tw_user_exclude e
where
e.user_id = tu.user_id)
""" % self.update_day)
res = self.cur.fetchall()
self.update_user_info_(res)
    # Refresh profiles of tw_my_follow accounts
def get_my_follow_update(self):
self.cur.execute("""
SELECT * FROM tw_my_follow tu
WHERE not exists (
select
1
from
tw_user_exclude e
where
e.user_id = tu.user_id)
""")
res = self.cur.fetchall()
self.update_user_info_(res)
    # Refresh a batch of user profiles
    def update_user_info_(self, res):
        for line in res:
            try:
                js, _ = self.user_by_screen_name(line)
            except Exception as e:
                logger.error("Failed to request user data!")
                logger.exception(e)
                continue
            # Suspended accounts return an explanatory text node instead of profile data
            text = re.findall(r"'text': '(.*?)'", str(js))
            if len(text) > 0 and text[0] == 'Twitter suspends accounts that violate the Twitter Rules. Learn more':
                # Stop updating suspended accounts
                self.cur.execute('UPDATE tw_user SET status=2 WHERE user_id = %s', (line[1],))
                self.con.commit()
                logger.info("Suspended user flagged")
                continue
            try:
                legacy = js["data"]["user"]["result"]["legacy"]
                user_name = legacy["name"]
                user_location = legacy["location"]
                description = legacy["description"]
                friends_count = legacy["friends_count"]
                followers_count = legacy["followers_count"]
            except Exception as e:
                logger.error("Failed to parse user data!")
                logger.exception(e)
                logger.error(line[3])
                continue
            self.cur.execute(
                ' UPDATE tw_user SET user_name= %s , friends_count= %s,user_location= %s,followers_count= %s,user_describe= %s'
                ' WHERE user_id = %s',
                (user_name, friends_count, user_location, followers_count, description, line[1]))
            self.con.commit()
            logger.info("User profile synced: %s", line[1])
            # sleep(random.randint(1, 3))
    # Fetch a user profile by screen name (GraphQL UserByScreenName)
    def user_by_screen_name(self, line):
        try:
            proxies = {'http': 'http://127.0.0.1:10887', 'https': 'http://127.0.0.1:10887'}
            url = 'https://twitter.com/i/api/graphql/vG3rchZtwqiwlKgUYCrTRA/UserByScreenName?variables={"screen_name":"' + \
                  line[3] + '","withSafetyModeUserFields":true,"withSuperFollowsUserFields":true}&features={"responsive_web_graphql_timeline_navigation_enabled":false}'
            r = requests.get(url, params="", headers=self.headers, proxies=proxies, verify=False)
            self.cursor = ''
            return r.json(), r.status_code
        except Exception as e:
            logger.error("UserByScreenName request failed!")
            logger.exception(e)
    # Crawl the following lists of users stored in the database
    def get_datebase_user_following(self):
        self.cur.execute("""
            SELECT * FROM tw_my_follow tu
            where search_follow=1 and up_followers_time < date_add(now(), interval -%s day) and
            not exists (
                select 1 from tw_user_exclude e where e.user_id = tu.user_id)
            """, (self.update_database_following_day,))
        res = self.cur.fetchall()
        for line in res:
            if self.save_datebase_user_following_line(line):
                self.cur.execute("""UPDATE tw_my_follow set up_followers_time=now() WHERE user_id=%s
                    """, line[1])
                self.con.commit()
            logger.info(str(datetime.now()))
            logger.info(line[3] + ": following list fetched")
    # Fetch and save one user's following list
    def save_datebase_user_following_line(self, line):
        try:
            js, _ = self.get_datebase_user_following_js(line)
            # An empty user object means there is nothing to page through
            if "{'data': {'user': {}}}" in str(js):
                return 1
            try:
                js["data"]["user"]["result"]["timeline"]["timeline"]["instructions"]
            except Exception:
                logger.error("No instructions/entries node in response")
                return
            instructions = js["data"]["user"]["result"]["timeline"]["timeline"]["instructions"]
            for item in instructions:
                if "entries" in item:
                    self.save_datebase_user_following_info(line, js, item)
            return 1
        except Exception as e:
            logger.error("Failed to parse data!")
            logger.exception(e)
            return 0
    # Save one page of following entries, then recurse into the next page
    def save_datebase_user_following_info(self, line, js, instructions):
        try:
            # User info
            for item in instructions["entries"]:
                try:
                    item["content"]["itemContent"]["user_results"]["result"]["rest_id"]
                except Exception:
                    logger.error("No rest_id node in entry")
                    continue
                result = item["content"]["itemContent"]["user_results"]["result"]
                user_id = result["rest_id"]
                user_name = result["legacy"]["name"]
                user_account = result["legacy"]["screen_name"]
                user_location = result["legacy"]["location"]
                followers_count = result["legacy"]["followers_count"]
                friends_count = result["legacy"]["friends_count"]
                description = result["legacy"]["description"]
                protected = result["legacy"]["protected"]
                self.save_my_follow(1, user_id, user_name, user_account, user_location, followers_count, description, friends_count=friends_count)
                # Protected accounts need a follow request before their media is visible
                if protected:
                    try:
                        self.follow_user_post_create(user_id)
                    except Exception:
                        logger.error("Follow request failed!")
            # Random sleep not needed here: processing is slow enough already
            # sleep(random.randint(1, 3))
            # Scrape the next-page cursor and recurse if one is present
            jsString = str(js)
            value_list = re.findall(r"'value': '(.*?)'", jsString)
            cursorType_list = re.findall(r"'cursorType': '(.*?)'", jsString)
            if len(cursorType_list) > 1 and len(value_list) > 0 and len(value_list[0]) > 32:
                self.cursor = value_list[0]
                self.save_datebase_user_following_line(line)
        except Exception as e:
            logger.error("Failed to parse data!")
            logger.exception(e)
def get_datebase_user_following_js(self, line):
proxies = {'http': 'http://127.0.0.1:10887', 'https': 'http://127.0.0.1:10887'}
url = 'https://twitter.com/i/api/graphql/i94e7ZBJNGVrM_8lFPFjOw/Following?variables={"userId":"' + line[1] + '","count":20,'
if len(self.cursor) > 3:
url = url + '"cursor":"' + self.cursor + '",'
url = url + '"includePromotedContent":false,"withSuperFollowsUserFields":true,"withDownvotePerspective":false,"withReactionsMetadata":false,"withReactionsPerspective":false,"withSuperFollowsTweetFields":true}&features={"responsive_web_graphql_timeline_navigation_enabled":false,"unified_cards_ad_metadata_container_dynamic_card_content_query_enabled":false,"dont_mention_me_view_api_enabled":true,"responsive_web_uc_gql_enabled":true,"vibe_api_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":false,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":false,"interactive_text_enabled":true,"responsive_web_text_conversations_enabled":false,"responsive_web_enhance_cards_enabled":true}'
r = requests.get(url, params="", headers=self.headers, proxies=proxies, verify=False)
self.cursor = ''
return r.json(), r.status_code
    # Walk the download directory and flag undersized (broken) files for re-download
    def re_down_files(self, url):
        # Iterate over everything under the given path
        file = os.listdir(url)
        for f in file:
            real_url = os.path.join(url, f)
            if os.path.isfile(real_url):
                # Files under 1000 bytes are assumed to be failed downloads
                if os.path.getsize(real_url) < 1000:
                    logger.info(os.path.abspath(real_url))
                    try:
                        # NOTE: dirs[9] assumes the fixed directory depth of down_dir
                        dirs = real_url.split("/")
                        downUrl = "https://pbs.twimg.com/media/" + dirs[9]
                        downUrl = downUrl[:len(downUrl) - 4] + ".jpg"
                        self.cur.execute("""
                            UPDATE tw_user_media set down=0 WHERE media_url_https=%s
                            """, (downUrl,))
                        self.con.commit()
                        os.remove(real_url)
                    except Exception as e:
                        logger.exception(e)
            elif os.path.isdir(real_url):
                # Recurse into subdirectories
                self.re_down_files(real_url)
            else:
                logger.info("Neither file nor directory")
    # Send a follow request to a user
def follow_user_post_create(self, user_id):
proxies = {'http': 'http://127.0.0.1:10887', 'https': 'http://127.0.0.1:10887'}
url = "https://twitter.com/i/api/1.1/friendships/create.json"
bodyData = {
"include_profile_interstitial_type": 1,
"include_blocking": 1,
"include_blocked_by": 1,
"include_followed_by": 1,
"include_want_retweets": 1,
"include_mute_edge": 1,
"include_can_dm": 1,
"include_can_media_tag": 1,
"include_ext_has_nft_avatar": 1,
"skip_status": 1,
"user_id": user_id
}
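        # friendships/create.json is the legacy v1.1 REST endpoint; it expects a
        # form-encoded body, which is why follow_headers (not the JSON headers)
        # is used below with content-type application/x-www-form-urlencoded.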
        r = requests.post(url, params="", data=bodyData, headers=self.follow_headers, proxies=proxies, verify=False)
return r.json(), r.status_code
# Load the config file
def get_config():
    """Read twitter.config.json"""
    config_path = os.path.split(os.path.realpath(__file__))[0] + os.sep + "twitter.config.json"
    if not os.path.isfile(config_path):
        logger.warning(
            "Config file twitter.config.json not found under: %s",
            (os.path.split(os.path.realpath(__file__))[0] + os.sep),
        )
        sys.exit()
    try:
        with open(config_path, encoding="utf-8") as f:
            config = json.loads(f.read())
        return config
    except ValueError:
        logger.error(
            "twitter.config.json is not valid JSON!"
        )
        sys.exit()
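# A minimal sketch of twitter.config.json, assuming typical values. The keys are
# exactly those read in TwitterCrawler.__init__; the values shown here are
# illustrative only:
# {
#     "user_id_list": ["1234567890"],
#     "query_list": "",
#     "since_date": 30,
#     "filter": 1,
#     "write_mode": ["mysql"],
#     "pic_download": 1,
#     "video_download": 1,
#     "cookie": "<browser cookie>",
#     "authorization": "Bearer <token>",
#     "x_csrf_token": "<csrf token>",
#     "update_user_follow": 0,
#     "update_user_follow_update": 1,
#     "update_user_follow_day": 1,
#     "update_user_info": 0,
#     "update_my_follow_user_info": 0,
#     "update_user_media": 0,
#     "update_my_follow_user_media": 1,
#     "update_day": 1,
#     "down_dir": "./down",
#     "media_exist_one_return": 0,
#     "min_following": 1000,
#     "save_at_user": 1,
#     "update_database_following_user": 0,
#     "update_database_following_day": 7,
#     "database_user_following_min_following": 1000,
#     "re_down_file": 0,
#     "mysql_config": {"host": "127.0.0.1", "user": "root", "password": "<pwd>",
#                      "db": "twitter", "port": 3306, "charset": "utf8mb4"}
# }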
def main():
    try:
        logger.info(str(datetime.now()))
        config = get_config()
        tw = TwitterCrawler(config)
        tw.start()  # run the crawl
        tw.stop()  # release resources
        sleep(1)
        logger.info(str(datetime.now()))
        logger.info("=======crawl finished=======")
        if const.NOTIFY["NOTIFY"]:
            push_msg("Finished one Twitter crawl")
    except Exception as e:
        if const.NOTIFY["NOTIFY"]:
            push_msg("twitter-crawler failed with error: {}".format(e))
        logger.exception(e)
if __name__ == "__main__":
main()