打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
python 适用通达信

说明:适用通达信,别的没测试,
使用:python 文件名.py -t 999999 20070101 20070131 表示 用文本文件转化为1分钟5分钟的数据文件,代码是上证指数,日期 20070101到20070131 生成的5分钟数据文件为 sh999999.lc5 ,一分钟文件为 sh999999.lc1
注意:能直接看5分钟的数据,但不能直接看1分钟的,可能的原因是1分钟的数据文件不是sh999999.lc1形式的,有知道的请告知;但是可以换个方法看1分钟的图,方法是sh999999.lc1更名为 sh999999.lc5 ,然后在5分钟图下看1分钟的K线了。
以下是代码

#!/usr/bin/python
#-*- encoding: gbk -*-
from __future__ import division
from   struct   import   *
import os,time ,datetime,string,sys,math,re,shutil,glob
import zipfile,StringIO,getopt
#from readths2 import *
# 2010-09-02 by 厚朴

basedir = r'D:\2965\guosen' #如果你的安装路径不同,请改这里

exp_dir    = basedir + r'\T0002\export'
#exp_dir    = basedir + r'\T0002\export_back'
lc5_dir_sh = basedir + r'\Vipdoc\sh\fzline'
#lc5_dir_sh =  r'D:\2965\ydzqwsjy\Vipdoc\sh\fzline'
lc5_dir_sz = basedir + r'\Vipdoc\sz\fzline'
day_dir_sh = basedir + r'\Vipdoc\sh\lday'
day_dir_sz = basedir + r'\Vipdoc\sz\lday'

stkdict = {} #存储股票ID和上海市、深圳市的对照

#############################################################
# read 通达信分笔数据
# example readfbtxt(readlines(),'20100831-600000.TXT')
# 返回的data格式为
# (stkid,datetime,price,amount,vol(股数),笔数,buy or sale) 的list
#############################################################
def readfbtxt(p_lines,p_name):
    """读通达信分笔数据 """
    data = []
    shortname = os.path.split(p_name)[1]
    shortname = os.path.splitext(shortname)[0]
    sDay,stkID = shortname.split('-')
    if len(sDay) != 8 : return data
    stky = int(sDay[0:4])
    stkm = int(sDay[4:6])
    stkd = int(sDay[6:8])
    line_no = 0

    for l in p_lines:
        line_no += 1
        if line_no <=3: continue
        l = l.strip()
        t = re.split('\s+',l)
        k =  datetime.datetime(stky,stkm,stkd,int(t[0][0:2]),int(t[0][3:5]))
        p = float(t[1])      #price
        vol = int(t[2])*100  #股数
        amt = p * vol        #成交量
        bscnt = 0            #笔数
        bstag = ''           #buy or sale
        try:
            bscnt = int(t[3])    #笔数
            bstag = t[4]         #buy or sale
        except IndexError,e:
            pass
        data.append((stkID,k,p,amt,vol,bscnt,bstag))
    return data


#############################################################
# 将分笔数据转化为分笔数据
# p_data:传入参数 为readfbtxt所返回
# data:  返回的数据格式为
# [stkid,datetime,open,high,low,close,amt,vol(股)]
#############################################################
def fbtxt2lc0(p_data):
    """分笔数据转化为1分钟数据"""
    data = []
    for i in p_data:
        t = i[1]     #datetime
        p = i[2]     #price
        data.append([i[0],t,p,p,p,p,i[3],i[4]])
    return data

#############################################################
# 将分笔数据转化为1分钟数据
# p_data:传入参数 为readfbtxt所返回
# data:  返回的数据格式为
# [stkid,datetime,open,high,low,close,amt,vol(股)]
#############################################################
def fbtxt2lc1(p_data):
    """分笔数据转化为1分钟数据"""
    data = []
    for i in p_data:
        t = i[1]     #datetime
        p = i[2]     #price
        lend = len(data)
        j = lend - 1
        while j >= 0:
            if data[j][1] == t:break
            j -= 1
        if j < 0:  #没有找到该时间
            data.append([i[0],t,p,p,p,p,i[3],i[4]])
        else:          #找到该时间
            if p > data[j][3]:  #high
                data[j][3] = p
            if p < data[j][4]:  #low
                data[j][4] = p
            data[j][5] = p      #close
            data[j][6] += i[3]  #amout
            data[j][7] += i[4]  #vol
    #data.sort(key = lambda x:x[1])  #以datetime 排序
    return data


