加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
twitterMedia.py 8.48 KB
一键复制 编辑 原始数据 按行查看 历史
sleking_lxl 提交于 2022-10-13 16:44 . 多线程优化
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import codecs
import copy
import csv
import json
import logging
import logging.config
import math
import os
import random
import re
import sqlite3
import sys
import warnings
from collections import OrderedDict
from datetime import date, datetime, timedelta
from pathlib import Path
from time import sleep
import requests
from lxml import etree
from requests.adapters import HTTPAdapter
from tqdm import tqdm
import pymysql
import const
from util import csvutil
from util.dateutil import convert_to_days_ago
from util.notify import push_deer
from util.notify import push_msg
# Make sure the log output directory exists before the logging configuration
# is loaded. exist_ok=True avoids the check-then-create race when several
# workers import this module at the same time.
os.makedirs("log/media", exist_ok=True)

# logging.conf is looked up next to this source file, not in the current
# working directory.
logging_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "logging.conf")
logging.config.fileConfig(logging_path)
# The logger name (including its spelling) is kept as-is because it is looked
# up by name at runtime.
logger = logging.getLogger("TwitterMeadia")
class TwitterMeadia(object):
    """Synchronise the media timeline of one followed user into the database.

    An instance borrows one connection from ``config.conQueue`` on
    construction and hands it back when ``fun_findMeadia`` finishes (whether
    it succeeded or not), so an instance is meant to process a single user.

    ``line`` arguments are indexable rows in which ``line[1]`` is the user id.
    """

    # Proxy and timeout used for every request; override on the class or on an
    # instance to change them.
    PROXIES = {'http': 'http://127.0.0.1:10887', 'https': 'http://127.0.0.1:10887'}
    REQUEST_TIMEOUT = 30

    MEDIA_URL = 'https://twitter.com/i/api/graphql/_vFDgkWOKL_U64Y2VmnvJw/UserMedia'

    # Fixed part of the ``variables`` query argument (``userId`` and the
    # optional ``cursor`` are put in front of these keys per request).
    _VARIABLES = {
        "count": 20,
        "includePromotedContent": False,
        "withSuperFollowsUserFields": True,
        "withDownvotePerspective": False,
        "withReactionsMetadata": False,
        "withReactionsPerspective": False,
        "withSuperFollowsTweetFields": True,
        "withClientEventToken": False,
        "withBirdwatchNotes": False,
        "withVoice": True,
        "withV2Timeline": True,
    }

    # The ``features`` query argument.
    _FEATURES = {
        "responsive_web_graphql_timeline_navigation_enabled": False,
        "unified_cards_ad_metadata_container_dynamic_card_content_query_enabled": False,
        "dont_mention_me_view_api_enabled": True,
        "responsive_web_uc_gql_enabled": True,
        "vibe_api_enabled": True,
        "responsive_web_edit_tweet_api_enabled": True,
        "graphql_is_translatable_rweb_tweet_is_translatable_enabled": False,
        "standardized_nudges_misinfo": True,
        "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": False,
        "interactive_text_enabled": True,
        "responsive_web_text_conversations_enabled": False,
        "responsive_web_enhance_cards_enabled": True,
    }

    # A cursor is only sent when it is longer than this many characters.
    _MIN_CURSOR_LEN = 3

    # Patterns applied to ``str(response_dict)`` to find pagination cursors.
    _VALUE_RE = re.compile(r"'value': '(.*?)'")
    _CURSOR_TYPE_RE = re.compile(r"'cursorType': '(.*?)'")

    def __init__(self, config):
        """Store the settings from ``config`` and borrow a pooled connection.

        Reads ``config.headers``, ``config.media_exist_one_return`` and
        ``config.conQueue``; ``conQueue.get()`` is called immediately.
        """
        self.config = config
        self.headers = config.headers
        self.media_exist_one_return = config.media_exist_one_return
        self.conQueue = config.conQueue
        self.con = self.conQueue.get()
        # Cursor object used for every statement issued by this instance.
        self.cur = self.con.cursor()
        self.time = str(datetime.now())
        # Pagination cursor for the next timeline request ("" = first page).
        self.cursor = ""
        # rest_ids already stored for the current user (filled per page).
        self.restIdSet = set()
        self._con_released = False

    def fun_findMeadia(self, line):
        """Sync the user's media and stamp ``tw_my_follow.modify_time``.

        Returns 1 when the sync succeeded and the timestamp was updated,
        0 otherwise. The database cursor is closed and the connection is put
        back on ``conQueue`` in every case.
        """
        try:
            if not self.fun_searchMadia(line):
                return 0
            self.cur.execute(
                ' UPDATE tw_my_follow SET modify_time=now() WHERE user_id = %s', (line[1],))
            self.con.commit()
            return 1
        except Exception as e:
            print(e)
            logger.error("同步用户数据失败!")
            logger.exception(e)
            return 0
        finally:
            # Release on failure too, otherwise every failed user permanently
            # removes one connection from the pool.
            self._release_connection()

    def _release_connection(self):
        """Close the cursor and return the connection to the queue, once."""
        if getattr(self, "_con_released", False):
            return
        self._con_released = True
        try:
            self.cur.close()
        except Exception as e:
            logger.exception(e)
        finally:
            self.conQueue.put(self.con)

    def fun_searchMadia(self, line):
        """Fetch one page of the user's media timeline and store its entries.

        Loads the rest_ids already present in ``tw_user_media`` for the user,
        requests the page selected by ``self.cursor`` and passes every
        instruction that has an ``entries`` key to ``fun_saveMedia``.

        Returns 1 when the page was processed, 0 when the request failed, the
        response had no timeline instructions, or an error occurred.
        """
        try:
            self.restIdSet = set()
            self.cur.execute(
                'select rest_id from tw_user_media where user_id = %s group by rest_id',
                (line[1],))
            for row in self.cur.fetchall():
                self.restIdSet.add(str(row[0]))

            response = self.fun_postMedia(line)
            if response is None:
                # fun_postMedia already logged the failure.
                return 0
            js, _ = response

            try:
                instructions = js["data"]["user"]["result"]["timeline_v2"]["timeline"]["instructions"]
            except (KeyError, TypeError) as er:
                try:
                    if js["data"]["user"]["result"]["__typename"] == 'UserUnavailable':
                        # Mark the user as one that has to be followed first.
                        self.cur.execute(
                            ' UPDATE tw_my_follow SET need_follow=2 WHERE user_id = %s', (line[1],))
                        self.con.commit()
                        return 0
                except Exception as e:
                    print(e)
                    logger.error("捕获没关注失败!")
                    logger.exception(e)
                logger.error("instructions entries 节点不存在")
                logger.exception(er)
                return 0

            for instruction in instructions:
                if "entries" in instruction:
                    self.fun_saveMedia(line, js, instruction)
            return 1
        except Exception as e:
            print(e)
            logger.error("数据解析失败!")
            logger.exception(e)
            return 0

    def fun_saveMedia(self, line, js, instructions):
        """Store the media of every new tweet in ``instructions["entries"]``.

        Entries without a ``rest_id`` or without ``extended_entities`` are
        skipped. When a rest_id is already known, the whole sync stops if
        ``media_exist_one_return`` is set, otherwise only that entry is
        skipped. After the page has been handled the method sleeps 3-5
        seconds and, if ``js`` contains a usable cursor, requests the next
        page through ``fun_searchMadia``.
        """
        try:
            user_id = line[1]
            media_list = instructions["entries"]
            for entry in media_list:
                try:
                    result = entry["content"]["itemContent"]["tweet_results"]["result"]
                    rest_id = result["rest_id"]
                except (KeyError, TypeError):
                    logger.error("rest_id 不存在在节点")
                    continue

                if rest_id in self.restIdSet:
                    if self.media_exist_one_return:
                        # Everything older is assumed to be stored already.
                        return
                    continue

                try:
                    legacy = result["legacy"]
                    extended_entities = legacy["extended_entities"]
                except (KeyError, TypeError):
                    print("extended_entities 不存在!")
                    continue

                logger.info("获取media rest_id:%s", rest_id)
                print(f"=====rest_id:{rest_id}=====")
                print(f"=====user_id:{user_id}=====")
                full_text = legacy["full_text"]

                for media in extended_entities["media"]:
                    try:
                        self._store_media(user_id, rest_id, full_text, media)
                    except Exception as e:
                        print(e)
                        logger.error(e)
                        logger.error("插入数据失败!")

            # Pause between pages.
            sleep(random.randint(3, 5))

            # Continue with the next page when the response carries a cursor.
            next_cursor = self._next_cursor(js, len(media_list))
            if next_cursor:
                self.cursor = next_cursor
                self.fun_searchMadia(line)
        except Exception as e:
            print(e)
            logger.error("数据解析失败!")
            logger.exception(e)

    def _store_media(self, user_id, rest_id, full_text, media):
        """Insert one media item; videos go to ``tw_user_media_v``."""
        media_url = media["media_url_https"]
        media_type = media["type"]
        row = (user_id, rest_id, full_text, media_url, media_type)

        if media_type == "video":
            # For videos the stringified ``video_info`` is stored instead of
            # the preview image URL.
            row = (user_id, rest_id, full_text, str(media["video_info"]), media_type)
            self.cur.execute(
                'SELECT 1 FROM tw_user_media_v WHERE rest_id = %s LIMIT 1', (rest_id,))
            already_stored = self.cur.fetchone() is not None
            # NOTE(review): when media_exist_one_return is false an existing
            # video row is inserted again; kept as the original behaved.
            if already_stored and self.media_exist_one_return:
                return
            self.cur.execute(
                ' INSERT INTO tw_user_media_v (user_id,rest_id,full_text,media_url_https,type) VALUES (%s,%s,%s,%s,%s)',
                row)
        else:
            self.cur.execute(
                ' INSERT INTO tw_user_media (user_id,rest_id,full_text,media_url_https,type) VALUES (%s,%s,%s,%s,%s)',
                row)
        self.con.commit()

    @classmethod
    def _next_cursor(cls, js, entry_count):
        """Return the cursor for the next page, or "" when there is none.

        The cursor is the last ``'value'`` found in ``str(js)``. A next page
        is only assumed when the text holds more than one ``'cursorType'``
        and the page had more than two entries. Cursors that are too short
        to be sent by ``fun_postMedia`` are rejected, since requesting with
        them would fetch the first page again.
        """
        js_text = str(js)
        if len(cls._CURSOR_TYPE_RE.findall(js_text)) <= 1 or entry_count <= 2:
            return ""
        values = cls._VALUE_RE.findall(js_text)
        if not values or len(values[-1]) <= cls._MIN_CURSOR_LEN:
            return ""
        return values[-1]

    @classmethod
    def _build_query(cls, user_id, cursor):
        """Build the ``variables``/``features`` query arguments as JSON text."""
        variables = {"userId": str(user_id)}
        if len(cursor) > cls._MIN_CURSOR_LEN:
            variables["cursor"] = cursor
        variables.update(cls._VARIABLES)
        return {
            "variables": json.dumps(variables, separators=(",", ":")),
            "features": json.dumps(cls._FEATURES, separators=(",", ":")),
        }

    def fun_postMedia(self, line):
        """Request one page of the user's media timeline.

        Uses ``self.cursor`` as the page cursor and clears it once the
        request has been sent. Returns ``(parsed_json, status_code)``, or
        ``None`` when the request or the JSON decoding failed.
        """
        try:
            # Let requests encode the JSON arguments so that special
            # characters in the user id or cursor cannot break the URL.
            params = self._build_query(line[1], self.cursor)
            # NOTE(review): certificate verification is disabled, as in the
            # original request; confirm this is still required by the proxy.
            r = requests.get(
                self.MEDIA_URL,
                params=params,
                headers=self.headers,
                proxies=self.PROXIES,
                timeout=self.REQUEST_TIMEOUT,
                verify=False,
            )
            self.cursor = ''
            return r.json(), r.status_code
        except Exception as e:
            print(e)
            logger.error("请求user_media 数据失败!")
            logger.exception(e)
            return None
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化