加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
print.py 16.71 KB
一键复制 编辑 原始数据 按行查看 历史
郭小帅 提交于 2021-01-19 01:09 . 重复问题修复
# -*- encoding=utf8 -*-
# -*- encoding=utf8 -*-
__author__ = "Administrator"
import traceback
from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
import pymysql
import time
import random
import sys, io
from airtest.core.android.adb import ADB
import logging
from airtest.core.settings import Settings as ST
from airtest.core.helper import set_logdir
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
auto_setup(__file__)
logger = logging.getLogger("airtest")
logger.setLevel(logging.ERROR)
# i = datetime.datetime.now()
# ST.LOG_FILE = str(i.year)+str(i.month) + str(i.day) + ".txt"
# set_logdir(r'./logs')
# 打开数据库连接
# db = pymysql.connect("127.0.0.1", "root", "123456", "xianyu", charset='utf8mb4')
db = pymysql.Connect(
host='guodashuai.51vip.biz',
port=34637,
user='root',
passwd='123456',
db='xianyu',
charset='utf8mb4'
)
# db = pymysql.connect("39.108.86.206", "root", "123456", "xianyu", charset='utf8')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 定义区配置定义区配置定义区配置##########################################################################################
######################功能定义区开始
# 滑动次数
swipetimes = 10
# 翻页起始点
starpoint = (200, 1900)
endpoint = (200, 570)
# 循环起点
var1 = 1
# 循环终点
loopend = 100000000000000
# 小间隔随机区秒数间下
lowsleepseconds = 15
# 小间隔随机区间秒数上
higesleepseconds = 30
# 大间隔随机区秒数间下
biglowsleepseconds = 160
# 大间隔随机区间秒数上
bighigesleepseconds = 300
# 商品配置数据库表
sys_goods_configform = 'sys_goods_primeters_config'
# 真实宝贝数量
realgoodsaccout = 90
starpointtianshu = (200, 1600)
endpointtianshu = (200, 300)
######################功能定义区完
######################产品定义区开始
# 关键字
keywords = "apple pencil"
# 表单前缀
form_keywords = "pencil"
mykeylist = []
# 真实商品最高价
hightprice = 420
# 真实商品最低价
lowprice = 250
# 此商品真实最低价小于此价格将被标记为虚假商品
lowrealgoodsprice = 100
######################产品定义区完
# 定义区配置定义区配置定义区配置完###################################################################################
def init_goodsconfig():
print("加载商品配置参数")
# 数据库查询获取配置
# SQL 查询语句
sql = "select keywords, form_keywords, lowprice, lowrealgoodsprice, hightprice, is_delete from " + sys_goods_configform + " where is_delete !=1 "
try:
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
for row in results:
global keywords
keywords = row[0]
global form_keywords
form_keywords = row[1]
global lowprice
lowprice = row[2]
global lowrealgoodsprice
lowrealgoodsprice = row[3]
global hightprice
hightprice = row[4]
global mykeylist
mykeylist = findallkeywords()
print(mykeylist)
openxianyu()
# 打印结果
print("init_goodsconfig ok" + keywords)
sleep(random.randint(lowsleepseconds, higesleepseconds))
except:
print("Error: unable to fecth data")
traceback.print_exc()
# 根据pah获取元素
# filter_panel = poco(name="android.widget.FrameLayout")
# f_letter = get_ui_obj(filter_panel, [0,0,0,0,3,0,2])
# print( f_letter.get_text())
def get_ui_obj(parent, path):
try:
chd = parent.child()
# print("chd,", chd.get_text(),chd.get_name(),chd.get_position())
if len(path) > 1:
# print( "path[0]",path[0],path[1:])
return get_ui_obj(chd[path[0]], path[1:])
else:
return chd[path[0]]
except:
print("get_ui_obj error")
# traceback.print_exc()
return 0
# 数据库插入操作方法
def dosql(sql):
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
print("sql ok")
except:
# Rollback in case there is any error
db.rollback()
print("sql error")
print(sql)
# 查询数据
def findgoods(title):
sql = "SELECT id,detail FROM " + form_keywords + "_goods WHERE titile = '%s' and create_time>= now()-interval 1 day"
data = (title,)
cursor.execute(sql % data)
# for row in cursor.fetchall():
# print("Name:%s\tSaving:%s" % row)
# print('共查找出', cursor.rowcount, '条数据')
return cursor.rowcount
# 查找关键词list
def findallkeywords():
form = form_keywords + "_keyword"
cursor.execute('select keyword from ' + form)
rs = cursor.fetchall()
result_list = []
for record in rs:
result_list.append(record[0])
return result_list
# 判断是否是数字
def isInt(num):
try:
num = int(str(num))
return isinstance(num, int)
except:
return False
# 查找卖出宝贝数
def find_goods_counts():
maijia_jingyan = poco(name="android.view.View", type="android.view.View", textMatches="[\s\S]*闲鱼[\s\S]*").get_text()
# print ("maijia_jingyan"+maijia_jingyan)
goodsnumstart = maijia_jingyan.index("卖出过")
goodsnumend = maijia_jingyan.index("件宝贝")
goods_counts = maijia_jingyan[goodsnumstart + 3:goodsnumend]
return goods_counts
# 查找天数
def find_tianshu():
maijia_jingyan = poco(name="android.view.View", type="android.view.View", textMatches="[\s\S]*闲鱼[\s\S]*").get_text()
# print ("maijia_jingyan"+maijia_jingyan)
goodsnumstart = maijia_jingyan.index("来闲鱼")
goodsnumend = maijia_jingyan.index("天了,")
tianshu = maijia_jingyan[goodsnumstart + 3:goodsnumend]
return tianshu
# 数据库插入操作方法
def insertgoods(title, tags):
now = time.strftime("%Y-%m-%d %H:%M:%S")
# SQL 插入语句
# sql = "INSERT INTO `goods_ipad` (`titile`, `detail`, `price`, `province`, `city`,result_tag) VALUES (",title,"', '",title,"', 20, '广东', '广州','",tags,"','",now,"') "
# 根据title拆分数据 title price province wantpeople
listparmters = title.split('\n')
# for value in listparmters: # 循环输出列表值
# print ("listparmters",value)sleep(1.0)
# 价格
price = listparmters[2]
# 标题
titlestring = listparmters[0]
# 省份
province = listparmters[-1]
# 想要人数
wantpeople = listparmters[-2].replace('人想要', '')
# 价格过低
if (not isInt(price) or float(price) < lowrealgoodsprice):
tags = "虚假商品"
if (not isInt(wantpeople)):
wantpeople = 0
sql = "INSERT INTO " + form_keywords + "_goods (`titile`, `price`, `province`,result_tag,create_time,wantpeople) VALUES ('" + titlestring + "' ," + price + ", '" + province + "','" + tags + "','" + now + "'," + str(
wantpeople) + ") "
# print(sql)
dosql(sql)
# 是否包含数组关键词方法
def findifcontain(stringcontent):
for i in mykeylist:
# print("i: ", i)
if i in stringcontent:
return 1
return 0
# 获取淘口令
def gettaocode():
# 点击更多
poco(text="更多").click()
# 点击淘口令
poco(text="淘口令").click()
# 获取淘口令
taokouling = poco(name="com.taobao.idlefish:id/tvWarnDetail", type="android.widget.TextView").get_text();
# 点击返回
keyevent("BACK")
return taokouling
# 从搜索页面进入详情页面判断是否商家
def findifbad(title):
sleep(1.0)
listparmters = title.split('\n')
# 价格
price = listparmters[2]
# 数据库标题
titlestring = listparmters[0]
# 省份
province = listparmters[-1]
# 标价过低不进入详情页
if (not isInt(price) or float(price) < lowrealgoodsprice):
return
# 进入详情页
poco(text=title).click()
# 获取详细描述
filter_panel = poco(name="android.widget.FrameLayout")
detail = ''
try:
deataildom = get_ui_obj(filter_panel, [0, 0, 0, 0, 3, 0, 2])
if (not deataildom):
deataildom = get_ui_obj(filter_panel, [0, 0, 0, 0, 2, 0, 2])
# 细节描述
detail = deataildom.get_text()
except:
print("findifbad deataildom error")
traceback.print_exc()
if (detail is not None):
detail = pymysql.escape_string(detail)
# 滑动 循环滑动
maijia_jingyan = 0
while (not maijia_jingyan):
swipe(starpointtianshu, endpointtianshu)
maijia_jingyan = poco(name="android.view.View", type="android.view.View", textMatches="[\s\S]*闲鱼[\s\S]*")
tianshu = -1
goods_counts = -1
try:
tianshu = find_tianshu()
goods_counts = find_goods_counts()
except:
print("goods_counts fail")
# 判断细节描述 跟新虚假信息
if (findifcontain(detail) or int(goods_counts) > realgoodsaccout):
# 虚假信息,进行跟新
updatesql = "update " + form_keywords + "_goods set detail ='" + detail + "', result_tag='虚假商品' ,seller_days=" + tianshu + " ,seller_goods=" + goods_counts + " where titile= '" + titlestring + "' and province ='" + province + "' and create_time>= now()-interval 1 day "
dosql(updatesql)
else:
# 判断卖出宝贝数
# 补充相关信息
updatesql = "update " + form_keywords + "_goods set detail ='" + detail + "', result_tag='真实商品' where titile= '" + titlestring + "' and province ='" + province + "' and create_time>= now()-interval 1 day "
# hightprice 真实商品最高价 lowprice 真实商品最低价
if (isInt(price) and float(price) < hightprice and float(price) > lowprice):
taokouling = gettaocode()
# taokouling=pymysql.escape_string(taokouling)
# taokouling=taokouling.replace("'","\\\'")
# taokouling=taokouling.replace('"','\\\"')
updatesql = "update " + form_keywords + "_goods set detail ='" + detail + "', result_tag='真实商品' , goodstaokouling='" + taokouling + "' ,seller_days=" + tianshu + " ,seller_goods=" + goods_counts + " where titile= '" + titlestring + "' and province ='" + province + "' and create_time>= now()-interval 1 day "
# dosql(updatesql)
try:
# 执行sql语句
cursor.execute(updatesql)
# 提交到数据库执行
db.commit()
print("sql ok")
except:
# Rollback in case there is any error
print("sql error")
print(updatesql)
# print("detail"+detail)
# 返回搜索结果页
poco(text="返回").click()
# 获取卖家名称
# 获取商品详情
# 储存数据到 mysql
def openxianyu():
# 关闭闲鱼
stop_app("com.taobao.idlefish")
# 打开闲鱼
start_app("com.taobao.idlefish")
sleep(1.0)
sleep(6)
# 点击搜索栏
# poco(name="com.taobao.idlefish:id/search_bg_img_front").click()
poco(text="搜索").click()
sleep(0.5)
# 输入文字搜索
# poco(name="com.taobao.idlefish:id/search_term").set_text(keywords)
# poco(name="android.widget.EditText").set_text(keywords)
text(keywords)
sleep(1.0)
# 点击搜索按钮虚假商品
# poco(name="com.taobao.idlefish:id/search_button").click()
poco(text="搜索").click()
sleep(1.0)
# 点击综合
poco(text="已折叠, 综合").click()
# 点击最新发布
poco(text="最新发布").click()
# 获取标题 翻页
buttonwordslist = ["信用", "区域", "筛选", "细选"]
# 商品临时列表
tempgoodslist = []
swipetimes = 9
count = 0
while (count < swipetimes):
sleep(1.0)
for i in poco(name="android.widget.ScrollView").child(name="android.view.View"):
detail = i.get_text()
if (not (detail is None)):
ifindatabase = findgoods(detail.split('\n')[0])
# 排除搜索页
if (detail not in buttonwordslist and detail not in tempgoodslist and ifindatabase < 1):
# 添加到tempgoodslist中
tempgoodslist.append(detail)
# 进行关键词过滤tempgoodslist
if (findifcontain(detail)):
print("虚假商品")
# 插入数据库中
insertgoods(detail, "虚假商品")
else:
insertgoods(detail, "真实商品")
findifbad(detail)
print("真实商品")
count = count + 1
swipe(starpoint, endpoint)
print("本次--结束")
# 循环
# 关键词过滤
# 关闭闲鱼
stop_app("com.taobao.idlefish")
# while (var1 < loopend):
# biglowsleepsecondsfunction = biglowsleepseconds
# bighigesleepsecondsfunction = bighigesleepseconds
# try:
# adb = ADB()
# devicesList = adb.devices()
# devicesNum = len(devicesList) > 1
# print("本机N个设备,分别是", devicesList) # [('B2T0216822004895', 'device'), ('dce3b005', 'device')]
# # assert_equal(devicesNum,True,"设备连接数量至少为2")
# biglowsleepsecondsfunction = biglowsleepseconds - (len(devicesList) * 30)
# bighigesleepsecondsfunction = bighigesleepseconds - (len(devicesList) * 30)
# for i in range(len(devicesList)):
# print(i)
# # connect_device("android:///" + devicesList[i][0]+"?cap_method=JAVACAP^&^&ori_method=ADBORI")
# dev = connect_device("android:///" + devicesList[i][0] + "?cap_method=JAVACAP^&^&ori_method=ADBORI")
# poco = AndroidUiautomationPoco(device=dev, use_airtest_input=True, screenshot_each_action=False)
# init_goodsconfig();
# # mykeylist = findallkeywords()
# # #print(mykeylist)
# # openxianyu()
# # 休眠随机时间
# sleepsecondsbig = random.randint(biglowsleepsecondsfunction, biglowsleepsecondsfunction)
# print("大休眠")
# print(sleepsecondsbig)
# sleep(sleepsecondsbig)
#
# except:
# print("d")
# traceback.print_exc()
# sleep(random.randint(lowsleepseconds, higesleepseconds))
# # Rollback in case there is any error
# # sleep(random.randint(60, 100))
# sleepsecondssmall = random.randint(lowsleepseconds, higesleepseconds)
# print("小休眠")
# print(sleepsecondssmall)
# sleep(sleepsecondssmall)
# var1 = var1 + 1
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
biglowsleepsecondsfunction = biglowsleepseconds
bighigesleepsecondsfunction = bighigesleepseconds
try:
adb = ADB()
devicesList = adb.devices()
devicesNum = len(devicesList) > 1
print("本机N个设备,分别是", devicesList) # [('B2T0216822004895', 'device'), ('dce3b005', 'device')]
# assert_equal(devicesNum,True,"设备连接数量至少为2")
biglowsleepsecondsfunction = biglowsleepseconds - (len(devicesList) * 30)
bighigesleepsecondsfunction = bighigesleepseconds - (len(devicesList) * 30)
for i in range(len(devicesList)):
print(i)
# connect_device("android:///" + devicesList[i][0]+"?cap_method=JAVACAP^&^&ori_method=ADBORI")
dev = connect_device("android:///" + devicesList[i][0] + "?cap_method=JAVACAP^&^&ori_method=ADBORI")
poco = AndroidUiautomationPoco(device=dev, use_airtest_input=True, screenshot_each_action=False)
init_goodsconfig();
# mykeylist = findallkeywords()
# #print(mykeylist)
# openxianyu()
# 休眠随机时间
sleepsecondsbig = random.randint(biglowsleepsecondsfunction, biglowsleepsecondsfunction)
print("大休眠")
print(sleepsecondsbig)
sleep(sleepsecondsbig)
except:
print("d")
traceback.print_exc()
sleep(random.randint(lowsleepseconds, higesleepseconds))
# Rollback in case there is any error
# sleep(random.randint(60, 100))
sleepsecondssmall = random.randint(lowsleepseconds, higesleepseconds)
print("小休眠")
print(sleepsecondssmall)
sleep(sleepsecondssmall)
# 循环机器
# 跟新机器状态(可选)
# 查询机器的任务
# 执行任务 记录执行日志
# 定义BlockingSchedul
sched = BlockingScheduler()
# 定时任务
sched.add_job(job, 'interval', seconds=20)
sched.start()
# 关闭数据库连接
db.close()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化