加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
worker.py 11.78 KB
一键复制 编辑 原始数据 按行查看 历史
liweimin 提交于 2021-08-09 21:20 . 开放
# -*- coding: utf-8 -*-
import base64
from threading import Thread
import time
import random
import zlib
import requests
import logging
from io import BytesIO
import http.cookiejar as cookielib
from PIL import Image
from uuid import uuid4
import os, sys
requests.packages.urllib3.disable_warnings()
logger = logging.getLogger(__name__)
agent = [
'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0'
]
headers = {
'User-Agent' : random.choice(agent),
'Referer' : 'https://creator.douyin.com/',
'Origin' : 'https://creator.douyin.com'
}
def initloger():
if not os.path.exists('./logs'):
os.mkdir('./logs')
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('./logs/{}.log'.format(time.strftime('%Y%m%d', time.localtime(time.time()))), encoding='utf-8',mode='a')
formatter = logging.Formatter("%(asctime)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
fh.setFormatter(formatter)
logger.addHandler(fh)
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
initloger()
def generate_random_str(randomlength=11):
# 生成一个指定长度的随机字符串
random_str = ''
base_str = 'abcdefghijklmnopqrstuvwxyz0123456789'
length = len(base_str) - 1
for i in range(randomlength):
random_str += base_str[random.randint(0, length)]
return random_str
class showpng(Thread):
def __init__(self, data):
Thread.__init__(self)
self.data = data
def run(self):
img = Image.open(BytesIO(self.data))
img.show()
def islogin(userid):
if not os.path.exists('./cache'):
os.mkdir('./cache')
cookiefile = './cache/%s.txt' % userid
if not os.path.exists(cookiefile):
with open(cookiefile, 'w') as f:
f.write('')
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename=cookiefile)
try:
session.cookies.load(ignore_discard=True)
except Exception:
pass
res = session.get('https://sso.douyin.com/check_login/', headers=headers).json()
if res['has_login']:
#logger.info('Cookies值有效,无需扫码登录!')
return 1, session
else:
logger.info('Cookies值失效,请重新扫码登录!')
return 0, session
def login(userid):
status, session = islogin(userid)
if status == 0:
session = requests.session()
cookiefile = './cache/%s.txt' % userid
session.cookies = cookielib.LWPCookieJar(filename=cookiefile)
urldata = session.get('https://sso.douyin.com/get_qrcode/', headers=headers).json()
testpng = base64.b64decode(urldata['data']['qrcode'])
# <img src="data:image/png;base64,{testpng}">
token = urldata['data']['token']
t = showpng(testpng)
t.start()
while True:
tokendata = session.get('https://sso.douyin.com/check_qrconnect/?token=%s' % token, headers=headers).json()
if 5 == int(tokendata['data']['status']):
logger.info('登录二维码已过期,请重新打开')
if 3 == int(tokendata['data']['status']):
session.get(tokendata['data']['redirect_url'], headers=headers)
session.get('https://creator.douyin.com/', headers=headers)
session.get('https://creator.douyin.com/web/api/media/user/info/', headers=headers)
logger.info('已确认,登录成功')
break
time.sleep(3)
session.cookies.save()
return session
def getaccount(session):
req = session.get('https://creator.douyin.com/aweme/v1/creator/user/info/', headers=headers)
res = req.json()
if res['status_code'] == 0:
userId = res['douyin_user_verify_info']['douyin_unique_id']
userName = res['douyin_user_verify_info']['nick_name']
userAvatar = res['douyin_user_verify_info']['avatar_url']
return userId, userName, userAvatar
return False
def getcookies(session):
cookies = {}
for ck in session.cookies:
cookies[ck.name] = ck.value
return cookies
def delvideo(session, item_id):
data = {
'item_id': (None, str(item_id))
}
res = session.post('https://creator.douyin.com/web/api/media/aweme/delete/', files=data, headers=headers).json()
if res['status_code'] == 0:
return True
def workslist(userid):
status, session = islogin(userid)
if status == 0:
return False
res = session.get('https://creator.douyin.com/web/api/media/aweme/post/?scene=star_atlas&status=0&count=12&max_cursor=0', headers=headers).json()
if res['status_code'] == 0:
#if res['has_more']:
totalCount = res['total']
pageCount = int(int(totalCount + 50 - 1) / 50)
for work in res['aweme_list']:
workId , title, cover, uploadTime, playCount, likeCount, commentCount = work['aweme_id'], work['desc'], work['create_time'], '', work['statistics']['play_count'], work['statistics']['digg_count'], work['statistics']['comment_count']
def uploadtoken(session, fsize):
# 1获取上传ak auth
cookies = getcookies(session)
headers['x-csrf-token'] = cookies['csrf_token']
res = session.get('https://creator.douyin.com/web/api/media/upload/auth/', headers=headers).json()
if res['status_code'] == 0:
akey = res['ak']
auth = res['auth']
else:
logger.info('获取上传token错误')
# 2申请上传
headersx = {
'Authorization': auth,
'X-TT-Access': akey,
'Content-Type': 'application/json'
}
rdmstr = generate_random_str()
res = session.get('https://vas-lf-x.snssdk.com/video/openapi/v1/?action=GetVideoUploadParams&s=%s&use_edge_node=1&file_size=%s' % (rdmstr, fsize), headers=headersx).json()
if res['message'] == 'ok':
vid = res['data']['vid']
oid = res['data']['oid']
tos_host = res['data']['tos_host']
tos_sign = res['data']['tos_sign']
token = res['data']['token']
upload_id = res['data']['upload_id']
encryption_key = res['data']['encryption_key']
#tmp = session.post('https://%s/%s?uploads' % (tos_host, oid), headers=headers).json()
#upload_id = tmp['payload']['uploadID']
return vid, oid, tos_host, tos_sign, token, upload_id, akey, auth
else:
logger.info('申请上传错误 - %s' % res['message'])
return False
def upload(session, flen, chunk, i, tos_sign, upload_id, upload_url):
prev = zlib.crc32(chunk)
crc32 = "%X" % (prev & 0xffffffff)
headersx = {
'Authorization': tos_sign,
'Content-CRC32': crc32.lower(),
'Content-Disposition': 'attachment; filename="undefined"',
'Content-Length': str(len(chunk)),
'Content-Type': 'application/octet-stream'
}
put_url = upload_url if flen <= 5242880 else '%s?partNumber=%s&uploadID=%s' % (upload_url, i, upload_id)
res = session.post(put_url, data=chunk, headers=headersx).json()
if res['success'] == 0:
pass
else:
logger.info('分块上传失败 - %s' % i)
def upload_check(session, tos_sign, upload_id, upload_url, vid, oid, token, akey, auth):
data = {
'vid': vid,
'oid': oid,
'token': token,
'poster_ss': 0,
'is_exact_poster': True,
'user_reference':''
}
headersx = {
'Authorization': auth,
'X-TT-Access': akey,
'Content-Type': 'application/json'
}
res = session.post('https://vas-lf-x.snssdk.com/video/openapi/v1/?action=UpdateVideoUploadInfos', json=data, headers=headersx).json()
if res['message'] == 'ok':
return res['data']['video']['width'], res['data']['video']['height'], res['data']['poster']['oid']
else:
logger.info('校验上传出错 - %s' % res['message'])
return False
def submit(session, vid, poster_oid, caption = '', longitude = '', latitude = ''):
data = {
'video_id': (None, vid),
'text': (None, caption),
'record': (None, 'null'),
'source_info': (None, ''),
'text_extra': (None, '[]'),
'challenges': (None, '[]'),
'mentions': (None, '[]'),
'hashtag_source': (None, ''),
'visibility_type': (None, '1'),
'download': (None, '0'),
'upload_source': (None, '1'),
'mix_id': (None, ''),
'mix_order': (None, ''),
'is_preview': (None, '0'),
'hot_sentence': (None, ''),
'cover_text_uri': (None, ''),
'cover_text': (None, ''),
'poster': (None, poster_oid),
'poster_delay': (None, '0'),
'poi_id': (None, '6601244105802516494'),
'poi_name': (None, '华润大厦'),
'music_source': (None, '0'),
'music_id': (None, '')
}
res = session.post('https://creator.douyin.com/web/api/media/aweme/create/', files=data, headers=headers).json()
if res['status_code'] == 0:
return 1
else:
logger.info('发布出错 - %s' % res['status_msg'])
return 0
def upvideo(session, filePath, title='', longitude='', latitude=''):
with open(filePath, 'rb') as f:
f.seek(0, 2)
flen = f.tell()
# 获取上传凭证
vid, oid, tos_host, tos_sign, token, upload_id, akey, auth = uploadtoken(session, flen)
# 回文件头
f.seek(0, 0)
i = 1
while True:
chunk = f.read(5242880)
if not chunk:
break
# 按5M大小分块上传视频
upload_url = 'https://%s/%s' % (tos_host, oid)
upload(session, flen, chunk, i, tos_sign, upload_id, upload_url)
i += 1
try:
# 校验上传
width, height, poster_oid = upload_check(session, tos_sign, upload_id, upload_url, vid, oid, token, akey, auth)
# 发布
result = submit(session, vid, poster_oid, title, longitude, latitude)
except Exception as e:
result = 0
logger.info('Error#' + str(e))
return result
def publish(session, filePath, title, longitude='', latitude='', unlink = 0):
logger.info('开始上传 - %s < %s' % (title, filePath))
result = upvideo(session, filePath, title, longitude, latitude)
if result == 1:
if unlink == 1:
os.remove(filePath)
logger.info('上传完成 - %s %s' % (title, ('> 缓存文件已删除' if unlink == 1 else '')))
return result
def worker(curpath, unlink = False):
userid = curpath.split('\\')[-1]
session = login(userid)
filenum = 0
fileset = set()
for root, dirs, files in os.walk(curpath):
for f in files:
if f.endswith('.mp4'):
fileset.add(os.path.join(root, f))
filenum += 1
fileset = list(fileset)
logger.info('%s共%s个视频' % (curpath, str(filenum)))
for i in range(0, filenum):
time.sleep(3)
fname = fileset[i]
logger.info('正在上传%s_%s_%s' % (curpath, str(i+1), fname))
#发布内容
title = os.path.basename(fname)[:-4]
result = upvideo(session, fname, title)
if result is True and unlink is True:
os.remove(fname)
def traceexcept(type, value, trace):
log = 'Trace#' + str(type) + '::' + str(value)
while trace:
log += os.linesep + ' File ' + str(trace.tb_frame.f_code.co_filename) + ', Line ' + str(trace.tb_lineno)
trace = trace.tb_next
logger.error(log)
if __name__ == '__main__':
sys.excepthook = traceexcept
# 视频文件夹
curpath = sys.argv[1] if len(sys.argv)>1 else './'
# 上传完成后是否删除
unlink = True if len(sys.argv)>2 else False
worker(curpath, unlink)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化