本文演示了采用Python脚本,配合MongoDB数据库,读取股票数据文件,并写入数据库的过程。 导入数据库的方法支持单线程及多线程两种方式。经过本地开发机测试,开启2-3个线程时,导入速度快于单线程导入;当开启线程超过4个之后,导入速度反而小于单线程;猜测可能是跟机器CPU核心数有关,建议开始CPU核心数N-1的线程。例如,若CPU核心数为8时,可以开启7个线程进行导入,可大幅提高导入速度。 Python操作MongoDB,需要引入pymongo模块,可自行搜索该模块的安装方法,不再赘述。以下是Python代码:
#!/usr/bin/env python#coding:utf-8import threadingimport pymongoimport timeclass UtilFile: def IsSubString(SubStrList,Str): flag=True for substr in SubStrList: if not(substr in Str): flag=False return flag def GetFileList(FindPath,FlagStr=[]): import os FileList=[] FileNames=os.listdir(FindPath) if (len(FileNames)>0): for fn in FileNames: if (len(FlagStr)>0): #返回指定类型的文件名 if (UtilFile.IsSubString(FlagStr,fn)): #fullfilename=os.path.join(FindPath,fn) #FileList.append(fullfilename) FileList.append(fn) else: #默认直接返回所有文件名 #fullfilename=os.path.join(FindPath,fn) #FileList.append(fullfilename) FileList.append(fn) #对文件名排序 if (len(FileList)>0): FileList.sort() return FileList ''' 函数功能:读取通达信数据导出文件内容,导入到数据库中 入参: conn_db: 导入所使用的数据库 file_dir: 数据文件目录 file_list: 数据目录下文件名List file_type: 数据文件类型。枚举值。 M1:1分钟线数据; D: 日线数据 tb_pre_name: 数据表名称前缀 print_level: 打印级别。枚举值。0:不打印; 1:按文件打印; 2:按记录打印 thread_name: 线程名称(打印输出用) ''' def ImpDataFromFile(conn_db, file_dir, file_list, file_type, tb_pre_name, print_level, thread_name): # file_list = UtilFile.GetFileList(file_dir) #获取1分钟线目录下文件名清单 file_num = len(file_list) #文件总数量 file_prct = round(1/file_num,4) #每文件占总数比 # 读取文件清单中每个文件的内容,并解析每行记录 { file_cnt = 0 for file in file_list: file_cnt += 1 #当前处理文件序号 tm_start = time.clock() #计算文件中的总行数 { op = open(file_dir+'/'+file) record_num = 0 for line in op: record_num += 1 #计算文件中的总行数 } #将文件逐行读入表中 { op = open(file_dir+'/'+file) record_cnt = 0 for line in op: if not("数据来源" in line): content_temp = line.split('\n') content = content_temp[0].split('\t') if file_type == "M1": record = { "date":content[0], #日期 "time":content[1], #时间 "begin":float(content[2]), #开盘 "high":float(content[3]), #最高 "low":float(content[4]), #最低 "end":float(content[5]), #收盘 "vol":int(content[6]), #成交量 "amt":float(content[7])#成交额 } elif file_type == "D": record = { "date":content[0], #日期 "begin":float(content[1]), #开盘 "high":float(content[2]), #最高 "low":float(content[3]), #最低 "end":float(content[4]), #收盘 "vol":int(content[5]), #成交量 "amt":float(content[6])#成交额 } clct = conn_db[tb_pre_name+file] #打开/创建 数据表 #计算处理百分比 { record_cnt += 1 if print_level == 2: file_prcs = file_prct*(file_cnt-1) record_prct = round(record_cnt/record_num,4) ttl_prcs = round((file_prcs+file_prct*record_prct)*100,2) print(thread_name+"\t"+str(ttl_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(record_cnt)+"/"+str(record_num)) #计算处理百分比 } #确保索引存在 { if file_type == "M1": clct.ensure_index([("date", pymongo.DESCENDING), ("time", pymongo.DESCENDING)]) if clct.find({"date":content[0],"time":content[1]}).count() > 0: pass #防止重复导入 else: clct.insert(record) elif file_type == "D": clct.ensure_index([("date", pymongo.DESCENDING)]) if clct.find({"date":content[0]}).count() > 0: pass #防止重复导入 else: clct.insert(record) #确保索引存在 } #将文件逐行读入表中 } tm_finish = time.clock() tm_spend = round(tm_finish - tm_start, 2) if print_level == 1: file_prcs = round(file_prct*file_cnt*100,2) print(thread_name+"\t"+str(file_prcs)+"%"+"\t"+str(file_cnt)+"/"+str(file_num)+"\t"+file+"\t"+str(tm_spend)+"s") # 读取文件清单中每个文件的内容,并解析每行记录 } print(thread_name+"\t"+"Import Done!") ''' 函数功能:使用多线程方式导入数据入口方法 入参: imp_thread_num: 开启线程数 conn_db: 导入所使用的数据库 file_dir: 数据文件目录 file_type: 数据文件类型。枚举值。 M1:1分钟线数据; D: 日线数据 tb_pre_name: 数据表名称前缀 print_level: 打印级别。枚举值。0:不打印; 1:按文件打印; 2:按记录打印 ''' def ImpDataFromFileMultThrd(imp_thread_num, conn_db, file_dir, file_type, tb_pre_name, print_level): file_list = UtilFile.GetFileList(file_dir, ".txt") thrd_num = imp_thread_num thrd_task = [] for i in range(thrd_num): thrd_task.append([]) for i in range(len(file_list)): idx = divmod(i,thrd_num)[1] thrd_task[idx].append(file_list[i]) print(thrd_task) for i in range(thrd_num): thrd = MyThread(i,"Thread-"+str(i+1),[conn_db, file_dir, thrd_task[i-1], file_type, tb_pre_name, print_level]) thrd.start()class MyThread (threading.Thread): #继承父类threading.Thread def __init__(self, threadID, name, func_args): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.conn_db = func_args[0] self.file_dir = func_args[1] self.file_list = func_args[2] self.file_type = func_args[3] self.tb_pre_name = func_args[4] self.print_level = func_args[5] def run(self): #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 print("Starting " + self.name) UtilFile.ImpDataFromFile(self.conn_db, self.file_dir, self.file_list, self.file_type, self.tb_pre_name, self.print_level, self.name) print("Exiting " + self.name)
#!/usr/bin/env python#coding:utf-8'''函数功能:根据传入的字符串,删除名称匹配的聚集 入参:db_conn: 数据库连接clct_substr: 需匹配名称的字符串'''def DelClcts(db_conn, clct_substr): a = db_conn.collection_names() b = [] for i in a: if clct_substr in i: b.append(i) db_conn[i].drop() print("Drop:"+i)'''函数功能:对Dict按Key排序入参:dic: 数据字典'''def SortDictKeys(dic): return sorted(dic.items(), key=lambda d: d[0])
#!/usr/bin/env python#coding:utf-8from util_file import UtilFile# from util_thread import MyThreadimport pymongo#************************************************************************#常量定义清单#************************************************************************#文件stck_m1_file_dir = "D:/new_tdx/T0002/export/min1" #1分钟线数据文件目录stck_day_file_dir = "D:/new_tdx/T0002/export/day" #1分钟线数据文件目录#数据库ip = '127.0.0.1'port = 27017db_name = 'robin'#表名称bs_m1_pre = 'BS_M1_'#基础表_1分钟线数据表前缀bs_day_pre = 'BS_DAY_'#基础表_日线数据表前缀#************************************************************************#主程序#************************************************************************conn = pymongo.MongoClient(host=ip, port=port) #连接Mongodb数据库服务器db = conn[db_name] #选择存储数据库#************************************************************************##从文件中导入数据到基础表#************************************************************************# UtilFile.ImpDataFromFileMultThrd(2, db, stck_m1_file_dir, "M1", bs_m1_pre, 1) #导入1分钟线数据UtilFile.ImpDataFromFileMultThrd(2, db, stck_day_file_dir, "D", bs_day_pre, 1) #导入日线数据
联系客服