打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
python量化,获取量化模型之一技术方法,17种技术指标——量化06
userphoto

2023.06.29 浙江

关注

17 种技术指标代码

import pandas as pdimport numpy as npfrom scipy import statsfrom datetime import datetime, timedeltaimport akshare as akimport datetime# 获取日线历史行情def get_stock_data(symbol, table_name=None): today = datetime.date.today() end_date_str = today.strftime('%Y%m%d') start_date = (today - datetime.timedelta(days=7 * 365)).strftime('%Y%m%d') if table_name == '美股中国企业价值': symbol = '106.' + symbol stock_data = ak.stock_us_hist( symbol=symbol, period='daily', start_date=start_date, end_date=end_date_str, adjust='qfq' ) elif table_name == '纳指100价值': symbol = '105.' + symbol stock_data = ak.stock_us_hist( symbol=symbol, period='daily', start_date=start_date, end_date=end_date_str, adjust='qfq' ) elif table_name == '港股价值': stock_data = ak.stock_hk_hist( symbol=symbol, period='daily', start_date=start_date, end_date=end_date_str, adjust='qfq' ) elif table_name == '场内基金价值': stock_data = ak.index_zh_a_hist( symbol=symbol, period='daily', start_date=start_date, end_date=end_date_str ) else: if symbol.startswith(('1', '5')): stock_data = ak.fund_etf_hist_em( symbol=symbol, period='daily', start_date=start_date, end_date=end_date_str, adjust='qfq' ) else: stock_data = ak.stock_zh_a_hist( symbol=symbol, period='daily', start_date=start_date, end_date=end_date_str, adjust='qfq' ) return stock_data# 获取简单移动平均线,参数有2个,一个是数据源,一个是日期def MA(data, n): MA = pd.Series(data['收盘'].rolling(n).mean(), name='MA_' + str(n)).iloc[-1] close = data['收盘'].iloc[-1] signal = np.where(MA < close, 1, np.where(MA > close, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['MA_' + str(n)]) # 获取指数移动平均线,参数有2个,一个是数据源,一个是日期def EMA(data, n): EMA = pd.Series(data['收盘'].ewm(span=n, min_periods=n).mean(), name='EMA_' + str(n)).iloc[-1] close = data['收盘'].iloc[-1] signal = np.where(EMA < close, 1, np.where(EMA > close, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['EMA_' + str(n)])# 获取一目均衡表基准线 (data, conversion_periods, base_periods, lagging_span2_periods, displacement)# 参数有5个,第一个是数据源,其他4个分别是一目均衡表基准线 (9, 26, 52, 26),即ichimoku_cloud(data,9, 26, 52, 26)def ichimoku_cloud(data, conversion_periods, base_periods, lagging_span2_periods, displacement): def donchian(length): return (data['high'].rolling(length).max() + data['low'].rolling(length).min()) / 2 conversion_line = donchian(conversion_periods) base_line = donchian(base_periods) lead_line1 = (conversion_line + base_line) / 2 lead_line2 = donchian(lagging_span2_periods) # lagging_span = data['close'].shift(-displacement + 1).shift(25n>).ffill() # leading_span_a = lead_line1.shift(displacement - 1) # leading_span_b = lead_line2.shift(displacement - 1) # ichimoku_data = pd.concat([conversion_line, base_line, lagging_span, lead_line1, lead_line2], axis=1) # ichimoku_data.columns = ['Conversion Line', 'Base Line', 'Lagging Span', 'lead_line1', 'lead_line2'] # price = data['close'].iloc[-1] base = base_line.iloc[-1] conversion = conversion_line.iloc[-1] lead1 = lead_line1.iloc[-1] lead2 = lead_line2.iloc[-1] conversion_prev = conversion_line.shift(1).iloc[-1] close = data['close'].iloc[-1] signal = np.where((base < close) & (conversion_prev < close) & (conversion > close) & (lead1 > close) & (lead1 > lead2), 1, np.where((base > close) & (conversion_prev > close) & (conversion < close) & (lead1 < close) & (lead1 < lead2), -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['Ichimoku Cloud'])# 成交量加权移动平均线 VWMA (data, 20),参数有2个,1个是数据源,另一个是日期,通过为20def VWMA(data, n): VWMA = pd.Series((data['close'] * data['volume']).rolling(n).sum() / data['volume'].rolling(n).sum(), name='VWMA_' + str(n)).iloc[-1] close = data['close'].iloc[-1] signal = np.where(VWMA < close, 1, np.where(VWMA > close, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['VWMA_' + str(n)])# 计算Hull MA船体移动平均线 Hull MA (data,9),参数有2,一个是数据源,另一个是日期,一般为9def HullMA(data, n=9): def wma(series, period): weights = np.arange(1, period + 1) return series.rolling(period).apply(lambda x: np.dot(x, weights) / weights.sum(), raw=True) source = data['close'] wma1 = wma(source, n // 2) * 2 wma2 = wma(source, n) hullma = wma(wma1 - wma2, int(math.<>n>floor(math.sqrt(n)))).iloc[-1] close = data['close'].iloc[-1] signal = np.where(hullma < close, 1, np.where(hullma > close, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['HullMA_' + str(n)])# 计算RSI指标,参数有2,一个为数据源,另一个为日期,一般为14,即RSI(data, 14)def RSI(data, n): lc = data['close'].shift(1) diff = data['close'] - lc up = diff.where(diff > 0, 0) down = -diff.where(diff < 0, 0) ema_up = up.ewm(alpha=1/n, adjust=False).mean() ema_down = down.ewm(alpha=1/n, adjust=False).mean() rs = ema_up / ema_down rsi = 100 - 100 / (1 + rs) rsi_plus = rsi.iloc[-1] rsi_prev = rsi.iloc[-2] signal = np.where((rsi_plus < 30) & (rsi_plus > rsi_prev), 1, np.where((rsi_plus > 70) & (rsi_plus < rsi_prev), -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['RSI_' + str(n)])# 计算Stochastic,k是主线,d_signal是信号线,参数有4,一个是数据源,另外三个为日期,一般为STOK(data, 14, 3, 3)def STOK(data, n, m, t): high = data['high'].rolling(n).max() low = data['low'].rolling(n).min() k = 100 * (data['close'] - low) / (high - low) d = k.rolling(m).mean() d_signal = d.rolling(t).mean() data['%K'] = k data['%D'] = d data['%D_signal'] = d_signal main_line = data['%K'].iloc[-1] signal_line = data['%D_signal'].iloc[-1] main_line_prev = data['%K'].iloc[-2] signal_line_prev = data['%D_signal'].iloc[-2] signal = np.where((main_line < 20) & (main_line_prev < signal_line_prev) & (main_line > signal_line), 1, np.where((main_line > 80) & (main_line_prev > signal_line_prev) & (main_line < signal_line), -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['STOK1'])# 计算 CCI 指标,参数有 2,一个是数据源,另一个是日期,一般为 20,即 CCI(data, 20)def CCI(data, n): TP = (data['high'] + data['low'] + data['close']) / 3 MA = TP.rolling(n).mean() MD = TP.rolling(n).apply(lambda x: np.abs(x - x.mean()).mean()) CCI = (TP - MA) / (0.015 * MD) CCI_plus = CCI.iloc[-1] CCI_prev = CCI.iloc[-2] signal = np.where((CCI_plus < -100) & (CCI_plus > CCI_prev), 1, np.where((CCI_plus > 100) & (CCI_plus < CCI_prev), -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['CCI_' + str(n)])# 平均趋向指数ADX(14),参数有2,一个是数据源,另一个是日期,一般为14,即ADX(data,14)def ADX(data, n): up = data['high'] - data['high'].shift(1) down = data['low'].shift(1) - data['low'] plusDM = pd.Series(np.where((up > down) & (up > 0), up, 0)) minusDM = pd.Series(np.where((down > up) & (down > 0), down, 0)) truerange = np.maximum(data['high'] - data['low'], np.maximum(np.abs(data['high'] - data['close'].shift()), np.abs(data['low'] - data['close'].shift()))) plus = 100 * plusDM.ewm(alpha=1/n, min_periods=n).mean() / truerange.ewm(alpha=1/n, min_periods=n).mean() minus = 100 * minusDM.ewm(alpha=1/n, min_periods=n).mean() / truerange.ewm(alpha=1/n, min_periods=n).mean() sum = plus + minus adx = 100 * (np.abs(plus - minus) / np.where(sum == 0, 1, sum)).ewm(alpha=1/n, min_periods=n).mean() adx_plusDI = adx.iloc[-1] > 20 and plus.iloc[-1] > minus.iloc[-1] and plus.iloc[-2] < minus.iloc[-2] adx_minusDI = adx.iloc[-1] > 20 and plus.iloc[-1] < minus.iloc[-1] and plus.iloc[-2] > minus.iloc[-2] signal = np.where(adx_plusDI, 1, np.where(adx_minusDI, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['ADX'])# 计算动量震荡指标(AO),参数只有一个,即数据源def AO(data): data['AO'] = (data['high'].rolling(5).mean() + data['low'].rolling(5).mean()) / 2 - (data['high'].rolling(34).mean() + data['low'].rolling(34).mean()) / 2 AO = data['AO'].iloc[-3:] AO_plus = (AO.iloc[0] < AO.iloc[1]) and (AO.iloc[1] < AO.iloc[2]) and (AO.iloc[2] > 0) AO_minus = (AO.iloc[0] > AO.iloc[1]) and (AO.iloc[1] > AO.iloc[2]) and (AO.iloc[2] < 0) signal = np.where(AO_plus, 1, np.where(AO_minus, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['AO'])# 计算动量指标(10),参数只有一个,即数据源def MTM(data): data['MTM'] = data['close'] - data['close'].shift(10) MTM = data['MTM'].iloc[-2:] signal = np.where(MTM.iloc[0] < MTM.iloc[1], 1, np.where(MTM.iloc[0] > MTM.iloc[1], -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['MTM'])# MACD_1是以金叉和死叉进行判断,参数有3个,第一个是数据源,其余两个为日期,一般取1226,即MACD(data, 12,26)def MACD_1(data, n_fast, n_slow): EMAfast = data['close'].ewm(span=n_fast, min_periods=n_slow).mean() EMAslow = data['close'].ewm(span=n_slow, min_periods=n_slow).mean() data['MACD'] = EMAfast - EMAslow data['MACDsignal'] = data['MACD'].ewm(span=9, min_periods=9).mean() data['MACDhist'] = data['MACD'] - data['MACDsignal'] signal = np.where(data['MACDhist'].iloc[-2] < 0 and data['MACDhist'].iloc[-1] > 0, 1, np.where(data['MACDhist'].iloc[-2] > 0 and data['MACDhist'].iloc[-1] < 0, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['MACD_1'])# MACD_2是以MACD柱是否大于0进行判断,tradingview的判断依据def MACD_2(data, n_fast, n_slow): EMAfast = data['close'].ewm(span=n_fast, min_periods=n_slow).mean() EMAslow = data['close'].ewm(span=n_slow, min_periods=n_slow).mean() data['MACD'] = EMAfast - EMAslow data['MACDsignal'] = data['MACD'].ewm(span=9, min_periods=9).mean() data['MACDhist'] = data['MACD'] - data['MACDsignal'] signal = np.where(data['MACDhist'].iloc[-1] > 0, 1, np.where(data['MACDhist'].iloc[-1] < 0, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['MACD_2'])# 计算Stoch_RSI(data,3, 3, 14, 14),有5个参数,第1个为数据源def Stoch_RSI(data, smoothK=3, smoothD=3, lengthRSI=14, lengthStoch=14): # 计算RSI lc = data['close'].shift(1) diff = data['close'] - lc up = diff.where(diff > 0, 0) down = -diff.where(diff < 0, 0) ema_up = up.ewm(alpha=1/lengthRSI, adjust=False).mean() ema_down = down.ewm(alpha=1/lengthRSI, adjust=False).mean() rs = ema_up / ema_down rsi = 100 - 100 / (1 + rs) # 计算Stochastic stoch = (rsi - rsi.rolling(window=lengthStoch).min()) / (rsi.rolling(window=lengthStoch).max() - rsi.rolling(window=lengthStoch).min()) k = stoch.rolling(window=smoothK).mean() d = k.rolling(window=smoothD).mean() # 添加到data中 data['Stoch_RSI_K'] = k * 100 data['Stoch_RSI_D'] = d * 100 trend = np.where(data['close'].iloc[-1] > data['close'].iloc[-10], 1, -1) signal = np.where(trend == 1 and data['Stoch_RSI_K'].iloc[-1] < 20 and data['Stoch_RSI_D'].iloc[-1] < 20 and data['Stoch_RSI_K'].iloc[-1] > data['Stoch_RSI_D'].iloc[-1] and data['Stoch_RSI_K'].iloc[-2] < data['Stoch_RSI_D'].iloc[-2], 1, np.where(trend == -1 and data['Stoch_RSI_K'].iloc[-1] > 80 and data['Stoch_RSI_D'].iloc[-1] > 80 and data['Stoch_RSI_K'].iloc[-1] < data['Stoch_RSI_D'].iloc[-1] and data['Stoch_RSI_K'].iloc[-2] > data['Stoch_RSI_D'].iloc[-2], -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['Stoch_RSI'])# 计算威廉百分比变动,参数有2,第1是数据源,第二是日期,一般为14,即WPR(data, 14)def WPR(data, n): WPR = pd.Series((data['high'].rolling(n).max() - data['close']) / (data['high'].rolling(n).max() - data['low'].rolling(n).min()) * -100, name='WPR_' + str(n)) lower_band = -80 upper_band = -20 signal = np.where((WPR.iloc[-1] < lower_band) & (WPR.iloc[-1] > WPR.iloc[-2]), 1, np.where((WPR.iloc[-1] > upper_band) & (WPR.iloc[-1] < WPR.iloc[-2]), -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['WPR_' + str(n)])# 计算Bull Bear Power牛熊力量(BBP),参数有2,一个是数据源,另一个是日期,一般为20,但在tradingview取13,即BBP(data, 13)def BBP(data, n): bullPower = data['high'] - data['close'].ewm(span=n).mean() bearPower = data['low'] - data['close'].ewm(span=n).mean() BBP = bullPower + bearPower data['BBP'] = BBP trend = np.where(data['close'].iloc[-1] > data['close'].iloc[-n], 1, np.where(data['close'].iloc[-1] < data['close'].iloc[-n], -1, 0)) signal = np.where((trend == 1) & (bearPower.iloc[-1] < 0) & (bearPower.iloc[-1] > bearPower.iloc[-2]), 1, np.where((trend == -1) & (bullPower.iloc[-1] > 0) & (bullPower.iloc[-1] < bullPower.iloc[-2]), -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['BBP'])# 计算Ultimate Oscillator终极震荡指标UO (data,7, 14, 28),有4个参数,第1个是数据源,其他的是日期def UO(data, n1, n2, n3): min_low_or_close = pd.concat([data['low'], data['close'].shift(1)], axis=1).min(axis=1) max_high_or_close = pd.concat([data['high'], data['close'].shift(1)], axis=1).max(axis=1) bp = data['close'] - min_low_or_close tr_ = max_high_or_close - min_low_or_close avg7 = bp.rolling(n1).sum() / tr_.rolling(n1).sum() avg14 = bp.rolling(n2).sum() / tr_.rolling(n2).sum() avg28 = bp.rolling(n3).sum() / tr_.rolling(n3).sum() UO = 100 * ((4 * avg7) + (2 * avg14) + avg28) / 7 signal = np.where(UO.iloc[-1] > 70, 1, np.where(UO.iloc[-1] < 30, -1, 0)) return pd.DataFrame(signal.reshape(1, -1), columns=['UO']) if __name__ == '__main__': # MA(get_stock_data(symbol, table_name=None), n) data = get_stock_data('000001', table_name=None) data1 = MA(data, 10) data2 = EMA(data, 10) print(data) print(data1) print(data2)

代码说明

上面代码共有 18 个自定义函数:

  1. def get_stock_data(symbol, table_name=None):获取日线历史行情数据,可以是指数、股票和基金。参数有 2 个,分别是代码和数据来源对代码增加市场,第 2 个为非必要参数。
  2. 其他 17 个自定义函数都是技术指标函数,包括 MA、IC、VWMA、HullMA、RSI、STOK、CCI、ADX、AO、MTM、MACD_1、MACD_2、Stoch_RSI、WPR、BBP 和 UO。其中技术指标的参数 n 都是技术的日期,比如 MA(data, 10)为 10 日均线指标。

需要修改的代码

根据自己的需要求不同日期的技术指标,比如需要判断 10 日均线是否符合买入,可以用 MA(data, 10)获取 10 日均线是否符合。若返回结果为 1,则说明现价>10日均线。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
量化交易平台重构:数据篇
菜鸟第一次提取TCGA编码蛋白基因和lncRNA表达谱
东方财富 量化交易(自动)程序(1)
如何手动将ORACLE的sql建表语句转换为MYSQL的建表语句
__FILE__,__LINE__,__DATE__,__TIME__
免费开源!常用策略函数的复用与累积
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服