from datetime import datetime import pandas as pd from streamlit_quant import quantcommon # 흑자 기업이면 1점(당기순이익) def calc_net_income(qc, base_date): net_income_list = qc.get_fs_list_by_account_and_date("'당기순이익'", f"'{base_date}'") net_income_list['score1'] = (net_income_list['값'] > 0).astype(int) return net_income_list[['종목코드', 'score1']] # CFO(영업활동현금흐름) 흑자 기업이면 1점 def calc_cfo(qc, base_date): cfo_list = qc.get_fs_list_by_account_and_date("'*영업에서창출된현금흐름'", f"'{base_date}'") cfo_list['score2'] = (cfo_list['값'] > 0).astype(int) return cfo_list[['종목코드', 'score2']] # 신규 주식 발행(유상증사): 전년 없음인 경우 1점 # 제작년과 작년 자본금 변화가 없는 경우로 체크 def calc_capital(qc, base_date): last_year = datetime(base_date.year - 1, base_date.month, base_date.day).date() capital_date = f"'{last_year}', '{base_date}'" # 자본금 capital_list = qc.get_fs_list_by_account_and_date("'자본금'", capital_date) # 기준일별로 피벗 테이블 생성 pivot_df = capital_list.pivot_table( values='값', index='종목코드', columns='기준일', aggfunc='first' ) pivot_df = pivot_df.dropna() # 값 차이 계산 및 score 부여 pivot_df['diff'] = pivot_df[base_date] - pivot_df[last_year] pivot_df['score3'] = (pivot_df['diff'] == 0).astype(int) # 결과 정리 return pivot_df.reset_index()[['종목코드', 'score3']] def calc_gpa(qc, base_date): fs_list = qc.get_fs_list_by_account_and_date("'매출총이익', '자산'", f"'{base_date}'") fs_list_pivot = fs_list.pivot(index='종목코드', columns='계정', values='값') fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산'] # 결과 정리 return fs_list_pivot.reset_index()[['종목코드', 'GPA']] def get_ticker_list(qc): ticker_list = qc.get_ticker_list() # 시가총액을 기준으로 정렬 ticker_list['분류'] = pd.qcut(ticker_list['시가총액'], q=[0, 0.2, 0.8, 1.0], # 0-20%, 20-80%, 80-100% 구간 labels=['소형주', '중형주', '대형주']) return ticker_list[['종목코드', '종목명', '분류', '종가']] def get_f_score(qc, base_date): ticker_list = get_ticker_list(qc) score1_list = calc_net_income(qc, base_date) score2_list = calc_cfo(qc, base_date) score3_list = calc_capital(qc, base_date) gpa_list = calc_gpa(qc, base_date) # score 1 병합 + NaN인 경우 기본값 0 merge_score1 = ticker_list.merge(score1_list, on='종목코드', how='left') merge_score1['score1'] = merge_score1['score1'].fillna(0).astype(int) # score 2 병합 + NaN인 경우 기본값 0 merge_score2 = merge_score1.merge(score2_list, on='종목코드', how='left') merge_score2['score2'] = merge_score2['score2'].fillna(0).astype(int) # score 3 병합 + NaN인 경우 기본값 0 merge_score3 = merge_score2.merge(score3_list, on='종목코드', how='left') merge_score3['score3'] = merge_score3['score3'].fillna(0).astype(int) # 개별 점수들로 신f-score 계산 merge_score3['f-score'] = merge_score3['score1'] + merge_score3['score2'] + merge_score3['score3'] # GPA 병합 + NaN인 경우 기본 값 -1(내림차순 정렬 시에 하위 순위를 받게 하려고) final_df = merge_score3.merge(gpa_list, on='종목코드', how='left') final_df['GPA'] = final_df['GPA'].fillna(-1).astype(float) f_score3 = final_df[final_df['f-score'] == 3].round(4) result = f_score3[f_score3['분류'] == '소형주'].sort_values('GPA', ascending=False) # print(f_score3) # fs_list_copy = f_score3[['GPA']].copy() # # print(fs_list_copy) # fs_rank = fs_list_copy.rank(ascending=False, axis=0) # # print(fs_rank) # return f_score3.loc[fs_rank['GPA'] <= 20, ['종목코드', '종목명', '분류', 'f-score', 'GPA']].round(4) return result if __name__ == '__main__': date = datetime(2024, 12, 31).date() print(get_f_score(quantcommon.QuantCommon(), date).head(30))