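# NOTE: the next three lines are web-page residue from the code-hosting site
# (fork-sync dialog text); they are not part of the program.
# 代码拉取完成,页面将自动刷新
# 同步操作将从 笑笑生/同花顺5.0自动化下单海通版 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
# 确定后同步将在后台操作,完成时将刷新页面,请耐心等待。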
# -*- coding: UTF-8 -*-
from pywinauto import application
import pywinauto
import time
from pywinauto import clipboard
import win32con
from collections import namedtuple
from pytesseract import image_to_string
from PIL import Image
from PIL import ImageGrab
# One row of the holdings grid, as built by AutoThs.get_position().
SymbolPosition = namedtuple(
    'SymbolPosition',
    ['symbol', 'total', 'available', 'pnl', 'avg_cost', 'pnl_ratio']
)

# One row of the trades grid, as built by AutoThs.get_trade().
TradeInfo = namedtuple(
    'TradeInfo',
    ['trade_time', 'symbol', 'bs', 'qty', 'price', 'turnover',
     'order_id', 'trade_id']
)

# One row of the open-orders grid, as built by AutoThs.get_alive_order().
# NOTE: the field name 'ststus' (sic) is kept as-is because callers may
# already access it by that name.
OrderInfo = namedtuple(
    'OrderInfo',
    ['order_time', 'symbol', 'bs', 'ststus', 'order_qty', 'order_price',
     'traded_price', 'traded_qty', 'order_id']
)
class AutoThs():
    """Drive the "网上股票交易系统5.0" desktop trading client through pywinauto.

    Construct with the path of the client executable, call :meth:`attach`
    with the login credentials, then use :meth:`buy`, :meth:`sell`,
    :meth:`get_position`, :meth:`get_trade`, :meth:`get_alive_order`,
    :meth:`cancel_order` and :meth:`cancel_all`.

    Everything is done by GUI automation (key presses, mouse clicks and the
    clipboard), with short sleeps between UI steps.

    Retry loops are bounded by the class attributes below; when a bound is
    exhausted a ``RuntimeError`` is raised.
    """

    # Upper bounds for the retry loops. Override on the class or instance
    # to tune them.
    MAX_ATTACH_ATTEMPTS = 10
    MAX_UI_RETRIES = 20
    MAX_ORDER_ID_ATTEMPTS = 3

    def __init__(self,
                 thspath=''):
        """Remember the client executable path; no window is touched yet."""
        self.path = thspath
        self.app = None
        self.trade_win = None
        self.info_win = None
        self.get_order_id_count = 0

    def _try2attech_main_trade_win(self):
        """Bind to the main trading window and its data grid.

        Waits up to 5 seconds for the main window to become visible (the
        wait raises on timeout), dismisses any popups and sends F2.
        """
        self.trade_win = self.app.window(title_re=u'网上股票交易系统5.0')
        self.trade_win.wait(wait_for="visible", timeout=5)
        self.info_win = self.trade_win.window(title_re='Custom2', class_name='CVirtualGridCtrl')
        self.close_popup_windows()
        self.trade_win.type_keys('{F2}')

    def _check_main_trade_win_visible(self):
        """Return True if the main trading window is visible right now.

        Side effect: rebinds ``self.trade_win`` to the main-window spec.
        """
        try:
            self.trade_win = self.app.window(title_re=u'网上股票交易系统5.0')
            self.trade_win.wait(wait_for='visible', timeout=0.1)
            return True
        except Exception:
            return False

    def _check_login_win_visible(self):
        """Return True if the login dialog shows up within 20 seconds."""
        try:
            win = self.app.window(title_re=u'用户登录', class_name='#32770')
            win.wait(wait_for='visible', timeout=20)
            return True
        except Exception:
            return False

    def attach(self,
               account='',
               trade_pw='',
               communication_pw='',
               name=''):
        """Connect to (or start) the client and end up on the main window.

        :param account: account number typed into the login dialog.
        :param trade_pw: trading password typed into the login dialog.
        :param communication_pw: communication password for the dialog.
        :param name: if non-empty and different from the account name shown
            in the status bar, a click is sent to the toolbar to trigger a
            re-login before trying again.
        :raises RuntimeError: if the main window could not be reached after
            ``MAX_ATTACH_ATTEMPTS`` attempts.
        """
        self.trade_pw = trade_pw
        self.account = account
        self.communication_pw = communication_pw

        for _ in range(self.MAX_ATTACH_ATTEMPTS):
            if self.app is None:
                try:
                    self.app = application.Application().connect(path=self.path)
                except Exception:
                    # Not running yet (or connect failed): launch it.
                    self.app = application.Application().start(self.path)

            if self._check_main_trade_win_visible():
                if name and name != self.get_account_name():
                    # Logged in under another account: click the toolbar at
                    # an offset derived from the toolbar geometry, then
                    # re-evaluate the window state.
                    top_bar = self.trade_win.window(class_name='ToolbarWindow32')
                    top_bar_rect = top_bar.rectangle()
                    top_bar2_rect = top_bar.window(class_name=u'#32770').rectangle()
                    relogin_bar_width = top_bar2_rect.left - top_bar_rect.left
                    relogin_bar_height = top_bar2_rect.bottom - top_bar_rect.top
                    relogin_x_offset = int((relogin_bar_width / 7) * 1.5)
                    relogin_y_offset = int(relogin_bar_height)
                    self.trade_win.click_input(button='left',
                                               coords=(relogin_x_offset, relogin_y_offset))
                    continue
                self._try2attech_main_trade_win()
                break

            if self._check_login_win_visible():
                self._do_login()
                try:
                    self._try2attech_main_trade_win()
                except Exception:
                    # Main window did not come up after login; start over.
                    continue
                break

            # Neither window is visible: kill the client. close() resets
            # self.app so the next iteration reconnects or relaunches.
            self.close()
        else:
            raise RuntimeError(
                'attach: main trade window not reachable after %d attempts'
                % self.MAX_ATTACH_ATTEMPTS)

        self.trade_win_rect = self.trade_win.rectangle()
        self.info_win_rect = self.info_win.rectangle()
        # Click position inside the data grid used to focus it before copy.
        self.info_win_coords = (100, 100)

    def _do_login(self):
        """Fill in the login dialog with the stored credentials and submit."""
        win = self.app.window(title_re=u'用户登录', class_name='#32770')
        popup_hwnd = win.popup_window()
        if popup_hwnd:
            # Dismiss a popup owned by the login dialog before typing.
            popup_window = self.app.window(handle=popup_hwnd)
            popup_window.set_focus()
            popup_window.Button.click()
        account_edit = win[u'帐\u3000\u3000号(&A):Edit']
        trade_pw_edit = win[u'交易密码(&G):Edit']
        communication_pw_edit = win[u'通讯密码(&K):Edit']
        account_edit.set_edit_text(self.account)
        trade_pw_edit.set_edit_text(self.trade_pw)
        communication_pw_edit.set_edit_text(self.communication_pw)
        win[u'连最快站点'].click()

    def _set_order(self, symbol='', price='', qty=0):
        """Type symbol, optional price and quantity into the order form.

        On any failure F5 is sent to the main window and the whole form is
        filled in again, up to ``MAX_UI_RETRIES`` times.

        :raises RuntimeError: if every attempt failed.
        """
        last_error = None
        for _ in range(self.MAX_UI_RETRIES):
            try:
                symbol_edit = self.trade_win[u'Edit1']
                price_edit = self.trade_win[u'Edit2']
                qty_edit = self.trade_win[u'Edit3']
                symbol_edit.wait(wait_for='active', timeout=0.2)
                symbol_edit.set_edit_text(symbol)
                if price:
                    # A falsy price leaves the price box untouched.
                    price_edit.wait(wait_for='active', timeout=0.2)
                    price_edit.set_edit_text(price)
                qty_edit.wait(wait_for='active', timeout=0.2)
                qty_edit.set_edit_text(qty)
                return
            except Exception as e:
                last_error = e
                print('_set_order', e)
                self.trade_win.type_keys('{F5}')
        raise RuntimeError('_set_order failed after %d attempts: %s'
                           % (self.MAX_UI_RETRIES, last_error))

    def _close_popup_window(self):
        """Try to dismiss one popup of the main window.

        :return: True if a popup handle was reported (whether or not the
            click succeeded), False otherwise.
        """
        popup_hwnd = self.trade_win.popup_window()
        if not popup_hwnd:
            return False
        popup_window = self.app.window(handle=popup_hwnd)
        try:
            popup_window.wait(wait_for='visible', timeout=0.2)
            popup_window.set_focus()
            popup_window.Button.click()
        except Exception:
            # Best effort: the popup may have vanished or have no button.
            pass
        return True

    def close_popup_windows(self):
        """Dismiss popups one by one until none is reported."""
        while self._close_popup_window():
            time.sleep(0.2)

    def _refresh_window(self):
        """Send F5 to the main window and give it a moment to settle."""
        self.trade_win.type_keys('{F5}')
        time.sleep(0.2)

    def _copy_authentication(self):
        """Answer image-verification popups shown after a copy.

        While the main window reports a popup, the popup's ``Static2``
        control is screen-grabbed, run through OCR, and the recognised text
        is typed into the popup's edit box followed by ENTER.
        """
        popup_hwnd = self.trade_win.popup_window()
        while popup_hwnd:
            popup_window = self.app.window(handle=popup_hwnd)
            popup_window.set_focus()
            rect = popup_window['Static2'].rectangle()
            img = ImageGrab.grab((rect.left, rect.top, rect.right, rect.bottom))
            # OCR output can carry trailing newline/form-feed characters;
            # strip them so only the code itself is typed.
            popup_window['Edit'].set_edit_text(image_to_string(img).strip())
            time.sleep(0.1)
            popup_window.type_keys('{ENTER}')
            popup_hwnd = self.trade_win.popup_window()

    def _select_window(self, win=''):
        """Send F1 to the main window, then post key ``win`` to the grid.

        :param win: one of ``'W'``, ``'E'`` or ``'R'``.
        :raises ValueError: for any other value (checked before any key is
            sent).
        """
        if win not in ('W', 'E', 'R'):
            raise ValueError("win must be one of 'W', 'E', 'R', got %r" % (win,))
        self.trade_win.type_keys('{F1}')
        key_code = ord(win)
        self.info_win.post_message(win32con.WM_KEYDOWN, key_code)
        self.info_win.post_message(win32con.WM_KEYUP, key_code)
        time.sleep(0.1)

    def _get_order_id(self):
        """Read the contract number from the order-confirmation popup.

        :return: the first run of digits found in the popup's
            "合同编号" static text, or ``''`` if it could not be read.
        """
        import re

        # Start every call with a clean retry counter, so that an earlier
        # failure cannot use up this call's retries.
        self.get_order_id_count = 0
        while True:
            popup_hwnd = self.trade_win.popup_window()
            if not popup_hwnd:
                # Give the popup a moment to appear, then look again.
                time.sleep(0.05)
                popup_hwnd = self.trade_win.popup_window()
            order_id = ''
            try:
                popup_window = self.app.window(handle=popup_hwnd)
                htbx = popup_window.child_window(title_re=u'.*合同编号.*', class_name='Static')
                htbx.wait(wait_for='visible', timeout=0.5)
                order_id = re.findall(r'\d+', htbx.element_info.rich_text)[0]
            except pywinauto.findwindows.ElementNotFoundError:
                order_id = ''
            except pywinauto.timings.TimeoutError:
                order_id = ''
            except Exception as e:
                print('_get_order_id', e)
                self._refresh_window()
                self.get_order_id_count += 1
                if self.get_order_id_count < self.MAX_ORDER_ID_ATTEMPTS:
                    continue
                self.get_order_id_count = 0
                return ''
            self.close_popup_windows()
            self.get_order_id_count = 0
            return order_id

    def _place_order(self, hotkey, button_title, symbol, price, qty):
        """Shared implementation of :meth:`buy` and :meth:`sell`."""
        if not qty > 0:
            # Raised explicitly (same type as the former ``assert``) so the
            # check is not stripped when Python runs with -O.
            raise AssertionError('qty must be greater than 0')
        time.sleep(0.01)
        self.trade_win.type_keys(hotkey)
        self._set_order(symbol, price, qty)
        time.sleep(0.2)
        self.trade_win[button_title].click()
        return self._get_order_id()

    def buy(self, symbol='', price='', qty=0):
        """Submit a buy order and return its contract number ('' if unknown).

        :raises AssertionError: if ``qty`` is not greater than 0.
        """
        return self._place_order('{F1}', u'买入[B]', symbol, price, qty)

    def sell(self, symbol='', price='', qty=0):
        """Submit a sell order and return its contract number ('' if unknown).

        :raises AssertionError: if ``qty`` is not greater than 0.
        """
        return self._place_order('{F2}', u'卖出[S]', symbol, price, qty)

    def get_account_name(self):
        """Return the status-bar text at index 3 as a string."""
        status_bar = self.trade_win.window(best_match=u'StatusBar')
        return str(status_bar.texts()[3])

    def get_available_fund(self):
        """Return the status-bar text at index 5 converted to float."""
        status_bar = self.trade_win.window(best_match=u'StatusBar')
        return float(status_bar.texts()[5])

    @staticmethod
    def _parse_rows(data):
        """Split copied grid text into rows of tab-separated fields.

        The first line (the header) is dropped, as are rows whose first
        field is empty (e.g. the blank line after a trailing CRLF).
        """
        rows = []
        for line in data.split('\r\n')[1:]:
            fields = line.split('\t')
            if fields[0] == '':
                continue
            rows.append(fields)
        return rows

    def _copy_grid_rows(self, key, close_popups=False):
        """Select a view, copy the grid via the clipboard, return its rows.

        :param key: view key passed to :meth:`_select_window`.
        :param close_popups: dismiss popups after focusing the grid and
            before sending Ctrl+C.
        :raises RuntimeError: if the clipboard stayed empty or unreadable
            for ``MAX_UI_RETRIES`` attempts.
        """
        for _ in range(self.MAX_UI_RETRIES):
            self._select_window(key)
            self.info_win.click_input(button='left', coords=self.info_win_coords)
            if close_popups:
                self.close_popup_windows()
            self.info_win.type_keys('^c')
            self._copy_authentication()
            try:
                data = clipboard.GetData()
            except Exception:
                continue
            if data:
                return self._parse_rows(data)
        raise RuntimeError('could not copy grid %r after %d attempts'
                           % (key, self.MAX_UI_RETRIES))

    def get_position(self):
        """Return current holdings as a list of ``SymbolPosition``.

        Rows whose first column is not all digits, or whose total quantity
        is 0, are skipped. Column indices follow the copied grid text.
        """
        poss = []
        for pos_item in self._copy_grid_rows('W', close_popups=True):
            # Check the symbol column first so header/blank/summary rows
            # never reach the numeric conversions below.
            if not pos_item[0].isdigit():
                continue
            total = int(pos_item[2])
            if total == 0:
                continue
            poss.append(SymbolPosition(symbol=pos_item[0],
                                       total=total,
                                       available=int(pos_item[3]),
                                       pnl=float(pos_item[5]),
                                       avg_cost=float(pos_item[6]),
                                       pnl_ratio=float(pos_item[7])))
        return poss

    def get_trade(self):
        """Return the trades grid as a list of ``TradeInfo``.

        ``bs`` is ``'B'`` when column 3 equals ``u'买'`` and ``'S'``
        otherwise.
        """
        trades = []
        for trade_item in self._copy_grid_rows('E'):
            trades.append(TradeInfo(trade_time=trade_item[0],
                                    symbol=trade_item[1],
                                    bs='B' if trade_item[3] == u'买' else 'S',
                                    qty=int(trade_item[4]),
                                    price=float(trade_item[5]),
                                    turnover=float(trade_item[6]),
                                    order_id=int(trade_item[7]),
                                    trade_id=int(trade_item[8])))
        return trades

    def get_alive_order(self):
        """Return the open-orders grid as a list of ``OrderInfo``.

        ``bs`` is ``'B'`` when column 7 equals ``u'买'`` and ``'S'``
        otherwise; ``order_id`` is kept as the copied string.
        """
        orders = []
        for order_item in self._copy_grid_rows('R'):
            orders.append(OrderInfo(order_time=order_item[8],
                                    symbol=order_item[0],
                                    bs='B' if order_item[7] == u'买' else 'S',
                                    ststus=order_item[2],
                                    order_qty=int(order_item[3]),
                                    order_price=float(order_item[5]),
                                    traded_qty=int(order_item[4]),
                                    traded_price=float(order_item[6]),
                                    order_id=order_item[10]))
        return orders

    def cancel_order(self, order_id=""):
        """Cancel the open order whose ``order_id`` equals ``order_id``.

        The open-orders list is refreshed (and stored on
        ``self.alive_orders``); the matching grid row is double-clicked and
        any resulting popups are dismissed.
        """
        self.alive_orders = self.get_alive_order()
        for order_idx, order in enumerate(self.alive_orders, 1):
            if order.order_id == order_id:
                # Row position in the grid: 17 px per row plus a 7 px
                # offset (values taken from the original code).
                self.info_win.double_click_input(coords=(100, order_idx * 17 + 7))
                time.sleep(0.05)
                self.close_popup_windows()
                self.close_popup_windows()

    def cancel_all(self):
        """Click the cancel-all button and dismiss the resulting popups."""
        self.trade_win[u'全撤(Z /)Button'].click()
        self.close_popup_windows()

    def close(self):
        """Kill the client process (if any) and forget all window handles.

        Safe to call when nothing is attached. The handles are reset so
        that a later :meth:`attach` reconnects or relaunches the client.
        """
        try:
            if self.app is not None:
                self.app.kill()
        finally:
            self.app = None
            self.trade_win = None
            self.info_win = None
if __name__ == '__main__':
    # Manual smoke run against a locally installed client: log in, place
    # one buy order, print its contract number, then cancel it.
    # Other read-only calls to try here: get_position(), get_alive_order(),
    # get_trade().
    trader = AutoThs(thspath=u'D:/海通融资融券5.0/xiadan.exe')
    trader.attach(account='123456',
                  trade_pw='123456',
                  communication_pw='123456',
                  name='')
    placed_id = trader.buy(symbol='510050', price=2.912, qty=100)
    print(placed_id)
    trader.cancel_order(order_id=placed_id)
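# NOTE: the next two lines are web-page residue from the code-hosting site
# (content-moderation notice); they are not part of the program.
# 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
# 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。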