分类 数据后台 下的文章

量化练手:基于北向资金的量化策略

量化练手

最近在学习量化,在使用各种量化框架之前,试着不基于其他框架,只基于pandas和 matplotlib,编写一个量化策略,做量化学习的练手,同时也是对不熟悉的python做个练手。

量化策略

参考北向资金的每日净流入情况,决定是否买入沪深300指数。
具体策略如下:
1、获得北向资金每日净流入数,北向资金净流入为 沪港通与深港通净流入数据之和。
2、生成北向资金每日净流入资金的布林线;(此处采用100天均值,1.5倍标准差)
3、如果当日净流入高于布林线,则认为买入沪深300指数,次日按开盘价买入90%的仓位,如果连续高出,只买入一次;
4、如果当日净流入低于布林线,则认为卖出,次日按开盘价清仓。
5、交易手续费为万分之二;
6、参考一些指数基本的收费规则,如果持有天数低于7天,卖出时手续费会很高,我设置了trade_interval,默认为7天,也就是至少持仓7天,才会卖出。

数据来源

北向资金数据与指数数据来爬取自tt_fund。
beixiang_20211001.csv
market_index_20211001.csv

结果分析

北向资金布林线情况:
布林线.png
策略运行结果, 其中benchMark为沪深指数蓝线,红线为策略的运行结果。
回测曲线.png
从 2016-12-07开始至2021-10-01,总共执行买操作25次,卖操作25次.
10000的起初资金,最终市值为16436.075215, 近5年 64%的收益,高于沪深300 40%的收益。

详细代码

github地址 https://github.com/quentinxxz/QuantLib/tree/main/beixiang

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Sep 21 15:14:53 2021
@author: quentinxxz
"""

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""
Created on Sun Sep 19 20:42:39 2021
@author: quentinxxz
"""

import pandas as pd
import datetime  
import matplotlib.pyplot as plt



# 数据准备处理

def prepareBeixiangData():

    #爬下来的北向资金数据
    df = pd.read_csv('beixiang_20211001.csv')
    
    df['datetime']= pd.to_datetime(df['datetime'] ,format="%Y-%m-%d")
    
    # 1表示沪股通, 3为深股通, 4为
    df_1 = df[df['marketType'] == 1]
    df_3 = df[df['marketType'] == 3]
    
    df = pd.merge(df_1,df_3,suffixes=('_1', '_3'),how='left',on='datetime' )


    # 合并沪股通与深股通数据 
    df['total_in']=df['total_in_1']+df['total_in_3']
    df['total_out']=df['total_out_1']+df['total_out_3']
    df['total_net_in']=df['total_net_in_1']+df['total_net_in_3']
    df['grand_total_in']=df['grand_total_in_1']+df['grand_total_in_3']
    df['today_balance']=df['today_balance_1']+ df['today_balance_3']
    df=df.loc[:,['datetime','total_in','total_out','total_net_in','grand_total_in','today_balance']]
    
    
    # 按日期排序
    df.set_index('datetime',drop=False,append=False,inplace=True)
    df.sort_index(ascending=True,inplace=True)
    #df.sort_values(by='datetime',ascending=True,inplace= True) 

    # 计算布林线
    df['std'] = df['total_net_in'].rolling(100).std()
    df['mean'] = df['total_net_in'].rolling(100).mean()
    df['lower_interval'] = df['mean'] - 1.5*df['std']
    df['upper_interval'] = df['mean'] + 1.5*df['std']
        
    
    # 选取一段数据做展示
    #s_date = datetime.datetime.strptime('20190401', '%Y%m%d')
    #e_date = datetime.datetime.strptime('20200508', '%Y%m%d')

    # d_t = df[(df['datetime']<e_date) & (df['datetime']>s_date) ]
    # d_t.plot.line(x='datetime',y=['total_net_in','mean','lower_interval','upper_interval'])
    return df



# 准备沪深300指数
def prepareShangZheng():
    df= pd.read_csv('market_index_20211001.csv')
    df['datetime']= pd.to_datetime(df['datetime'])
    df.set_index('datetime',drop=False,append=False,inplace=True)
    df=df[df['code']==300]
    df.sort_index(ascending=True,inplace=True)
    return df





# 计算买卖时机,deprecated
def updateBuyAndSell(bx):  
    bx.loc[bx['lower_interval']>bx['total_net_in'], 'trade']=1
    bx.loc[bx['upper_interval']<bx['total_net_in'], 'trade']=-1

    

