Files
quant/code/chan.py

444 lines
17 KiB
Python
Raw Permalink Normal View History

2019-11-21 12:02:03 +08:00
def build(df_dr):
#一、先合并
group_no_last = 0 #最后的组号
ingroup_n_last = 0 #最后一个在组内的n
group_first_n = 0 #最后一个组的第一个n
chan_low_pre = None
chan_high_pre = None
for n, k in df_dr.iterrows():
open = k.open
close = k.close
low = k.low
high = k.high
#每条的变量
updown='flat' #升降
is_group=False #是否在组里
is_group_m=False #是否组长,组长是最高的
group_no=0
group_high=0
group_low=0
if(chan_low_pre==None or chan_high_pre==None):
chan_low_pre = low
chan_high_pre = high
else:
#上一个原始K
k_pre = df_dr.iloc[n-1-1]
low_pre = k_pre.low
high_pre = k_pre.high
#上升下降
#上升(跟原始K比较平均高度)
if high_pre+low_pre < high+low:
updown='up'
#下降(跟原始K比较平均高度)
elif high_pre+low_pre > high+low:
updown='down'
elif open<close:
updown = 'up'
elif open<close:
updown = 'down'
else:
updown = 'flat'
#包含 #不能太多的包含,限制一组成员个数
if ((chan_high_pre<=high and chan_low_pre>=low) or (chan_high_pre>=high and chan_low_pre<=low)) and n<group_first_n+5:
#组长
if chan_high_pre<=high and chan_low_pre>=low:
is_group_m = True
elif chan_high_pre>=high and chan_low_pre<=low and ingroup_n_last!=n-1: #新组并且前面的高,需要设置前面的是组长
df_dr.at[n-1,'group_master']= True
group_high=max(high, chan_high_pre)
group_low=min(low, chan_low_pre)
chan_high_pre = group_high
chan_low_pre = group_low
#上一个本来在组内,这个合并到原来的组
if ingroup_n_last==n-1:
group_no=group_no_last
#新组
else:
group_no=group_no_last+1
group_first_n=n-1
group_no_last=group_no
is_group=True
ingroup_n_last=n
else:
chan_low_pre = low
chan_high_pre = high
#写标识值到表中
df_dr.at[n,'updown']= updown
df_dr.at[n,'group']= is_group
df_dr.at[n,'group_no']= group_no
df_dr.at[n,'group_high']= group_high
df_dr.at[n,'group_low']= group_low
df_dr.at[n,'group_master']= is_group_m
if is_group:
df_dr.at[n-1,'group']= is_group
df_dr.at[n-1,'group_no']= group_no
#刷新同组内的group_high and group_low
for j in range(group_first_n, n):
df_dr.at[j,'group_high']= group_high
df_dr.at[j,'group_low']= group_low
if is_group_m:
df_dr.at[j,'group_master']= False #踢掉原来的组长
#二、找分型
fx_n_maybe_pre = 0 #前面可能是分型的n如果有方向则临时记录
direction_pre = "flat"
chan_low_pre = None
chan_high_pre = None
df_dr['fx']=''
for n, k in df_dr.iterrows():
is_group = k.group
is_group_master = k.group_master
group_high = k.group_high
group_low = k.group_low
low = k.low
high = k.high
is_confirm_fx = False #确定分型,前一个组或者个体是分型,为顶分型
is_confirm_fx_n = 0 #已确定分型的分型顶点
fx_type = ''
if(chan_low_pre==None or chan_high_pre==None):
chan_low_pre=low
chan_high_pre=high
elif (is_group and is_group_master) or not is_group: #只管组长和非组成员
cal_high = high
cal_low = low
if (is_group and is_group_master):
cal_high = group_high
cal_low = group_low
#与前面的比较
if chan_high_pre+chan_low_pre>cal_high+cal_low: #下降
if direction_pre=="up":
is_confirm_fx=True
is_confirm_fx_n=fx_n_maybe_pre
fx_type='top'
direction_pre="down"
fx_n_maybe_pre=n
elif chan_high_pre+chan_low_pre<cal_high+cal_low: #上升
if direction_pre=="down":
is_confirm_fx=True
is_confirm_fx_n=fx_n_maybe_pre
fx_type='bottom'
direction_pre="up"
fx_n_maybe_pre=n
chan_low_pre=cal_low
chan_high_pre=cal_high
#分型写入表
if is_confirm_fx:
df_dr.at[is_confirm_fx_n,'fx']= fx_type
#三、找笔
fx_n_pre = None
node_n_pre = None #node是前面确认的笔的节点
node_type_pre =''
node_high_pre = None
node_low_pre = None
df_dr['bi']=''
for n, k in df_dr.iterrows():
fx_type = k.fx
high = k.high
low = k.low
is_node = False
if fx_type=='top' or fx_type=='bottom':
#第一个分型确定为笔的节点
if fx_n_pre==None:
is_node=True
#与前节点类型相同且趋势延申删除上一个节点node
elif node_type_pre==fx_type and ((fx_type=='bottom' and low<node_low_pre) or (fx_type=='top' and high>node_high_pre)):
df_dr.at[node_n_pre,'bi']= ''
is_node=True
#与前节点类型不同且超过3个差,是新笔
elif node_type_pre!=fx_type and n-node_n_pre>3:
is_node=True
fx_n_pre=n
if is_node:
node_n_pre=n
node_type_pre=fx_type
node_high_pre=high
node_low_pre=low
if is_node:
df_dr.at[n,'bi']= fx_type
#四、找线段 (n是索引从0开始 i是第几个)
dot_i_pre = None #dot是前面确认的线段的节点
dot_type_pre =''
dot_high_pre = None
dot_low_pre = None
#只保留笔的K
df_bi = df_dr.loc[(df_dr["bi"]=='top') | (df_dr["bi"]=='bottom')]
df_dr['line']=''
#找第一个线段,第一个形成走势的三笔(如果上升,第三笔的结束点高于第一笔的结束点)
for i in range(len(df_bi)): #i是笔表的索引 n是原表的index字段即第几个
k=df_bi.iloc[i]
n=df_bi.index[i]
bi_type=k.fx
high=k.high
low=k.low
if i<len(df_bi)-3:
next3n=df_bi.index[i+3]
if bi_type=='top' and df_bi.iloc[i+1].low > df_bi.iloc[i+3].low:
df_dr.at[n,'line']='top'
df_dr.at[next3n,'line']='bottom'
dot_i_pre=i+3
dot_type_pre='bottom'
break
if bi_type=='bottom' and df_bi.iloc[i+1].high < df_bi.iloc[i+3].high:
df_dr.at[n,'line']='bottom'
df_dr.at[next3n,'line']='top'
dot_i_pre=i+3
dot_type_pre='top'
break
#继续找线段
i=dot_i_pre
while i!=None and i<len(df_bi):
k=df_bi.iloc[i]
n=df_bi.index[i]
bi_type=k.fx
high=k.high
low=k.low
#不确定延申还是反弹,一直往下找,找到能确定的点
j=i+2
is_found=False
while j<len(df_bi):
if dot_type_pre=='top':
if (j-i)%2==0 and j<len(df_bi) and df_bi.iloc[j].high>high: #看是否突破延申
j_n=df_bi.index[j]
df_dr.at[n,'line']=''
df_dr.at[j_n,'line']='top'
dot_i_pre=j_n
dot_type_pre='top'
i=j
is_found=True
break
elif (j-i)%2==1 and j<len(df_bi) and df_bi.iloc[j].low<df_bi.iloc[i+1].low: #没有延申,反弹形成新的线段
j_n=df_bi.index[j]
df_dr.at[j_n,'line']='bottom'
dot_i_pre=j_n
dot_type_pre='bottom'
i=j
is_found=True
break
elif dot_type_pre=='bottom':
if (j-i)%2==0 and j<len(df_bi) and df_bi.iloc[j].low<low: #看是否突破延申
j_n=df_bi.index[j]
df_dr.at[n,'line']=''
df_dr.at[j_n,'line']='bottom'
dot_i_pre=j_n
dot_type_pre='bottom'
i=j
is_found=True
break
elif (j-i)%2==1 and j<len(df_bi) and df_bi.iloc[j].high>df_bi.iloc[i+1].high: #没有延申,反弹形成新的线段
j_n=df_bi.index[j]
j_n=df_bi.index[j]
df_dr.at[j_n,'line']='top'
dot_i_pre=j_n
dot_type_pre='top'
i=j
is_found=True
break
j=j+1
if not is_found:
break #没找到下一个线段点,退出
else:
continue
#修正线段中间的凹凸漏点,例如上升的线段中有笔的低点低于线段的开始点,需要将这个线段的开始点移到这个更低的点,反之亦然
#有时候突然下来一笔,但是不成线段就会遗漏,此处待验证合理性
dot_type_pre =''
dot_n_pre = None
dot_high_pre = None
dot_low_pre = None
i=0
while i<len(df_bi):
n=df_bi.index[i]
k=df_bi.iloc[i]
bi_type=k.fx
high=k.high
low=k.low
dot_type = df_dr.at[n,'line']
if dot_type=='top' or dot_type=='bottom':
dot_n_pre=n
dot_type_pre=dot_type
dot_high_pre=high
dot_low_pre=low
else:
#移动不能使与后面小于三比差,即后面一个如果是线段点,就不处理这个笔点了
if i<len(df_bi)-1:
next_n = df_bi.index[i+1]
next_dot_type = df_dr.at[next_n,'line']
if next_dot_type=='top' or next_dot_type=='bottom':
i=i+1
continue
#移动
if dot_type_pre=='top' and high>dot_high_pre:
df_dr.at[dot_n_pre,'line']=''
df_dr.at[n,'line']='top'
dot_n_pre=n
dot_high_pre=high
dot_low_pre=low
elif dot_type_pre=='bottom' and low<dot_low_pre:
df_dr.at[dot_n_pre,'line']=''
df_dr.at[n,'line']='bottom'
dot_n_pre=n
dot_high_pre=high
dot_low_pre=low
i=i+1
'''
#找笔中枢
#只保留笔的K
df_bi = df_dr.loc[(df_dr["bi"]=='top') | (df_dr["bi"]=='bottom')]
df_dr['b_zs_no']=0
df_dr['b_zg']=0
df_dr['b_zd']=0
df_dr['b_gg']=0
df_dr['b_dd']=0
zs_no=0 #最后一个中枢号
i=0
while i <len(df_bi):
k=df_bi.iloc[i]
n=df_bi.index[i]
bi_type=k.bi
high=k.high
low=k.low
#找从这个点开始可能的中枢
zg=0
zd=0
gg=0
dd=0
j=i+1
while j<len(df_bi):
j_k=df_bi.iloc[j]
j_n=df_bi.index[j]
jp_n=df_bi.index[j-1]
j_bi_type=j_k.bi
j_high=j_k.high
j_low=j_k.low
#print(i,j,zg,zd)
if (j_bi_type=='top' and j_high<zd and j-i>5) or (j_bi_type=='bottom' and j_low>zg and j-i>5):
#找到中枢(这时肯定已经两个点脱离,没有走回去),保存,退出,往后面继续找考虑到两个中枢被一条线连接的情况需要从j-2开始找
# /\ -------------
# / or -------------
# ------------- \
# ------------- \/
#print(i,j,'找到中枢')
zs_no=zs_no+1
df_dr.b_zs_no[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=zs_no
df_dr.b_zg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=zg
df_dr.b_zd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=zd
df_dr.b_gg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=gg
df_dr.b_dd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=dd
i=j-3
break
elif (j-i<=5 and bi_type=='bottom' and zd!=0 and zd<low) or (j-i<=5 and bi_type=='top' and zg!=0 and zg>high):
#前三段zg zd超出起始线段的高、低点中枢破坏
#print(i,j,'zg zd超出起始线段的高、低点中枢破坏')
break
elif j-i<=5 and j-i>2 and zg<zd:
#前三段zg<zd中枢破坏
#print(i,j,'zg<zd中枢破坏',zg,zd)
break
elif (j-i==5 and j_bi_type=='bottom' and j_low>zg) or (j-i==5 and j_bi_type=='top' and j_high<zd):
#第五个点没有回到或跨越zgzd中枢破坏
#print(i,j,'第五个点中枢破坏')
break
if j-i<5: #只对前三个线段作为zg zd的参考点
if j_bi_type=='top':
zg=min(zg,j_high) if zg!=0 else j_high
gg=max(gg,j_high) if gg!=0 else j_high
elif j_bi_type=='bottom':
zd=max(zd,j_low) if zd!=0 else j_low
dd=min(dd,j_low) if dd!=0 else j_low
j=j+1
i=i+1
'''
#找线段中枢
#只保留线段的K
df_line = df_dr.loc[(df_dr["line"]=='bottom') | (df_dr["line"]=='top')]
df_dr['zs_no']=0
df_dr['zg']=0
df_dr['zd']=0
df_dr['gg']=0
df_dr['dd']=0
df_dr['zs_direction']=''
df_dr['zs_confirm_time']=''
df_dr['zs_confirm_price']=''
zs_no=0 #最后一个中枢号
i=0
while i <len(df_line):
k=df_line.iloc[i]
n=df_line.index[i]
line_type=k.line
high=k.high
low=k.low
#找从这个点开始可能的中枢
zg=0
zd=0
gg=0
dd=0
start_direction='down' if k.line=='top' else 'up' #第一个点是top则是下降方向
j=i+1
while j<len(df_line):
j_k=df_line.iloc[j]
j_n=df_line.index[j]
jp_n=df_line.index[j-1]
j_line_type=j_k.line
j_high=j_k.high
j_low=j_k.low
#print(i,j,zg,zd)
if (j_line_type=='top' and j_high<zd and j-i>5) or (j_line_type=='bottom' and j_low>zg and j-i>5):
#找到中枢(这时肯定已经两个点脱离,没有走回去),保存,退出,往后面继续找考虑到两个中枢被一条线连接的情况需要从j-2开始找
# /\ -------------
# / or -------------
# ------------- \
# ------------- \/
#print(i,j,'找到中枢')
zs_no=zs_no+1
end_direction='down' if j_k.line=='top' else 'up' #小尾巴是top则是下降方向
df_dr.zs_no[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=zs_no
df_dr.zg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=zg
df_dr.zd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=zd
df_dr.gg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=gg
df_dr.dd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=dd
df_dr.zs_direction[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=start_direction+end_direction
df_dr.zs_confirm_time[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=j_k.trade_time
df_dr.zs_confirm_price[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=j_k.close
i=j-3
break
elif (j-i<=5 and line_type=='bottom' and zd!=0 and zd<low) or (j-i<=5 and line_type=='top' and zg!=0 and zg>high):
#前三段zg zd超出起始线段的高、低点中枢破坏
#print(i,j,'zg zd超出起始线段的高、低点中枢破坏')
break
elif j-i<=5 and j-i>2 and zg<zd:
#前三段zg<zd中枢破坏
#print(i,j,'zg<zd中枢破坏',zg,zd)
break
elif (j-i==5 and j_line_type=='bottom' and j_low>zg) or (j-i==5 and j_line_type=='top' and j_high<zd):
#第五个点没有回到或跨越zgzd中枢破坏
#print(i,j,'第五个点中枢破坏')
break
if j-i<5: #只对前三个线段作为zg zd的参考点
if j_line_type=='top':
zg=min(zg,j_high) if zg!=0 else j_high
gg=max(gg,j_high) if gg!=0 else j_high
elif j_line_type=='bottom':
zd=max(zd,j_low) if zd!=0 else j_low
dd=min(dd,j_low) if dd!=0 else j_low
j=j+1
i=i+1
return df_dr