#############################################################
# 一个时间对应的5分钟区间段
# dt 传入参数 为一个datetime.datetime or datetime.time
# 返回datetime 或time
#############################################################
def which5min(dt):
    """5 分钟时间划分 """
    if type(dt) != datetime.datetime and  type(dt) != datetime.time:
        return None
    t = dt
    ret = None
    if type(dt) == datetime.datetime:
        t = datetime.time(dt.hour,dt.minute,dt.second)

    if t < datetime.time(9,30) : return None
    if   t < datetime.time(9,35): ret = datetime.time(9,35)
    elif t < datetime.time(9,40): ret = datetime.time(9,40)
    elif t < datetime.time(9,45): ret = datetime.time(9,45)
    elif t < datetime.time(9,50): ret = datetime.time(9,50)
    elif t < datetime.time(9,55): ret = datetime.time(9,55)
    elif t < datetime.time(10,0): ret = datetime.time(10,0)
    elif t < datetime.time(10,5): ret = datetime.time(10,5)
    elif t < datetime.time(10,10): ret = datetime.time(10,10)
    elif t < datetime.time(10,15): ret = datetime.time(10,15)
    elif t < datetime.time(10,20): ret = datetime.time(10,20)
    elif t < datetime.time(10,25): ret = datetime.time(10,25)
    elif t < datetime.time(10,30): ret = datetime.time(10,30)
    elif t < datetime.time(10,35): ret = datetime.time(10,35)
    elif t < datetime.time(10,40): ret = datetime.time(10,40)
    elif t < datetime.time(10,45): ret = datetime.time(10,45)
    elif t < datetime.time(10,50): ret = datetime.time(10,50)
    elif t < datetime.time(10,55): ret = datetime.time(10,55)
    elif t < datetime.time(11,0): ret = datetime.time(11,0)
    elif t < datetime.time(11,5): ret = datetime.time(11,5)
    elif t < datetime.time(11,10): ret = datetime.time(11,10)
    elif t < datetime.time(11,15): ret = datetime.time(11,15)
    elif t < datetime.time(11,20): ret = datetime.time(11,20)
    elif t < datetime.time(11,25): ret = datetime.time(11,25)
    elif t <= datetime.time(11,30): ret = datetime.time(11,30)
   elif t < datetime.time(13,0): ret = datetime.time(13,0)
    elif t < datetime.time(13,5): ret = datetime.time(13,5)
    elif t < datetime.time(13,10): ret = datetime.time(13,10)
    elif t < datetime.time(13,15): ret = datetime.time(13,15)
    elif t < datetime.time(13,20): ret = datetime.time(13,20)
    elif t < datetime.time(13,25): ret = datetime.time(13,25)
    elif t < datetime.time(13,30): ret = datetime.time(13,30)
    elif t < datetime.time(13,35): ret = datetime.time(13,35)
    elif t < datetime.time(13,40): ret = datetime.time(13,40)
    elif t < datetime.time(13,45): ret = datetime.time(13,45)
    elif t < datetime.time(13,50): ret = datetime.time(13,50)
    elif t < datetime.time(13,55): ret = datetime.time(13,55)
    elif t < datetime.time(14,0): ret = datetime.time(14,0)
    elif t < datetime.time(14,5): ret = datetime.time(14,5)
    elif t < datetime.time(14,10): ret = datetime.time(14,10)
    elif t < datetime.time(14,15): ret = datetime.time(14,15)
    elif t < datetime.time(14,20): ret = datetime.time(14,20)
    elif t < datetime.time(14,25): ret = datetime.time(14,25)
    elif t < datetime.time(14,30): ret = datetime.time(14,30)
    elif t < datetime.time(14,35): ret = datetime.time(14,35)
    elif t < datetime.time(14,40): ret = datetime.time(14,40)
    elif t < datetime.time(14,45): ret = datetime.time(14,45)
    elif t < datetime.time(14,50): ret = datetime.time(14,50)
    elif t < datetime.time(14,55): ret = datetime.time(14,55)
    elif t <= datetime.time(15,0):  ret = datetime.time(15,0)
    else : return None
    if type(dt) == datetime.datetime:
        return datetime.datetime(dt.year,dt.month,dt.day,ret.hour,ret.minute,ret.second)
    else: return ret