# 交易回测 
# bx为北向资金数据,
# benchmark为 回测参考 本例中采用上证指烽,
# cost_ration为手续费比例,默认万二, 
# init_cap为初始资金,
# delay代表 基于北向资金分析数据,几日后实施交易,至少为1天
def backTrade(bx,benchmark,cost_ration=0.0002,init_cap=10000,delay=1,trade_interval=7):
    count=0
    trade_records = pd.DataFrame(columns=['operation','trade_val','trade_vol','acc_val_end','acc_vol_end','total_value','capital','order_cost'],index=['datetime'])
    for pos in range(len(bx)) :
        #取对应的交易日
        next_trade_pos = pos+ delay
        if next_trade_pos >= len(bx):
            break
        trade_day = bx.iloc[next_trade_pos]['datetime']
    
        # 如果北向资金净流入,低于布林线,做卖出操作 operation -1代表卖出
        if pd.isnull(bx.iloc[pos]['lower_interval'])==False and bx.iloc[pos]['lower_interval']> bx.iloc[pos]['total_net_in']:
            record=pd.DataFrame({'operation':-1},index=[trade_day])  
            count=count+1

        # 如果北向资金净流入,高于布林线,做买入操作 operation 1代表买入
        elif pd.isnull(bx.iloc[pos]['upper_interval'])==False and bx.iloc[pos]['upper_interval'] < bx.iloc[pos]['total_net_in']:
            count=count+1
            record=pd.DataFrame({'operation':1},index=[trade_day]) 
        # 其他情况,不做操作    
        else:
            record=pd.DataFrame({'operation':0},index=[trade_day]) 
            
        trade_records= pd.concat([trade_records, record])

    # 删除首行
    trade_records.drop(['datetime'],inplace=True)  
    
    # 初始时持币
    holdCash =True
    capital =  init_cap
    acc_vol_end =0
    acc_val_end =0
    total_val = capital
    sellCount =0
    buyCount = 0
    delaySell=False

    for index ,row in trade_records.iterrows():
        
        if  row['operation'] ==1: 
            delaySell=False

    
        # 持币且operation 为1时,买入
        if  row['operation'] ==1 and holdCash==True:
            
            #按开盘价买入,收盘价计算价格,每次交易90%本金,

            trade_val = capital*0.9  # 交易额
            trade_vol =  trade_val/benchmark.loc[index,'price_end']  # 交易量
            order_cost = trade_val * cost_ration # 交易手续费
            capital = capital-trade_val-order_cost  #剩余本金
            acc_vol_end = acc_vol_end +  trade_vol # 收盘账户持仓数量
            acc_val_end = acc_vol_end * benchmark.loc[index,'price_end']  # 收盘账户持仓金额
            total_val= acc_val_end + capital # 收盘总金额

            print( "buy, date:%s, trade_val:%.2f, acc_val_end:%.2f total_val:%.2f"%(index, trade_val,acc_val_end,total_val))

            record=pd.DataFrame({'operation':1,
                  'trade_val':trade_val, 
                  'trade_vol': trade_vol,
                  'acc_vol_end': acc_vol_end,
                  'acc_val_end': acc_val_end,
                  'total_value':total_val,
                  'capital':capital,
                  'order_cost':order_cost},
                 index=[index])   
            trade_records.update(record)
            holdCash =False
            buyCount=buyCount+1
            buyDay= index #纪录购买时
        elif  (delaySell or row['operation'] == -1) and holdCash== False:
            
            if  index-buyDay<  datetime.timedelta(days=trade_interval) :
                print('delaySell,buyDay:%s, currentDay:%s'%(buyDay,index))
                delaySell =True
                acc_val_end = acc_vol_end * benchmark.loc[index,'price_end']  # 按收盘计算
                total_val= acc_val_end + capital
                record=pd.DataFrame({'operation':0,
                     'trade_val':0, 
                     'trade_vol': 0,
                     'acc_vol_end': acc_vol_end ,
                     'acc_val_end': acc_val_end,
                     'total_value':total_val,
                     'capital':capital,
                     'order_cost':0},
                    index=[index])   
                trade_records.update(record)
                
            else : 
                #按收盘价卖出,收盘价计算金额,卖出则卖空
                trade_val = acc_vol_end *benchmark.loc[index,'price_end'] # 按开盘价卖出全部
                trade_vol = acc_val_end
                order_cost = trade_val * cost_ration
                capital =capital+trade_val-order_cost
                acc_vol_end =   acc_val_end- trade_vol
                acc_val_end = acc_vol_end * benchmark.loc[index,'price_end']  # 按收盘计算
                total_val= acc_val_end + capital
                record=pd.DataFrame({'operation':-1,
                     'trade_val':trade_val, 
                     'trade_vol': trade_vol,
                     'acc_vol_end': acc_vol_end,
                     'acc_val_end': acc_val_end,
                     'total_value':total_val,
                     'capital':capital,
                     'order_cost':order_cost},
                    index=[index])   
                print( "sell, date:%s, trade_val:%.2f, acc_val_end:%.2f total_val:%.2f"%(index, trade_val,acc_val_end,total_val))
                trade_records.update(record)
                holdCash= True
                sellCount=sellCount+1

        else :
            acc_val_end = acc_vol_end * benchmark.loc[index,'price_end']  # 按收盘计算
            total_val= acc_val_end + capital
            record=pd.DataFrame({'operation':0,
                 'trade_val':0, 
                 'trade_vol': 0,
                 'acc_vol_end': acc_vol_end ,
                 'acc_val_end': acc_val_end,
                 'total_value':total_val,
                 'capital':capital,
                 'order_cost':0},
                index=[index])   
            trade_records.update(record)
            #print("common_day, date:%s,acc_val_end:%.2f,total_val: %.2f "%(index,acc_val_end,total_val))     
    print("sellCount:%d,buy_count:%d "%(sellCount,buyCount))     
    return trade_records



