diff --git a/streamlit_quant/strategy/f_score.py b/streamlit_quant/strategy/f_score.py index a59c50a..7970925 100644 --- a/streamlit_quant/strategy/f_score.py +++ b/streamlit_quant/strategy/f_score.py @@ -3,41 +3,24 @@ import pandas as pd import quantcommon # 흑자 기업이면 1점(당기순이익) -def calc_net_income(qc, ticker_list, base_date): +def calc_net_income(qc, base_date): net_income_list = qc.get_fs_list_by_account_and_date("'당기순이익'", f"'{base_date}'") net_income_list['score1'] = (net_income_list['값'] > 0).astype(int) - # print(net_income_list) - result_df = net_income_list[['종목코드', 'score1']] + return net_income_list[['종목코드', 'score1']] - # 원본 데이터프레임에 score 병합 - final_df = ticker_list[['종목코드', '종목명', '분류']].merge(result_df, on='종목코드', how='left') - - # score 값이 NaN인 경우 기본값 0으로 채우기 - final_df['score1'] = final_df['score1'].fillna(0).astype(int) - - return final_df # CFO(영업활동현금흐름) 흑자 기업이면 1점 -def calc_cfo(qc, ticker_list, base_date): +def calc_cfo(qc, base_date): cfo_list = qc.get_fs_list_by_account_and_date("'*영업에서창출된현금흐름'", f"'{base_date}'") cfo_list['score2'] = (cfo_list['값'] > 0).astype(int) - # print(cfo_list) - result_df = cfo_list[['종목코드', 'score2']] - - # 원본 데이터프레임에 score 병합 - final_df = ticker_list[['종목코드', '종목명', '분류', 'score1']].merge(result_df, on='종목코드', how='left') - - # score 값이 NaN인 경우 기본값 0으로 채우기 - final_df['score2'] = final_df['score2'].fillna(0).astype(int) - - return final_df + return cfo_list[['종목코드', 'score2']] # 신규 주식 발행(유상증사): 전년 없음인 경우 1점 # 제작년과 작년 자본금 변화가 없는 경우로 체크 -def calc_capital(qc, ticker_list, base_date): +def calc_capital(qc, base_date): last_year = datetime(base_date.year - 1, base_date.month, base_date.day).date() capital_date = f"'{last_year}', '{base_date}'" @@ -52,38 +35,22 @@ def calc_capital(qc, ticker_list, base_date): aggfunc='first' ) pivot_df = pivot_df.dropna() - # print(pivot_df) # 값 차이 계산 및 score 부여 pivot_df['diff'] = pivot_df[base_date] - pivot_df[last_year] pivot_df['score3'] = (pivot_df['diff'] == 0).astype(int) # 결과 정리 - result_df = pivot_df.reset_index()[['종목코드', 'score3']] + return pivot_df.reset_index()[['종목코드', 'score3']] - # 원본 데이터프레임에 score 병합 - final_df = ticker_list[['종목코드', '종목명', '분류', 'score1', 'score2']].merge(result_df, on='종목코드', how='left') - # score 값이 NaN인 경우 기본값 0으로 채우기 - final_df['score3'] = final_df['score3'].fillna(0).astype(int) - - # score_1_df = final_df[final_df['score'] == 1] - return final_df - -def calc_gpa(qc, ticker_list, base_date): +def calc_gpa(qc, base_date): fs_list = qc.get_fs_list_by_account_and_date("'매출총이익', '자산'", f"'{base_date}'") fs_list_pivot = fs_list.pivot(index='종목코드', columns='계정', values='값') fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산'] # 결과 정리 - result_df = fs_list_pivot.reset_index()[['종목코드', 'GPA']] - - # 티커 테이블과 합침 - final_df = ticker_list[['종목코드', '종목명', '분류', 'f-score']].merge(result_df, - how='left', - on='종목코드') - final_df['GPA'] = final_df['GPA'].fillna(-1).astype(float) - return final_df + return fs_list_pivot.reset_index()[['종목코드', 'GPA']] def get_ticker_list(qc): ticker_list = qc.get_ticker_list() @@ -92,20 +59,35 @@ def get_ticker_list(qc): q=[0, 0.2, 0.8, 1.0], # 0-20%, 20-80%, 80-100% 구간 labels=['소형주', '중형주', '대형주']) - return ticker_list + return ticker_list[['종목코드', '종목명', '분류', '종가']] def get_f_score(qc, base_date): ticker_list = get_ticker_list(qc) - apply_net_income = calc_net_income(qc, ticker_list, base_date) - apply_cfo = calc_cfo(qc, apply_net_income, base_date) - apply_capital = calc_capital(qc, apply_cfo, base_date) + score1_list = calc_net_income(qc, base_date) + score2_list = calc_cfo(qc, base_date) + score3_list = calc_capital(qc, base_date) + gpa_list = calc_gpa(qc, base_date) + + # score 1 병합 + NaN인 경우 기본값 0 + merge_score1 = ticker_list.merge(score1_list, on='종목코드', how='left') + merge_score1['score1'] = merge_score1['score1'].fillna(0).astype(int) + + # score 2 병합 + NaN인 경우 기본값 0 + merge_score2 = merge_score1.merge(score2_list, on='종목코드', how='left') + merge_score2['score2'] = merge_score2['score2'].fillna(0).astype(int) + + # score 3 병합 + NaN인 경우 기본값 0 + merge_score3 = merge_score2.merge(score3_list, on='종목코드', how='left') + merge_score3['score3'] = merge_score3['score3'].fillna(0).astype(int) # 개별 점수들로 신f-score 계산 - apply_capital['f-score'] = apply_capital['score1'] + apply_capital['score2'] + apply_capital['score3'] + merge_score3['f-score'] = merge_score3['score1'] + merge_score3['score2'] + merge_score3['score3'] - apply_gpa = calc_gpa(qc, apply_capital, base_date) + # GPA 병합 + NaN인 경우 기본 값 -1(내림차순 정렬 시에 하위 순위를 받게 하려고) + final_df = merge_score3.merge(gpa_list, on='종목코드', how='left') + final_df['GPA'] = final_df['GPA'].fillna(-1).astype(float) - f_score3 = apply_gpa[apply_gpa['f-score'] == 3] + f_score3 = final_df[final_df['f-score'] == 3].round(4) result = f_score3[f_score3['분류'] == '소형주'].sort_values('GPA', ascending=False) # print(f_score3)