#############################################################
# 将1分钟数据转为5分钟数据
# p_data:传入参数 为fbtxt2lc1所返回
# data:  返回的数据格式为
# [stkid,datetime,open,high,low,close,amt,vol(股)]
#############################################################
def lc1tolc5(p_data):
    """1分钟数据转化为5分钟数据 """
    if len(p_data) <= 0: return None
    data = []
    for i in p_data:
        t = which5min(i[1])   #找对应5分钟的区段
        if t == None:
            raise ValueError,'time out of range: %s' % i[1]
        lend = len(data)
        j = lend - 1
        while j >= 0:
            if data[j][1] == t:break
            j -= 1
        if j < 0:  #没有找到该时间
            data.append([i[0],t,i[2],i[3],i[4],i[5],i[6],i[7]])
        else:          #找到该时间
            if i[3] > data[j][3]:  #high
                data[j][3] = i[3]
            if i[4] < data[j][4]:  #low
                data[j][4] = i[4]
            data[j][5] = i[5]      #close
            data[j][6] += i[6]     #amout
            data[j][7] += i[7]     #vol
    #data.sort(key = lambda x:x[1])  #以datetime 排序
    return data


#############################################################
# read 5分钟数据
# example readlc5(r'E:\new_gxzq_v6\Vipdoc\sh\fzline\sh600000.lc5')
#############################################################
def readlc5(p_name):
        """tdx 5min 数据
           日期上低16位表示月日,高16位表示分钟
           这个结构个人感觉就不如同花顺做的巧妙
               在一个4字节中把 年 月 日 时 分 都记录下来了
        """
        f = open(p_name,'rb')
        stkID = os.path.split(p_name)[1]
        stkID = os.path.splitext(stkID)[0]
        if string.lower(stkID[0:2]) == 'sh' or string.lower(stkID[0:2]) == 'sz':
            stkID = stkID[2:]
        icnt = 0
        data = []
        while 1:
            raw = f.read(4*8)
            if len(raw) <= 0 : break
            t = unpack('IfffffII',raw)
            mins = (t[0] >> 16) & 0xffff
            mds  = t[0] & 0xffff
            month = int(mds / 100)
            day   = mds % 100
            hour = int(mins / 60)
            minute = mins % 60
            #datet = "d-d d:d" % (month,day,hour,minute)
            data.append((stkID,(month,day,hour,minute),t[1],t[2],t[3],t[4],t[5],t[6],t[7]))
            #print datet,t[1],t[2],t[3],t[4],t[5],t[6],t[7]
            icnt += 1
        ## end while
        f.close()
        return data

#############################################################
#构造通达信5min数据文件
# data 结构
#[stkID,(月,日,时,分),open,high,low,close,amt,vol,0]
#############################################################
def writelc5(p_name,data,addwrite = True):
    if addwrite :
        fout = open(p_name,'ab')
    else:
        fout = open(p_name,'wb')
    for i in data:
        t = i[1][0]*100+i[1][1] + ( (i[1][2] * 60 + i[1][3]) << 16)
        raw = pack('IfffffII',t,i[2],i[3],i[4],i[5],i[6],i[7],i[8])
        fout.write(raw)
    ## end for
    fout.close()

#############################################################
# outlist
# 将list 或者 tuple 输出
# 递归垂直输出,格式不好
#############################################################
def outlist(l):
    if type(l) != list and type(l) != tuple:
        print l
    else:
        for i in l:
            outlist(i)

#############################################################
# outlist2 二层输出,对于上文中的data 实用
# 传入参数 data,一个形如矩阵的list
#############################################################
def outlist2(p_data):
    for i in p_data:
        for j in i:
            print j,
        print    

#############################################################
# 正数与二进制表示互相表示
#
#############################################################
#------------------------------
#-- i2bin 整数转为 2进制字符串
#------------------------------
def i2bin(x):
        result = ''
        x = int(x)
        while x > 0:
               mod = x & 0x01 # 取2的余数
               x = x >> 0x01  # 右移一位
               result = str(mod) + result
        return result

#------------------------------
#-- bin2i 2进制字符串 转为正数
#------------------------------
def bin2i(bin):
    result = 0
    for s in bin:
        if s != '0' and s != '1':
            raise ValueError,'bad bin string:'+bin
        result = 2*result + int(s)
    return result