# 获取北向数据
bx= prepareBeixiangData()
# 绘制布林线
#bx.plot(x='datetime',y=['total_net_in','lower_interval','upper_interval','mean'] )

# 获取上证指数为参考
bm= prepareShangZheng()
# 进行回测
records= backTrade(bx,bm,delay=0)

#开始比较时间
s_date = datetime.datetime.strptime('20161206', '%Y%m%d')
bm_selected =bm[bm['datetime']>s_date]
# 将参考指数归一化,初始为10000
norm_bm_price = bm_selected['price_end']/ bm_selected.iloc[0]['price_end']*10000
# 将操作纪录归一化,初始为10000
records_select = records[records.index>s_date]
norm_records_total_val = records_select['total_value']/ records_select.iloc[0]['total_value']*10000

norm_bm_price.plot.line(subplots= True,x='datetime',y='price_end',style='b',label='benchMark')
norm_records_total_val.plot.line(subplots=True,style='r',label='beixiang')
plt.grid(True)
plt.show()

参考资料

《年化46%的北向资金+20日涨幅的创业板策略》https://www.joinquant.com/view/community/detail/a5fb2c8b9644ea49441cfcfffb735f13

20210101首发于3dobe.com
本站链接:http://3dobe.com/archives/263/

微服务介绍与实践总结

近年来,微服务与DevOps等概念不断热炒。两者实际上是紧密相联,又相辅相成,Docker、Mesos、Kubernates等技术方案的快速崛起,为微服务提供了更坚实土壤,使其得以更顺利地实施落地。 类似spirng-boot等技术的发展与大为传播,更是直接促进了微服务成熟化发展。本文将从与对传统单体架构服务与微服务的比较,介绍微服务,并在最后对DevOps与微服务联系做简单介绍。

- 更多 -

基于lucene的内嵌式kv存储

应用背景

诸多业务场景下,都有使用kv型式存储数据供快速查询的需求。正常的做法有使用HashMap存入内存,或者存入外部的nosql KV数据库/缓存。

  • 使用HashMap做KV存储,速度快,但是如果数据量达到百万及至千万级时,HashMap必将占用大量的java堆内存,给应用带来极大的内存回收压力。
  • 外部kv存储,以堆外(offHeap)存储的方式让我们的应用免于内存回收之忧,但其查询性能往往低于内存map。假设采用外部db的方式作kv存储,就会引入服务之间的通信成本,以基于LR(逻辑回归)实现推荐系统的打分服务为例,每次打分,须执行近求成百上万次kv查询(lr参数的查询),如此的查询量对性能的要求是极高的,如果每一次查询都要查询外部服务,那么网络io势必占用大量的时间。

此外,在工作中会发现很多算法问题,都会被转换为一种追求效率的搜索问题,高效的内嵌式kv存储就会显得更有价值。

- 更多 -

真实流量压测工具 tcpcopy应用浅析

极为合理的测试需求

“双一十”将近,作为一个电商应用的开发人员,也是时候操心一下自己管理服务的性能问题了。平时跑得好好的服务应用,能否承受的住双十一,尤其是午夜时刻的流量冲击?到底是要加机器还是要做服务降级?机器加多少?服务降级降多少.....为回答这一系列的问题,你首先要知道服务的极限是多少。按我们这种非专业测试人士的设想,测试方法应满足如下需求:

- 更多 -

lucene事务(译与解)

本文分两部份,第一部份为译:是对是对于lucene事务的一篇佳作《Transactional Lucene》的翻译。第二部份为解:是本人对一文中提到一些概念在源码层次的一些理解分析,参考lucene源码版本为4.10.4。《Transactional Lucene》中还提到了多commit在实际生产中的一些妙用,值得参考。

- 更多 -