Files
quant/code/chan.py
2019-11-21 12:02:03 +08:00

444 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

def build(df_dr):
#一、先合并
group_no_last = 0 #最后的组号
ingroup_n_last = 0 #最后一个在组内的n
group_first_n = 0 #最后一个组的第一个n
chan_low_pre = None
chan_high_pre = None
for n, k in df_dr.iterrows():
open = k.open
close = k.close
low = k.low
high = k.high
#每条的变量
updown='flat' #升降
is_group=False #是否在组里
is_group_m=False #是否组长,组长是最高的
group_no=0
group_high=0
group_low=0
if(chan_low_pre==None or chan_high_pre==None):
chan_low_pre = low
chan_high_pre = high
else:
#上一个原始K
k_pre = df_dr.iloc[n-1-1]
low_pre = k_pre.low
high_pre = k_pre.high
#上升下降
#上升(跟原始K比较平均高度)
if high_pre+low_pre < high+low:
updown='up'
#下降(跟原始K比较平均高度)
elif high_pre+low_pre > high+low:
updown='down'
elif open<close:
updown = 'up'
elif open<close:
updown = 'down'
else:
updown = 'flat'
#包含 #不能太多的包含,限制一组成员个数
if ((chan_high_pre<=high and chan_low_pre>=low) or (chan_high_pre>=high and chan_low_pre<=low)) and n<group_first_n+5:
#组长
if chan_high_pre<=high and chan_low_pre>=low:
is_group_m = True
elif chan_high_pre>=high and chan_low_pre<=low and ingroup_n_last!=n-1: #新组并且前面的高,需要设置前面的是组长
df_dr.at[n-1,'group_master']= True
group_high=max(high, chan_high_pre)
group_low=min(low, chan_low_pre)
chan_high_pre = group_high
chan_low_pre = group_low
#上一个本来在组内,这个合并到原来的组
if ingroup_n_last==n-1:
group_no=group_no_last
#新组
else:
group_no=group_no_last+1
group_first_n=n-1
group_no_last=group_no
is_group=True
ingroup_n_last=n
else:
chan_low_pre = low
chan_high_pre = high
#写标识值到表中
df_dr.at[n,'updown']= updown
df_dr.at[n,'group']= is_group
df_dr.at[n,'group_no']= group_no
df_dr.at[n,'group_high']= group_high
df_dr.at[n,'group_low']= group_low
df_dr.at[n,'group_master']= is_group_m
if is_group:
df_dr.at[n-1,'group']= is_group
df_dr.at[n-1,'group_no']= group_no
#刷新同组内的group_high and group_low
for j in range(group_first_n, n):
df_dr.at[j,'group_high']= group_high
df_dr.at[j,'group_low']= group_low
if is_group_m:
df_dr.at[j,'group_master']= False #踢掉原来的组长
#二、找分型
fx_n_maybe_pre = 0 #前面可能是分型的n如果有方向则临时记录
direction_pre = "flat"
chan_low_pre = None
chan_high_pre = None
df_dr['fx']=''
for n, k in df_dr.iterrows():
is_group = k.group
is_group_master = k.group_master
group_high = k.group_high
group_low = k.group_low
low = k.low
high = k.high
is_confirm_fx = False #确定分型,前一个组或者个体是分型,为顶分型
is_confirm_fx_n = 0 #已确定分型的分型顶点
fx_type = ''
if(chan_low_pre==None or chan_high_pre==None):
chan_low_pre=low
chan_high_pre=high
elif (is_group and is_group_master) or not is_group: #只管组长和非组成员
cal_high = high
cal_low = low
if (is_group and is_group_master):
cal_high = group_high
cal_low = group_low
#与前面的比较
if chan_high_pre+chan_low_pre>cal_high+cal_low: #下降
if direction_pre=="up":
is_confirm_fx=True
is_confirm_fx_n=fx_n_maybe_pre
fx_type='top'
direction_pre="down"
fx_n_maybe_pre=n
elif chan_high_pre+chan_low_pre<cal_high+cal_low: #上升
if direction_pre=="down":
is_confirm_fx=True
is_confirm_fx_n=fx_n_maybe_pre
fx_type='bottom'
direction_pre="up"
fx_n_maybe_pre=n
chan_low_pre=cal_low
chan_high_pre=cal_high
#分型写入表
if is_confirm_fx:
df_dr.at[is_confirm_fx_n,'fx']= fx_type
#三、找笔
fx_n_pre = None
node_n_pre = None #node是前面确认的笔的节点
node_type_pre =''
node_high_pre = None
node_low_pre = None
df_dr['bi']=''
for n, k in df_dr.iterrows():
fx_type = k.fx
high = k.high
low = k.low
is_node = False
if fx_type=='top' or fx_type=='bottom':
#第一个分型确定为笔的节点
if fx_n_pre==None:
is_node=True
#与前节点类型相同且趋势延申删除上一个节点node
elif node_type_pre==fx_type and ((fx_type=='bottom' and low<node_low_pre) or (fx_type=='top' and high>node_high_pre)):
df_dr.at[node_n_pre,'bi']= ''
is_node=True
#与前节点类型不同且超过3个差,是新笔
elif node_type_pre!=fx_type and n-node_n_pre>3:
is_node=True
fx_n_pre=n
if is_node:
node_n_pre=n
node_type_pre=fx_type
node_high_pre=high
node_low_pre=low
if is_node:
df_dr.at[n,'bi']= fx_type
#四、找线段 (n是索引从0开始 i是第几个)
dot_i_pre = None #dot是前面确认的线段的节点
dot_type_pre =''
dot_high_pre = None
dot_low_pre = None
#只保留笔的K
df_bi = df_dr.loc[(df_dr["bi"]=='top') | (df_dr["bi"]=='bottom')]
df_dr['line']=''
#找第一个线段,第一个形成走势的三笔(如果上升,第三笔的结束点高于第一笔的结束点)
for i in range(len(df_bi)): #i是笔表的索引 n是原表的index字段即第几个
k=df_bi.iloc[i]
n=df_bi.index[i]
bi_type=k.fx
high=k.high
low=k.low
if i<len(df_bi)-3:
next3n=df_bi.index[i+3]
if bi_type=='top' and df_bi.iloc[i+1].low > df_bi.iloc[i+3].low:
df_dr.at[n,'line']='top'
df_dr.at[next3n,'line']='bottom'
dot_i_pre=i+3
dot_type_pre='bottom'
break
if bi_type=='bottom' and df_bi.iloc[i+1].high < df_bi.iloc[i+3].high:
df_dr.at[n,'line']='bottom'
df_dr.at[next3n,'line']='top'
dot_i_pre=i+3
dot_type_pre='top'
break
#继续找线段
i=dot_i_pre
while i!=None and i<len(df_bi):
k=df_bi.iloc[i]
n=df_bi.index[i]
bi_type=k.fx
high=k.high
low=k.low
#不确定延申还是反弹,一直往下找,找到能确定的点
j=i+2
is_found=False
while j<len(df_bi):
if dot_type_pre=='top':
if (j-i)%2==0 and j<len(df_bi) and df_bi.iloc[j].high>high: #看是否突破延申
j_n=df_bi.index[j]
df_dr.at[n,'line']=''
df_dr.at[j_n,'line']='top'
dot_i_pre=j_n
dot_type_pre='top'
i=j
is_found=True
break
elif (j-i)%2==1 and j<len(df_bi) and df_bi.iloc[j].low<df_bi.iloc[i+1].low: #没有延申,反弹形成新的线段
j_n=df_bi.index[j]
df_dr.at[j_n,'line']='bottom'
dot_i_pre=j_n
dot_type_pre='bottom'
i=j
is_found=True
break
elif dot_type_pre=='bottom':
if (j-i)%2==0 and j<len(df_bi) and df_bi.iloc[j].low<low: #看是否突破延申
j_n=df_bi.index[j]
df_dr.at[n,'line']=''
df_dr.at[j_n,'line']='bottom'
dot_i_pre=j_n
dot_type_pre='bottom'
i=j
is_found=True
break
elif (j-i)%2==1 and j<len(df_bi) and df_bi.iloc[j].high>df_bi.iloc[i+1].high: #没有延申,反弹形成新的线段
j_n=df_bi.index[j]
j_n=df_bi.index[j]
df_dr.at[j_n,'line']='top'
dot_i_pre=j_n
dot_type_pre='top'
i=j
is_found=True
break
j=j+1
if not is_found:
break #没找到下一个线段点,退出
else:
continue
#修正线段中间的凹凸漏点,例如上升的线段中有笔的低点低于线段的开始点,需要将这个线段的开始点移到这个更低的点,反之亦然
#有时候突然下来一笔,但是不成线段就会遗漏,此处待验证合理性
dot_type_pre =''
dot_n_pre = None
dot_high_pre = None
dot_low_pre = None
i=0
while i<len(df_bi):
n=df_bi.index[i]
k=df_bi.iloc[i]
bi_type=k.fx
high=k.high
low=k.low
dot_type = df_dr.at[n,'line']
if dot_type=='top' or dot_type=='bottom':
dot_n_pre=n
dot_type_pre=dot_type
dot_high_pre=high
dot_low_pre=low
else:
#移动不能使与后面小于三比差,即后面一个如果是线段点,就不处理这个笔点了
if i<len(df_bi)-1:
next_n = df_bi.index[i+1]
next_dot_type = df_dr.at[next_n,'line']
if next_dot_type=='top' or next_dot_type=='bottom':
i=i+1
continue
#移动
if dot_type_pre=='top' and high>dot_high_pre:
df_dr.at[dot_n_pre,'line']=''
df_dr.at[n,'line']='top'
dot_n_pre=n
dot_high_pre=high
dot_low_pre=low
elif dot_type_pre=='bottom' and low<dot_low_pre:
df_dr.at[dot_n_pre,'line']=''
df_dr.at[n,'line']='bottom'
dot_n_pre=n
dot_high_pre=high
dot_low_pre=low
i=i+1
'''
#找笔中枢
#只保留笔的K
df_bi = df_dr.loc[(df_dr["bi"]=='top') | (df_dr["bi"]=='bottom')]
df_dr['b_zs_no']=0
df_dr['b_zg']=0
df_dr['b_zd']=0
df_dr['b_gg']=0
df_dr['b_dd']=0
zs_no=0 #最后一个中枢号
i=0
while i <len(df_bi):
k=df_bi.iloc[i]
n=df_bi.index[i]
bi_type=k.bi
high=k.high
low=k.low
#找从这个点开始可能的中枢
zg=0
zd=0
gg=0
dd=0
j=i+1
while j<len(df_bi):
j_k=df_bi.iloc[j]
j_n=df_bi.index[j]
jp_n=df_bi.index[j-1]
j_bi_type=j_k.bi
j_high=j_k.high
j_low=j_k.low
#print(i,j,zg,zd)
if (j_bi_type=='top' and j_high<zd and j-i>5) or (j_bi_type=='bottom' and j_low>zg and j-i>5):
#找到中枢(这时肯定已经两个点脱离,没有走回去),保存,退出,往后面继续找考虑到两个中枢被一条线连接的情况需要从j-2开始找
# /\ -------------
# / or -------------
# ------------- \
# ------------- \/
#print(i,j,'找到中枢')
zs_no=zs_no+1
df_dr.b_zs_no[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=zs_no
df_dr.b_zg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=zg
df_dr.b_zd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=zd
df_dr.b_gg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=gg
df_dr.b_dd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.bi!='')]=dd
i=j-3
break
elif (j-i<=5 and bi_type=='bottom' and zd!=0 and zd<low) or (j-i<=5 and bi_type=='top' and zg!=0 and zg>high):
#前三段zg zd超出起始线段的高、低点中枢破坏
#print(i,j,'zg zd超出起始线段的高、低点中枢破坏')
break
elif j-i<=5 and j-i>2 and zg<zd:
#前三段zg<zd中枢破坏
#print(i,j,'zg<zd中枢破坏',zg,zd)
break
elif (j-i==5 and j_bi_type=='bottom' and j_low>zg) or (j-i==5 and j_bi_type=='top' and j_high<zd):
#第五个点没有回到或跨越zgzd中枢破坏
#print(i,j,'第五个点中枢破坏')
break
if j-i<5: #只对前三个线段作为zg zd的参考点
if j_bi_type=='top':
zg=min(zg,j_high) if zg!=0 else j_high
gg=max(gg,j_high) if gg!=0 else j_high
elif j_bi_type=='bottom':
zd=max(zd,j_low) if zd!=0 else j_low
dd=min(dd,j_low) if dd!=0 else j_low
j=j+1
i=i+1
'''
#找线段中枢
#只保留线段的K
df_line = df_dr.loc[(df_dr["line"]=='bottom') | (df_dr["line"]=='top')]
df_dr['zs_no']=0
df_dr['zg']=0
df_dr['zd']=0
df_dr['gg']=0
df_dr['dd']=0
df_dr['zs_direction']=''
df_dr['zs_confirm_time']=''
df_dr['zs_confirm_price']=''
zs_no=0 #最后一个中枢号
i=0
while i <len(df_line):
k=df_line.iloc[i]
n=df_line.index[i]
line_type=k.line
high=k.high
low=k.low
#找从这个点开始可能的中枢
zg=0
zd=0
gg=0
dd=0
start_direction='down' if k.line=='top' else 'up' #第一个点是top则是下降方向
j=i+1
while j<len(df_line):
j_k=df_line.iloc[j]
j_n=df_line.index[j]
jp_n=df_line.index[j-1]
j_line_type=j_k.line
j_high=j_k.high
j_low=j_k.low
#print(i,j,zg,zd)
if (j_line_type=='top' and j_high<zd and j-i>5) or (j_line_type=='bottom' and j_low>zg and j-i>5):
#找到中枢(这时肯定已经两个点脱离,没有走回去),保存,退出,往后面继续找考虑到两个中枢被一条线连接的情况需要从j-2开始找
# /\ -------------
# / or -------------
# ------------- \
# ------------- \/
#print(i,j,'找到中枢')
zs_no=zs_no+1
end_direction='down' if j_k.line=='top' else 'up' #小尾巴是top则是下降方向
df_dr.zs_no[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=zs_no
df_dr.zg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=zg
df_dr.zd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=zd
df_dr.gg[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=gg
df_dr.dd[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=dd
df_dr.zs_direction[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=start_direction+end_direction
df_dr.zs_confirm_time[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=j_k.trade_time
df_dr.zs_confirm_price[(df_dr.index>n) & (df_dr.index<jp_n) & (df_dr.line!='')]=j_k.close
i=j-3
break
elif (j-i<=5 and line_type=='bottom' and zd!=0 and zd<low) or (j-i<=5 and line_type=='top' and zg!=0 and zg>high):
#前三段zg zd超出起始线段的高、低点中枢破坏
#print(i,j,'zg zd超出起始线段的高、低点中枢破坏')
break
elif j-i<=5 and j-i>2 and zg<zd:
#前三段zg<zd中枢破坏
#print(i,j,'zg<zd中枢破坏',zg,zd)
break
elif (j-i==5 and j_line_type=='bottom' and j_low>zg) or (j-i==5 and j_line_type=='top' and j_high<zd):
#第五个点没有回到或跨越zgzd中枢破坏
#print(i,j,'第五个点中枢破坏')
break
if j-i<5: #只对前三个线段作为zg zd的参考点
if j_line_type=='top':
zg=min(zg,j_high) if zg!=0 else j_high
gg=max(gg,j_high) if gg!=0 else j_high
elif j_line_type=='bottom':
zd=max(zd,j_low) if zd!=0 else j_low
dd=min(dd,j_low) if dd!=0 else j_low
j=j+1
i=i+1
return df_dr