加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
main.py 2.94 KB
一键复制 编辑 原始数据 按行查看 历史
biccabo 提交于 2023-01-05 21:39 . 新增推送方式选择
# !/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time : 2023/1/5 21:04
# @Author : Biccabo
import json
import os
import re
from bunch import Bunch
import requests
class config:
def __init__(self,json_file):
self.json_file = json_file
def get_config_from_json(self):
with open(self.json_file, 'r') as f:
config_dict = json.load(f)
config = Bunch(config_dict)
return config
def __re_find(response):
find = re.findall(r"_ntes_quote_callback\((.*)\);", response)
if len(find) >= 1:
return find
return None
def send2Lark(url,title, text):
message = {"title": title,"text": text}
message_json = json.dumps(message)
headers = {'Content-Type': 'text/plain'}
response = requests.post(url, headers=headers, data=message_json)
return json.loads(response.text)
def send2WeChat(url,text,content):
data = {"text":text,"desp":content}
response = requests.post(url,data=data)
return json.loads(response.text)
def send2DingDing(url,text):
header = {"Content-Type": "application/json","Charset": "UTF-8"}
message = {"msgtype": "text","text": {"content": text}}
message_json = json.dumps(message)
requests.post(url=url, data=message_json, headers=header)
def getMoney(code):
url = "http://api.money.126.net/data/feed/{},money.api".format(code)
response = requests.request("GET", url, headers=None, data=None)
re_find = __re_find(response.text)
return json.loads(re_find[0])
def getRealtimeData(data,code):
return data[code]
def convert_market(other_market_code):
"""
转换通用股票代码 sz sh bj开头+股票代码
"""
if other_market_code[0:2].lower() == 'sh':
return '0' + other_market_code[2:]
elif other_market_code[0:2].lower() == 'sz':
return '1' + other_market_code[2:]
else:
return '2' + other_market_code[2:]
def pushMode(mode,url,title,text):
if mode == "lark":
send2Lark(url,title,text)
if mode == "weChat":
send2WeChat(url,title,text)
if mode == "dingDing":
send2DingDing(url,text)
if __name__ == '__main__':
stocks = config(os.path.abspath('code.json')).get_config_from_json()
mode = stocks["pushMode"][0]["mode"]
url = stocks[mode][0]["webhook"]
codes = stocks["codes"]
for code in codes:
try:
codeNum = convert_market(code["code"])
response = getMoney(codeNum)[codeNum]
percent = round(response['percent'] * 100,2)
if percent > code["up"] or percent < code["down"]:
updown = response['updown']
name = response['name']
price = response['price']
res = "{}\n{}\nprice: ¥{} \nupdown: ¥{}\npercent: {}%".format(codeNum[1:],name,price,updown,percent)
pushMode(mode,url,"waring",res)
except Exception as e:
pushMode(mode, url, "stock Exception", "{} 数据获取异常:{}".format(code,e))
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化