今天的都是干货。
我的每一篇文章丝丝相扣,不可孤篇而论。
1,修改个装饰器:
backtrader把股票名设为了data._name,本文不建议通过这种方式进行抽取。最好是在linseries.py里加入装饰器@property
# in linseries.py
# @property
# def name(self):
# return self._name
这样我们就可以用self.name了
2,交易软件设置一下:先开通模拟交易,(每个券商的交易项是有微调的,我以模拟交易为例演示,各自需要微调。)
这些个内容哥哥券商不一致。
买入卖出界面设置成空的,我们自己填入,不需要自动填入。
3,自动化流程简述:
从历史数据中拿到自己想要的数据
9点25开盘数据生成,开始选股
将持仓和选股内容合并,开始是监控
触发买入条件买入
4,引入依赖设置
import pandas as pd
# import tushare as ts
import numpy as np
import requests
import datetime
import backtrader as bt
import time
import easyquotation
import csv
# 进度条
from tqdm import tqdm
import time
from functools import wraps
import os
import comdata
import rich
import jqktrader
from feed import SimpleLivingData
# 设置输出对齐
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 500)
pd.set_option('display.max_colwidth', 500)
pd.set_option('display.unicode.ambiguous_as_wide', True)
pd.set_option('display.unicode.east_asian_width', True)
5,懂得都懂:
作为全段看见.dot就头疼,本想写个句柄教程的,以后再说吧,这个是我git上搜的,能用就行,理论简单,操作硬性,没什么难的。
最好的方案还是改easytrader,放心。
# 这是个交易端
user = jqktrader.use()
user.connect(
exe_path=r'D:\同花顺软件\同花顺\xiadan.exe',
tesseract_cmd=r'C:\Program Files\Tesseract-OCR\tesseract.exe'
)
user.enable_type_keys_for_editor()
# user.position
6,加入时间测试,毕竟自动化时间就是命根:
def timefn(fn):
'''计算性能的修饰器'''
@wraps(fn)
def measure_time(*args, **kwargs):
t1 = time.time()
result = fn(*args, **kwargs)
t2 = time.time()
print(f'@timefn: {fn.__name__} took {t2 - t1: .5f} s')
return result
return measure_time
7,每一次触发,写入csv文件中,作为记录:
# 记载每一次的交易
def trader_save(sa):
row=sa
nowdate=datetime.datetime.now().strftime('%Y%m%d')
# 文件统一命名TK+日期
dirname='TK'+str(nowdate)+'.csv'
with open(dirname,'a',newline='' ,encoding='utf8') as cf:
wf=csv.writer(cf)
wf.writerow(row)
这个是今天下午的交易记录,时间上基本可用:
8,读取历史数据,和用历史数据生产的交易参数,此数据属于过去式,可以作为全局变量,加载到交易系统中。
读取的文件,就是盘前选股的文章中生成的文件。
# 读取今日股票仓库
df0=pd.read_csv('./code_wash.csv', )
df0.set_index('code', inplace=True,drop=False)
date_list_old=comdata.get_nowdata_list()
date_list_old=date_list_old[['今开' , '昨收' , '价格' , '最高' , '最低' , '成交量' , '日期' , '时间' , '代码']]
date_list_old['code']=date_list_old['代码']
date_list_old.set_index('code', inplace=True,drop=True)
out = pd.concat([date_list_old,df0], axis=1).sort_index()
out =out.dropna(axis=0, how='any')
out['买入阈值TK']=out['今开']+out['atrTK']
out['买入下阈值TK']=out['今开']-out['atrTK']
print(out)
写一个函数,按需从历史数据中调取:
# 从wash文件中获取数据
# 0.005m速度
# @timefn
def get_washdata(code,wap):
global out
try:
keydata=out.loc[code,wap]
except:
keydata=None
print(code,wap,'不存在')
# print(smban)
return keydata
写一个函数,按需读取实时数据,
# 返回今日实时数据
# @0.009m
# @timefn
def get_nowvalue(code,wap):
realtick=comdata.get_nowdata_all(code,wap)
return realtick
9,这里写个案例,可以利用8中的数据做一个冲高后回落的卖出条件:
# 冲高后,回落3%函数
@timefn
def get_grolue(code,lvwap):
# print(code)
try:
tnowh=get_nowvalue(code,'最高')
yclose=get_nowvalue(code,'昨收')
ylow=get_washdata(code,'最低')
except:
return print(code,'数据有误')
if tnowh>yclose:
tnowhight=tnowh
else:
tnowhight=yclose
# 择日修改,这里有负数没有处理
growlue=abs((tnowhight-ylow)/20)*lvwap
if (tnowh-yclose)*100/yclose>12:
growlue=abs((tnowhight-ylow)/20)*2
return float('%.3f' % growlue)
10,获取股票全局代码名称的函数,待修改,不完美
# 获得股票基本数据(证券代码)
# @timefn
def get_tickes(wap):
stocks = pd.read_csv('ticketslib.csv')
stocks = stocks[~stocks['名称'].str.contains('ST|\*|退')]
if wap == 'sz':
cyb = stocks[stocks['代码'].str.contains('^sz300|^sz301')]
if wap == 'sh':
cyb = stocks[stocks['代码'].str.contains('^sh60|^sz000')]
return cyb
11,tk策略代码,将就看吧,我出一期专门的介绍。
class TK(bt.Strategy):
params = dict(
exitbars = 5,
min_period = 21,
)
#日志格式
def log(self, txt, dt=None,doprint=True):
if doprint:
dt = self.datas[0].datetime.datetime(0)
print('%s, %s' % (dt.isoformat(), txt))
def __init__(self):
global df0
self.inds=dict()
self.volnow5=dict()
self.priceright=dict()
self.buy_cgvol_change=dict()
self.buy_bgvol_change=dict()
self.buy_apvol_change=dict()
self.buy_dpvol_change=dict()
self.selehigh=dict()
self.selelow=dict()
self.selevolume=dict()
self.selemidban=dict()
self.selemaxhigh=dict()
self.seleminlow=dict()
self.seleovlban=dict()
self.selesmvo5=dict()
self.opt_cyq_70=dict()
self.buy_oh_signal=dict()
self.buy_numvol=dict()
self.buy_svol_change=dict()
self.buy_kucun=dict()
self.buy_bigvol=dict()
self.buy_sobigvol=dict()
self.buy_minvol=dict()
self.buy_om_signal=dict()
self.buy_mh_signal=dict()
self.buy_cyq10_signal=dict()
self.buy_cyq30_signal=dict()
self.buy_cl_signal=dict()
self.today_na=dict()
self.today_mo_v=dict()
self.buy_setchange=dict()
self.timecheng=dict()
self.buy_sellvol=dict()
self.time_use=dict()
self.time_set=dict()
self.time_sum=dict()
self.time_cut=dict()
self.last_dt = datetime.datetime.now()
self.sele_flv=dict()
self.buy_ov_signal=dict()
# 有可用量,才可卖出
self.sell_tf=dict()
self.sell_mil=dict()
self.sell_grolue=dict()
self.sell_selfgrolue=dict()
#加入成交量排名指数
vol_jzll=None
vol_jz=[]
self.vol_num=[]
self.trader_list=dict()
self.trader_list_oldd=dict()
vol_jzp=[]
vol_jzpee=[]
vol_jzcross=[]
vol_zqmc=[]
vol_kez=[]
self.trader_name_old=[]
vol_jzpower=0
vol_cross=False
self.today_open=dict()
self.today_mot=dict()
self.today_mo=dict()
self.today_mo_t=dict()
self.today_mb=dict()
self.seleopen=dict()
self.seleclose=dict()
self.seleuse=dict()
self.seleuptk=dict()
self.tod_low=dict()
self.buy_ol_signal=dict()
self.buy_ol_t_signal=dict()
self.trader_vol=dict()
nowdate=datetime.datetime.now().strftime('%Y%m%d')
# 文件统一命名TD+日期
dirname='TK'+str(nowdate)+'.csv'
try:
f =open(dirname,'r')
df2=pd.read_csv(dirname,header = 0,encoding='utf8')
# print(df.head(2))
# 读取最后取值
grouped = df2.groupby('证券代码')
f =open(dirname,'a')
for name,group in grouped:
print(name)
self.trader_name_old.append(name)
# print(group)
trader_list_oldd = grouped.apply(lambda group:group.tail(1) )
self.trader_list_oldd=pd.DataFrame(trader_list_oldd,columns=['证券代码','证券名称','成熟度','时间','上穿高线','数量','可用','冻量','成交价','涨跌','振幅','成交量','调阈','交易信号'])
self.trader_list_oldd.set_index('证券代码',drop=False,inplace=True)
print(self.trader_list_oldd,'已经读取')
print(dirname,'存在,已经读取')
except IOError:
df2=pd.DataFrame(None,index=None,columns=['证券代码','证券名称','成熟度','时间','上穿高线','数量','可用','冻量','成交价','涨跌','振幅','成交量','调阈','交易信号'])
with open(dirname, 'a',newline='',encoding='utf8') as f:
wf=csv.writer(f)
wf.writerow(df2)
f.close()
print(dirname,'不存在;22已经创建')
for i,d in enumerate(self.datas):
opentod=get_nowvalue(d.name,'名称')
vol_jz.append(d.name)
vol_zqmc.append(opentod)
vol_jzp.append(vol_jzll)
vol_jzcross.append(vol_cross)
vol_jzpee.append(vol_jzpower)
self.trader_list['证券代码']=vol_jz
self.trader_list['app']=vol_jz
self.trader_list['证券名称']=vol_zqmc
# self.trader_list['成熟度']=vol_jzpee
self.trader_list['时间']=vol_jzp
self.trader_list['数量']=vol_jzpee
self.trader_list['可用']=vol_jzpee
self.trader_list['冻量']=vol_jzpee
self.trader_list['成交价']=vol_jzp
self.trader_list['涨跌']=vol_jzp
self.trader_list['振幅']=vol_jzp
self.trader_list['成交量']=vol_jzp
self.trader_list['交易信号']=vol_jzp
# try:
self.selehigh[d]=get_washdata(d.name,'最高')
self.selelow[d]=get_washdata(d.name,'最低')
self.selevolume[d]=get_washdata(d.name,'成交量')
self.today_open[d]=get_nowvalue(d.name,'今开')
self.today_na[d]=get_nowvalue(d.name,'名称')[-4:]
self.selehigh[d]=get_washdata(d.name,'最高')
self.seleopen[d]=get_washdata(d.name,'open')
self.seleclose[d]=get_washdata(d.name,'close')
self.seleuptk[d]=get_washdata(d.name,'买入阈值TK')
print(self.seleuptk[d],'---------成交量')
# 上穿最高线
self.buy_oh_signal[d]=bt.ind.CrossUp(d.close,self.seleuptk[d])
self.trader_list=pd.DataFrame(self.trader_list)
self.trader_list = self.trader_list.copy()
self.trader_list.set_index('证券代码', inplace=True)
print(self.trader_list,'读取数据后的')
# 读取持仓
try:
for code in self.trader_name_old:
self.trader_list.loc[code,'app'] =self.trader_list_oldd.loc[code,'证券代码']
self.trader_list.loc[code,'证券名称'] =self.trader_list_oldd.loc[code,'证券名称']
self.trader_list.loc[code,'时间'] =self.trader_list_oldd.loc[code,'时间']
print(self.trader_list,'第一次赋值')
except:
pass
try:
position = user.position
pos=pd.DataFrame(position)
pos=pos.copy()
print(pos)
kcstock=[]
for code in pos['证券代码']:
kcstock.append(code)
pos.set_index('证券代码', inplace=True)
# 设置已有股票
for code in kcstock:
codecc= 'sz'+code
# 不同的券商需要写特定的列表
try:
self.trader_list.loc[codecc,'数量'] =pos.loc[code,'股份余额']
self.trader_list.loc[codecc,'可用'] =pos.loc[code,'可用股份']
self.trader_list.loc[codecc,'冻量'] =pos.loc[code,'冻结数量']
self.trader_list.loc[codecc,'证券名称'] =pos.loc[code,'证券名称']
except:
# 应该是中信的
self.trader_list.loc[codecc,'数量'] =pos.loc[code,'股票余额']
self.trader_list.loc[codecc,'可用'] =pos.loc[code,'可用余额']
self.trader_list.loc[codecc,'冻量'] =pos.loc[code,'冻结数量']
self.trader_list.loc[codecc,'证券名称'] =pos.loc[code,'证券名称']
except:
pass
self.order = None
self.buyprice = None
self.buycomm = None
self.size=0
self.count=0
print(len(self.trader_list))
print('开始交易!!!')
def notify_order(self, order):
#订单状态处理
if order.status in [order.Submitted, order.Accepted]:
return
# 买入卖出订单处理
if order.status in [order.Completed]:
if order.isbuy():
self.log('买入单价: %.2f, 买入总金额: %.2f, 手续费 %.2f,数量%.2f'%
(order.executed.price,
order.executed.value *-1,
order.executed.comm,
order.executed.size))
elif order.issell():
self.log('卖出单价: %.2f, 总资产: %.2f, 手续费 %.2f,数量:%.2f' %
(order.executed.price,
self.broker.get_cash(),
order.executed.comm,
order.executed.size))
self.bar_executed = len(self)
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('订单取消/保证金不足/拒绝')
# Write down: no pending order
self.order = None
def notify_trade(self, trade):
if not trade.isclosed:
return
self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' %
(trade.pnl, trade.pnlcomm))
# @timefn
def next(self):
for i,d in enumerate(self.datas):
# 股票处于上涨状态
self.priceright[d]=(d.open[0]-d.open[-1])>0
# 卖出
self.sell_tf[d]=self.trader_list.loc[d.name,'可用'] > 0
self.time_set[d]=self.last_dt
# FMT = '%H:%M:%S.%f'
self.time_use[d]=datetime.datetime.now()
self.time_sum[d]=(self.time_use[d]-self.time_set[d]).seconds
self.last_dt=self.time_use[d]
cname=d.name
self.time_cut[d]=self.time_sum[d]>12
if self.time_cut[d]==1:
print('无效数据')
if self.time_sum[d]>6:
print('[{cname:<{len}}\t读取成交量间隔:{dt}:\t交易时间:{dnow}'.format(cname=d.name+']',len=8-len(cname.encode('GBK'))+len(cname),dt=self.time_sum[d],dnow= self.time_use[d]))
# 时间分段
self.trader_sell_time=datetime.datetime.now().time()>datetime.time(9, 29) and datetime.datetime.now().time()<datetime.time(14,55)
self.trader_buy_time=datetime.datetime.now().time()>datetime.time(9, 45) and datetime.datetime.now().time()<datetime.time(14, 45)
self.trader_time=datetime.datetime.now().time()>datetime.time(9, 30) and datetime.datetime.now().time()<datetime.time(14,55)
self.trader_no_time=datetime.datetime.now().time()>datetime.time(9, 24) and datetime.datetime.now().time()<datetime.time(9,47)
if self.priceright[d] ==1 and self.trader_time==1:
self.trader_list.loc[d.name,'涨跌']='上涨'
if self.priceright[d] !=1 and self.trader_time==1:
self.trader_list.loc[d.name,'涨跌']='下跌'
self.trader_list.loc[d.name,'时间']= datetime.datetime.now().time()
self.buy_kucun[d]=self.trader_list.loc[d.name,'数量']==0
self.trader_list.loc[d.name,'成交价']=d.close[0]
# 格式化成交量100的倍数
def downcast(amount, lot):
return abs(amount//lot*lot)
try:
# 买入金额分组以61.8万为单仓金额
mon27=20000
if self.buy_oh_signal[d]==1:
balance = (user.balance['可用金额'])
if balance >0:
pass
else:
balance=0
print(d.name,'不满足预留金额')
# 给自己的交易模式起个名字
self.trader_list.loc[d.name,'交易信号'] ='穿云箭'
trader_save(self.trader_list.loc[d.name])
# 满足一个仓位交易
if balance >mon27:
stake= downcast(mon27/self.datas[i].close, 100)
if stake!=0:
buy_no=user.buy(d._name[2:8], price=self.today_mb[d], amount = stake)
self.trader_list.loc[d.name,'数量'] = stake+self.trader_list.loc[d.name,'数量']
print(buy_no)
self.order = self.buy(data = d,size=stake)
self.log(f'买入{d.name}, 买入价格:{d.close[0]:.2f}, 买入数量: {stake}')
self.trader_list.loc[d.name,'交易信号'] ='穿云箭'
trader_save(self.trader_list.loc[d.name])
else:
pass
except Exception as e:
print('错误类型是',e.__class__.__name__)
print('错误明细是',e)
def stop(self):
pass
注意这里:
是买入条件,单纯的演示用
利用买入条件,买入股票:
12,bt的主程序
@timefn
def runstart():
# cpus=6,防止死机
cerebro = bt.Cerebro(oldtrades=False, stdstats=False,maxcpus=6)
for code in stockpool:
codename=code
feed = SimpleLivingData(codename, stockpool,timeframe=bt.TimeFrame.Ticks, compression=1)
cerebro.adddata(feed,name=codename)
cerebro.addstrategy(TK)
#回测-资金规则(总资产和手续费)
balance = (user.balance)['可用金额']
print('可用金额',balance)
# 这没用,broker间接等于自定义
cerebro.broker.setcash(1000000)
cerebro.broker.setcommission(0.01)
print('开始自动化交易 Running')
result = cerebro.run(runonce=False)
print('结束运行 Finish')
13,从交易软件读取股票信息:
# 读取证券中的数据
@timefn
def get_ck():
position = user.position
pos=pd.DataFrame(position)
pos=pos.copy()
pos.set_index('证券代码')
kcstock=[]
for code in pos['证券代码']:
codecc= 'sz'+code
kcstock.append(codecc)
print('\033[0;31m%s\033[0m' % kcstock)
# print(kcstock)
return kcstock
14,查看可用资金
def get_position():
position = user.position
print(position)
15,选股程序,以及主程序
if __name__ == '__main__':
global out
out['代码']=out['code']
print (out)
# 读取股票池
selec_dict=out[(out['今开'] >=out['stand_price'])&(out['买入阈值TK'] <out['zt'])][['代码']]
print('选中股票:' ,len(selec_dict),'支')
selec_csv=selec_dict
stockpool=selec_csv['代码']
print(stockpool,)
# 尝试读取持仓股票代码
try:
position = user.position
pos=pd.DataFrame(position)
pos=pos.copy()
pos.set_index('证券代码')
kcstocks=[]
print(pos)
for code in pos['证券代码']:
if '60' in code:
codecc= 'sh'+code
else:
codecc= 'sz'+code
print(codecc)
if codecc in selec_csv['code'].values.tolist():
pass
else:
kcstocks.append(codecc)
stockpool=np.append(stockpool,kcstocks)
except:
pass
# 查看一下代码,股票池+持仓
print(stockpool)
runstart()
bingo,跑起来了
预计周末我有时间,把演示程序整理出来。
看一下今天的结果:
多少不尽人意,还是要用周易
2023年5月24日观瞻
联系客服