# Code pull complete; the page will refresh automatically. (Hosting-page notice, not part of the program.)
from datetime import datetime
import gym
from gym import spaces
import numpy as np
from define import *
import random
import pandas as pd
import helper
# Starting cash of a simulated account; also the baseline for the
# 90%-of-capital stop condition in TrainingEnv._is_done.
INITIAL_CAPITAL = 100_000
# Fee charged per contract each time a position is opened or closed.
SERVICE_CHARGE = 4.1
class DataCenter():
    """Load one trading day's tick CSV and normalise it into a DataFrame."""

    def __init__(self, data_dir="E:/data/") -> None:
        """Remember where the per-day CSV files live.

        Args:
            data_dir: Directory containing the tick CSV files. Defaults to
                the location that used to be hard-coded.
        """
        self.data_dir = data_dir

    def __del__(self):
        """No-op finalizer, retained for backward compatibility."""
        pass

    def get_training_info(self, trade_day):
        """Read ``trade_day`` from ``data_dir`` and return time-sorted ticks.

        Args:
            trade_day: File name of the CSV, relative to ``data_dir``.

        Returns:
            A DataFrame sorted by ``time`` with the columns ``time``,
            ``price``, ``buy_price``, ``buy_volume``, ``volume``,
            ``sell_price``, ``sell_volume``, ``delta_hold``, ``standard``
            and ``trade_day``. The original row index is kept, so use
            positional access (``iloc``) to walk the ticks in time order.
        """
        import os  # local import: keeps this block self-contained

        csv_data = pd.read_csv(os.path.join(self.data_dir, trade_day), encoding="utf-8")

        # The millisecond column must be zero-padded to three digits before
        # it is placed after the decimal point: "%f" reads digits as a
        # left-aligned fraction, so an unpadded "5" would mean 500 ms.
        millis = csv_data["最后修改毫秒"].astype(str).str.zfill(3)
        date_time = (
            csv_data["业务日期"].astype(str)
            + " "
            + csv_data["最后修改时间"].astype(str)
            + "."
            + millis
        )
        current_date = pd.to_datetime(date_time, format='%Y%m%d %H:%M:%S.%f')

        res_data = {
            "time": current_date,
            # last traded price
            "price": csv_data["最新价"].astype(float),
            # best ask (price/volume available to a buyer)
            "buy_price": csv_data["申卖价一"].astype(float),
            "buy_volume": csv_data["申卖量一"].astype(int),
            "volume": csv_data["数量"].astype(int),
            # best bid (price/volume available to a seller)
            "sell_price": csv_data["申买价一"].astype(float),
            "sell_volume": csv_data["申买量一"].astype(int),
            # open interest minus previous open interest
            "delta_hold": csv_data["持仓量"].astype(int) - csv_data["昨持仓量"].astype(int),
            # previous settlement price
            "standard": csv_data["上次结算价"].astype(float),
            "trade_day": csv_data["交易日"].astype(int),
        }
        data_list = pd.DataFrame(res_data)
        # Stable sort: ticks sharing a timestamp keep their file order.
        data_list.sort_values(by=["time"], inplace=True, kind="mergesort")
        return data_list
class FutureHolder():
    """Simulated futures account holding a long and/or a short position.

    Tracks cash (``money``), reserved margin (``mortgage``), the size and
    average entry price of the long side (``long_order`` / ``buy_price``)
    and of the short side (``short_order`` / ``sell_price``), plus the most
    recent tick time and price supplied through :meth:`step`.
    """

    def __init__(self, multiple=10, initial_capital=None, service_charge=None) -> None:
        """Create an empty account.

        Args:
            multiple: Contract multiplier applied to every price difference.
            initial_capital: Starting cash. ``None`` uses the module-level
                ``INITIAL_CAPITAL``.
            service_charge: Fee per contract for each open or close.
                ``None`` uses the module-level ``SERVICE_CHARGE``.
        """
        # Resolve the defaults at call time so the module constants are only
        # consulted when the caller did not override them.
        self.service_charge = SERVICE_CHARGE if service_charge is None else service_charge
        self.money = INITIAL_CAPITAL if initial_capital is None else initial_capital
        self.long_order = 0
        self.buy_price = 0
        self.short_order = 0
        self.sell_price = 0
        self.multiple = multiple
        self.befor_money = self.money
        self.mortgage = 0
        self.time = None
        self.current_price = 0
        self.open_time = None

    def prev(self):
        """Snapshot the current fortune into ``befor_money``."""
        self.befor_money = self.get_fortune()

    def step(self, last_info):
        """Record the time and (float32-rounded) last price of a tick.

        Args:
            last_info: Mapping-like tick exposing ``"price"`` and ``"time"``.
        """
        last_price = np.float32(last_info["price"])
        self.time = last_info["time"]
        self.current_price = last_price

    def _hold_seconds(self) -> int:
        """Return whole seconds since the last open, or 0 when unknown."""
        if self.time is None or self.open_time is None:
            return 0
        # total_seconds() includes whole days; the ``.seconds`` attribute
        # would silently drop them.
        return int((self.time - self.open_time).total_seconds())

    @staticmethod
    def _scale_by_hold(delta_money, hold_seconds):
        """Return profit per held second, or the raw profit if held < 1 s."""
        if hold_seconds > 0:
            return delta_money / hold_seconds
        return delta_money

    def _clear_open_time_if_flat(self):
        """Forget the open time only once no position remains on either side."""
        if self.long_order == 0 and self.short_order == 0:
            self.open_time = None

    def buy_open(self, price:np.float32, count:np.int32)->np.float64:
        """Open ``count`` long contracts at ``price``.

        Returns:
            The negative fee paid, or 0 when nothing was opened because
            ``count`` is not positive or free cash (``money - mortgage``)
            is below ``price * count``.
        """
        if count <= 0:
            # Also protects the average-price division below.
            return 0
        if self.money - self.mortgage < price * count:
            return 0
        self.mortgage += price * count
        self.buy_price = (price * count + self.buy_price * self.long_order) / (count + self.long_order)
        self.long_order += count
        fee = self.service_charge * count
        self.money -= fee
        self.open_time = self.time
        #print(f"{str(self.time)} OPEN BUY : {price} {count}")
        return -fee

    def sell_close(self, price:np.float32, count:np.int32)->np.float64:
        """Close ``count`` long contracts at ``price``.

        Returns:
            Realised profit (fee included) divided by the seconds held when
            that is positive, else the raw profit; 0 when fewer than
            ``count`` long contracts are held.
        """
        if self.long_order < count:
            return 0
        hold_seconds = self._hold_seconds()
        self.mortgage -= self.buy_price * count
        self.long_order -= count
        delta_money = (price - self.buy_price) * count * self.multiple - self.service_charge * count
        self.money += delta_money
        # A partial close must keep open_time for the remaining contracts.
        self._clear_open_time_if_flat()
        #print(f"{str(self.time)} CLOSE BUY : {price} {count} {delta_money}")
        return self._scale_by_hold(delta_money, hold_seconds)

    def sell_open(self, price:np.float32, count:np.int32)->np.float64:
        """Open ``count`` short contracts at ``price``.

        Returns:
            The negative fee paid, or 0 when nothing was opened because
            ``count`` is not positive or free cash (``money - mortgage``)
            is below ``price * count``.
        """
        if count <= 0:
            # Also protects the average-price division below.
            return 0
        if self.money - self.mortgage < price * count:
            return 0
        self.mortgage += price * count
        self.sell_price = (price * count + self.sell_price * self.short_order) / (count + self.short_order)
        self.short_order += count
        fee = self.service_charge * count
        self.money -= fee
        self.open_time = self.time
        #print(f"{str(self.time)} OPEN SELL : {price} {count}")
        return -fee

    def buy_close(self, price:np.float32, count:np.int32)->np.float64:
        """Close ``count`` short contracts at ``price``.

        Returns:
            Realised profit (fee included) divided by the seconds held when
            that is positive, else the raw profit; 0 when fewer than
            ``count`` short contracts are held.
        """
        if self.short_order < count:
            return 0
        hold_seconds = self._hold_seconds()
        self.mortgage -= self.sell_price * count
        self.short_order -= count
        delta_money = (self.sell_price - price) * count * self.multiple - self.service_charge * count
        self.money += delta_money
        # A partial close must keep open_time for the remaining contracts.
        self._clear_open_time_if_flat()
        #print(f"{str(self.time)} CLOSE SELL : {price} {count} {delta_money}")
        return self._scale_by_hold(delta_money, hold_seconds)

    def buy(self, price:np.float32, count:np.int32):
        """Buy ``count`` contracts: cover shorts first, then open longs.

        Returns:
            The sum of the rewards of the close and open that were executed.
        """
        reward = 0
        # Cover as many shorts as are held, even when that is fewer than
        # ``count``; only the remainder becomes a new long position.
        close_count = min(count, self.short_order)
        if close_count > 0:
            reward += self.buy_close(price, close_count)
        open_count = count - close_count
        if open_count > 0:
            reward += self.buy_open(price, open_count)
        return reward

    def sell(self, price:np.float32, count:np.int32):
        """Sell ``count`` contracts: close longs first, then open shorts.

        Returns:
            The sum of the rewards of the close and open that were executed.
        """
        reward = 0
        # Close as many longs as are held, even when that is fewer than
        # ``count``; only the remainder becomes a new short position.
        close_count = min(count, self.long_order)
        if close_count > 0:
            reward += self.sell_close(price, close_count)
        open_count = count - close_count
        if open_count > 0:
            reward += self.sell_open(price, open_count)
        return reward

    def close_all(self)->np.float64:
        """Liquidate every open position at ``current_price``.

        Returns:
            Total realised profit of both sides (fees included), divided by
            the seconds held when that is positive.
        """
        # Plain float keeps the cash arithmetic in double precision even
        # though current_price is stored as float32.
        price = float(self.current_price)
        has_position = self.short_order > 0 or self.long_order > 0
        # Measure the holding time once, before any state is reset.
        hold_seconds = self._hold_seconds() if has_position else 0
        delta_money = 0
        if self.short_order > 0:
            self.mortgage -= self.sell_price * self.short_order
            delta_money += (self.sell_price - price) * self.short_order * self.multiple - self.service_charge * self.short_order
            print(f"{str(self.time)} CLOSE ALL SELL : {self.current_price} {self.short_order}")
            self.short_order = 0
        if self.long_order > 0:
            self.mortgage -= self.buy_price * self.long_order
            # Accumulate: the short side's result must not be overwritten.
            delta_money += (price - self.buy_price) * self.long_order * self.multiple - self.service_charge * self.long_order
            print(f"{str(self.time)} CLOSE ALL BUY : {self.current_price} {self.long_order}")
            self.long_order = 0
        self.open_time = None
        self.money += delta_money
        print(f"{str(self.time)} Money : {delta_money} {self.money}")
        return self._scale_by_hold(delta_money, hold_seconds)

    def get_fortune(self):
        """Return cash plus the unrealised profit of both sides (no fees)."""
        # Plain float keeps the sum in double precision even though
        # current_price is stored as float32.
        price = float(self.current_price)
        sell_money = (self.sell_price - price) * self.short_order * self.multiple
        buy_money = (price - self.buy_price) * self.long_order * self.multiple
        return self.money + buy_money + sell_money
class TrainingEnv(gym.Env):
    """Futures trading environment for OpenAI gym.

    Each episode replays the ticks of one trading day, advancing one tick
    per ``step``. The agent's single continuous action is rounded to
    -1 (sell), 0 (do nothing) or 1 (buy).
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, all_trade_day):
        """Set up spaces and an empty account.

        Args:
            all_trade_day: Sequence of CSV file names, one per trading day,
                from which ``reset`` picks at random.
        """
        super().__init__()
        # One scalar in [-1, 1]; see _take_action for how it is interpreted.
        self.action_space = spaces.Box(low=np.float16(-1), high=np.float16(1), shape=(1,), dtype=np.float16)
        # Flat vector of OBS_COUNT features produced by helper.get_obs.
        self.observation_space = spaces.Box(low=np.float32(-1), high=np.float32(1), shape=(OBS_COUNT,), dtype=np.float32)
        self.all_trade_day = all_trade_day
        self.holder = FutureHolder()
        self.data_center = DataCenter()
        self.current_step = 0
        self.trade_day = None
        self.last_valume = 0
        # Set by reset(); None marks "no tick loaded yet".
        self.last_info = None
        self.tick_data = pd.DataFrame()

    def step(self, action):
        """Advance one tick and apply ``action``.

        Args:
            action: Indexable whose first element is the action value.

        Returns:
            ``(obs, reward, done, info)``. When the episode ends, every
            position is liquidated and ``obs`` is all zeros.

        Raises:
            RuntimeError: If called before ``reset``.
        """
        if self.last_info is None:
            raise RuntimeError("reset() must be called before step()")
        self.last_valume = np.float32(self.last_info["volume"])
        self.holder.prev()
        self.current_step += 1
        done = self._is_done()
        if not done:
            self.last_info = self.tick_data.iloc[self.current_step]
            self.holder.step(self.last_info)
            reward = self._take_action(action[0])
            obs = self._get_frame_data()
        else:
            reward = self.holder.close_all()
            obs = np.zeros(shape=(OBS_COUNT), dtype=np.float32)
        return obs, reward, done, {}

    def reset(self):
        """Start a new episode with a fresh account.

        A new random trading day is loaded only while ``_is_done`` is true
        (for example when the current day is exhausted); otherwise the
        episode continues from the current tick of the day already loaded.

        Returns:
            The first observation of the episode.

        Raises:
            ValueError: If there is no trading day to choose from, or the
                chosen day has fewer than two ticks.
        """
        self.holder = FutureHolder()
        while self._is_done():
            if len(self.all_trade_day) == 0:
                raise ValueError("all_trade_day is empty; no trading day to load")
            self.trade_day = random.choice(self.all_trade_day)
            self.tick_data = self.data_center.get_training_info(self.trade_day)
            if len(self.tick_data) < 2:
                # Tick 0 seeds last_valume and tick 1 is the first observed
                # tick, so at least two rows are required.
                raise ValueError(f"trade day {self.trade_day!r} has fewer than 2 ticks")
            self.last_valume = np.float32(self.tick_data.iloc[0]["volume"])
            self.current_step = 1
            self.last_info = self.tick_data.iloc[self.current_step]
        observation = self._get_frame_data()
        return observation

    def _is_done(self):
        """Return True when the day is exhausted or the loss limits are hit."""
        if self.current_step >= len(self.tick_data):
            return True
        current_monery = self.holder.get_fortune()
        # NOTE(review): step() calls holder.prev() right before this check
        # and no new price arrives in between, so the 2% per-step test looks
        # unreachable for a positive fortune -- confirm the intended order.
        if current_monery < self.holder.befor_money*0.98 or current_monery < INITIAL_CAPITAL*0.9:
            return True
        return False

    def _get_frame_data(self):
        """Build the observation from the current tick and account state."""
        holder_data = {
            "buy_order": self.holder.long_order,
            "sell_order": self.holder.short_order,
            "buy_price": self.holder.buy_price,
            "sell_price": self.holder.sell_price,
            "last_valume": self.last_valume,
        }
        obs = helper.get_obs(self.last_info, holder_data)
        return obs

    def render(self, mode='human'):
        """Print fortune and cash while any margin is reserved."""
        if self.holder.mortgage > 0:
            print(f'{str(self.holder.time)} Money : {self.holder.get_fortune()},{self.holder.money}')

    def close(self):
        """Liquidate all positions and print the final fortune."""
        self.holder.close_all()
        print(f'Money : {self.holder.get_fortune()}')

    # Execute the requested action.
    def _take_action(self, action):
        """Trade according to the rounded action and return the step reward.

        Args:
            action: Scalar; rounds to 1 for buy, -1 for sell, anything else
                does nothing.

        Returns:
            Change in fortune since ``holder.prev()`` was last called.
        """
        n = round(action)
        if n == 1:
            # Go long: buy at the tick's "buy_price" (the ask side).
            if self.holder.long_order < MAX_ORDER:
                ask_price = np.float64(self.last_info.get("buy_price"))
                self.holder.buy(ask_price, ONCE_ORDER)
        elif n == -1:
            # Go short: sell at the tick's "sell_price" (the bid side).
            if self.holder.short_order < MAX_ORDER:
                bid_price = np.float64(self.last_info.get("sell_price"))
                self.holder.sell(bid_price, ONCE_ORDER)
        reward = self.holder.get_fortune() - self.holder.befor_money
        return reward
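# (Hosting-page notice, not part of the program.) This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing features.
# If you confirm the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, and nothing that violates national laws and regulations, you may click Submit to appeal and it will be handled as soon as possible.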