#############################################################
# fill_stkdict 填充全局变量字典 stkdict
#
#############################################################
def fill_stkdict():
    global stkdict
    lsh = os.listdir(day_dir_sh)
    for l in lsh:
        if len(l) <= 4:continue
        l = string.lower(l)
        if l[-3:] != 'day': continue
        n = os.path.splitext(l)[0]
        if n[0:2] == 'sh' or n[0:2] == 'sz':
            n = n[2:]
        stkdict[n] = 'sh'
    lsz = os.listdir(day_dir_sz)
    for l in lsz:
        if len(l) <= 4:continue
        l = string.lower(l)
        if l[-3:] != 'day': continue
        n = os.path.splitext(l)[0]
        if n[0:2] == 'sh' or n[0:2] == 'sz':
            n = n[2:]
        stkdict[n] = 'sz'

def getMarketByID(id):
    global stkdict
    if len(stkdict) == 0: fill_stkdict()
    return stkdict.setdefault(id,'')


#copy file   
#dstname = os.path.join(lc5_dir_sh,fout)
#shutil.copy(fout,dstname)


def writelcfiles(p_lines,p_name,lctype = 'lc5lc1',addfile = True):
    """
写内容
p_lines 文件行
l_name  短文件名
lctype  要转化生成的分钟类型 lc5 表示5分钟 lc1 表示1分钟 lc0表示分笔的
addfile True表示追加文件 False 表示覆盖
    """
    data1  = readfbtxt(p_lines,p_name)
    if len(data1) ==0: return
    data2 = fbtxt2lc1(data1)
    data3 = lc1tolc5(data2)

    ## lc5 5分钟文件
    if 'lc5' in lctype:
        data = []
        for i in data3:
            data.append([i[0],(i[1].month,i[1].day,i[1].hour,i[1].minute),i[2],i[3],i[4],i[5],i[6],i[7],0])
        if len(data) == 0 :
            sys.stderr.write('Error:no data in data\n')
            sys.exit(1)
        stkID = data[0][0]
        mark = getMarketByID(stkID)
        if mark == '':
            sys.stderr.write('不能确定它的市场:%s.请检查代码!\n' % stkID)
        else:
            fout = mark + stkID + '.lc5'
            if mark == 'sh':
                fout = os.path.join(lc5_dir_sh,fout)
            else:
                fout = os.path.join(lc5_dir_sz,fout)       

            writelc5(fout,data,addfile)
        #endif
    #endif
    if 'lc1' in lctype :
        data = []
        for i in data2:
            data.append([i[0],(i[1].month,i[1].day,i[1].hour,i[1].minute),i[2],i[3],i[4],i[5],i[6],i[7],0])
        if len(data) == 0 :
            sys.stderr.write('Error:no data in data\n')
            sys.exit(1)
        stkID = data[0][0]
        mark = getMarketByID(stkID)
        if mark == '':
            sys.stderr.write('不能确定它的市场:%s.请检查代码!\n' % stkID)
        else:
            fout = mark + stkID + '.lc1'
            if mark == 'sh':
                fout = os.path.join(lc5_dir_sh,fout)
            else:
                fout = os.path.join(lc5_dir_sz,fout)       
   
            writelc5(fout,data,addfile)
        #endif.
    #endif.

    #lc0 分笔的K线文件
    if 'lc0' in lctype:
        data0 = fbtxt2lc0(data1)
        data = []
        for i in data0:
            data.append([i[0],(i[1].month,i[1].day,i[1].hour,i[1].minute),i[2],i[3],i[4],i[5],i[6],i[7],0])
        if len(data) == 0 :
            sys.stderr.write('Error:no data in data\n')
            sys.exit(1)
        stkID = data[0][0]
        mark = getMarketByID(stkID)
        if mark == '':
            sys.stderr.write('不能确定它的市场:%s.请检查代码!\n' % stkID)
        else:
            fout = mark + stkID + '.lc0'
            if mark == 'sh':
                fout = os.path.join(lc5_dir_sh,fout)
            else:
                fout = os.path.join(lc5_dir_sz,fout)       
   
            writelc5(fout,data,addfile)
        #endif.
    #endif.

 

def convert(p_stkid,p_type = 'txt',filterfunc = None):
    if p_type == 'txt' : #txt file
        txtfiles = glob.glob(os.path.join(exp_dir,'*-'+p_stkid+'.txt'))
        if filterfunc :
            txtfiles = filter(filterfunc,txtfiles)
        txtfiles.sort()
        l_i = 0
        for fname in txtfiles:
            sys.stderr.write('%s\n' % fname)
            try:
                doc_lines = file(fname).readlines()
            except IOError , e :
                sys.stderr.write("Open file %s fail!\n" % fname)
                continue
            if len(doc_lines) <=4:
                sys.stderr.write('No data in %s\n' % fname)
                continue
            if l_i == 0 :
                writelcfiles(doc_lines,fname,'lc5lc1',False) ##over write
            else:
                writelcfiles(doc_lines,fname,'lc5lc1',True)  ##add write
            l_i += 1
        #endfor
           
    else:                #zipfile
        try:
            fzip = zipfile.ZipFile(os.path.join(exp_dir,p_stkid+'.zip'))
        except IOError,e:
            print 'Can not open file!',e
            return
        zipedfiles = fzip.namelist()
        if len(zipedfiles) == 0:
            return
        if filterfunc :
            zipedfiles = filter(filterfunc,zipedfiles)
        zipedfiles.sort()
        l_i = 0
        for fname in zipedfiles:
            sys.stderr.write('%s\n' % fname)
            doc = fzip.read(fname)
            doc_lines = StringIO.StringIO(doc).readlines()       
            if l_i == 0 :
                writelcfiles(doc_lines,fname,'lc5lc1',False) ##over write
            else:
                writelcfiles(doc_lines,fname,'lc5lc1',True)  ##add write
            l_i += 1
        #endfor
    #endif.


#############################################################
# usage 使用说明
#
#############################################################

def usage(p):
    print """
python %s [-t txt|zip] stkid [from] [to]
-t txt 表示从txt files 读取数据,否则从zip file 读取(这也是默认方式)
for example :
python %s 999999 20070101 20070302
python %s -t txt 999999 20070101 20070302
    """ % (p,p,p)

 

if __name__ == '__main__':
    """
    python readtdxlc5.py 999999 20070101 20070131
    """
    argv = sys.argv[1:]
    try :
        opts, args = getopt.getopt(argv, "ht:", ["help", "type="])
    except getopt.GetoptError:
        usage(sys.argv[0])
        sys.exit(0)
    l_type = 'zip'  #default type is zipfiles!
    for opt, arg in opts:               
        if opt in ("-h", "--help"):
            usage(sys.argv[0])
            sys.exit(0)
        elif opt in ("-t", "--type"):
            l_type = arg
    if len(args) <1:
        print 'You must specified the stock No.!'
        usage(sys.argv[0])
        sys.exit(0)

    stkid = args[0]
    l_from = None
    l_to = None
    try:
        l_from = args[1]
        l_to   = args[2]
    except :
        pass

    #过滤函数
    def filfunc(x):
        if l_from == None and l_to == None:
            return True
        ymd = os.path.splitext(os.path.split(x)[1])[0].split('-')[0]
        if l_from and l_to:
            return ymd >= l_from and ymd <= l_to
        elif l_from :
            return ymd >= l_from
        else :
            return ymd <= l_to
   
    if l_type == 'txt': # 从一般txt 文件
        convert(stkid,'txt',filfunc)
    else:
        convert(stkid,'zip',filfunc)
       
    mark = getMarketByID(stkid)
    if mark == '':
        sys.stderr.write('不能确定它的市场:%s.请检查代码!\n' % stkid)       
    else:
       os.system('copy D:\\2965\\guosen\\Vipdoc\\'+ mark +'\\fzline\\'+ mark + stkid +'.lc5 D:\\2965\\my_yd\\Vipdoc\\'+ mark +'\\fzline\\')
       os.system('copy D:\\2965\\guosen\\Vipdoc\\'+ mark +'\\fzline\\'+ mark + stkid +'.lc1 D:\\2965\\ydzqwsjy\\Vipdoc\\' + mark + '\\fzline\\' + mark + stkid +'.lc5')

    #data = readlc5(os.path.join(lc5_dir_sh,'sh601398.lc5'))
    #outlist2(data)

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
怎么样才能得到07年08年的1分钟线数据呢,python和通达信...
024 实战一,使用理杏仁数据模拟基金定投
重磅----网格交易支持股票,可转债,ETF,自定义等,提供源代码,打包程序。
Dji RoboMaster Tello SDK封装.3(回复指令解包类)
python调用shell脚本 获取shell脚本中间的输出值
把backtrader改造成金融强化学习回测引擎
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服