from datetime import datetime import pandas as pd import quantcommon # 흑자 기업이면 1점(당기순이익) def calc_net_income(qc, ticker_list, base_date): net_income_list = qc.get_fs_list_by_account_and_date("'당기순이익'", f"'{base_date}'") net_income_list['score1'] = (net_income_list['값'] > 0).astype(int) # print(net_income_list) result_df = net_income_list[['종목코드', 'score1']] # 원본 데이터프레임에 score 병합 final_df = ticker_list[['종목코드', '종목명', '분류']].merge(result_df, on='종목코드', how='left') # score 값이 NaN인 경우 기본값 0으로 채우기 final_df['score1'] = final_df['score1'].fillna(0).astype(int) return final_df # CFO(영업활동현금흐름) 흑자 기업이면 1점 def calc_cfo(qc, ticker_list, base_date): cfo_list = qc.get_fs_list_by_account_and_date("'*영업에서창출된현금흐름'", f"'{base_date}'") cfo_list['score2'] = (cfo_list['값'] > 0).astype(int) # print(cfo_list) result_df = cfo_list[['종목코드', 'score2']] # 원본 데이터프레임에 score 병합 final_df = ticker_list[['종목코드', '종목명', '분류', 'score1']].merge(result_df, on='종목코드', how='left') # score 값이 NaN인 경우 기본값 0으로 채우기 final_df['score2'] = final_df['score2'].fillna(0).astype(int) return final_df # 신규 주식 발행(유상증사): 전년 없음인 경우 1점 # 제작년과 작년 자본금 변화가 없는 경우로 체크 def calc_capital(qc, ticker_list, base_date): last_year = datetime(base_date.year - 1, base_date.month, base_date.day).date() capital_date = f"'{last_year}', '{base_date}'" # 자본금 capital_list = qc.get_fs_list_by_account_and_date("'자본금'", capital_date) # 기준일별로 피벗 테이블 생성 pivot_df = capital_list.pivot_table( values='값', index='종목코드', columns='기준일', aggfunc='first' ) pivot_df = pivot_df.dropna() # print(pivot_df) # 값 차이 계산 및 score 부여 pivot_df['diff'] = pivot_df[base_date] - pivot_df[last_year] pivot_df['score3'] = (pivot_df['diff'] == 0).astype(int) # 결과 정리 result_df = pivot_df.reset_index()[['종목코드', 'score3']] # 원본 데이터프레임에 score 병합 final_df = ticker_list[['종목코드', '종목명', '분류', 'score1', 'score2']].merge(result_df, on='종목코드', how='left') # score 값이 NaN인 경우 기본값 0으로 채우기 final_df['score3'] = final_df['score3'].fillna(0).astype(int) # score_1_df = final_df[final_df['score'] == 1] return final_df def calc_gpa(qc, ticker_list, base_date): fs_list = qc.get_fs_list_by_account_and_date("'매출총이익', '자산'", f"'{base_date}'") fs_list_pivot = fs_list.pivot(index='종목코드', columns='계정', values='값') fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산'] # 결과 정리 result_df = fs_list_pivot.reset_index()[['종목코드', 'GPA']] # 티커 테이블과 합침 final_df = ticker_list[['종목코드', '종목명', '분류', 'f-score']].merge(result_df, how='left', on='종목코드') final_df['GPA'] = final_df['GPA'].fillna(-1).astype(float) return final_df def get_ticker_list(qc): ticker_list = qc.get_ticker_list() # 시가총액을 기준으로 정렬 ticker_list['분류'] = pd.qcut(ticker_list['시가총액'], q=[0, 0.2, 0.8, 1.0], # 0-20%, 20-80%, 80-100% 구간 labels=['소형주', '중형주', '대형주']) return ticker_list def get_f_score(qc, base_date): ticker_list = get_ticker_list(qc) apply_net_income = calc_net_income(qc, ticker_list, base_date) apply_cfo = calc_cfo(qc, apply_net_income, base_date) apply_capital = calc_capital(qc, apply_cfo, base_date) # 개별 점수들로 신f-score 계산 apply_capital['f-score'] = apply_capital['score1'] + apply_capital['score2'] + apply_capital['score3'] apply_gpa = calc_gpa(qc, apply_capital, base_date) f_score3 = apply_gpa[apply_gpa['f-score'] == 3] result = f_score3[f_score3['분류'] == '소형주'].sort_values('GPA', ascending=False) # print(f_score3) # fs_list_copy = f_score3[['GPA']].copy() # # print(fs_list_copy) # fs_rank = fs_list_copy.rank(ascending=False, axis=0) # # print(fs_rank) # return f_score3.loc[fs_rank['GPA'] <= 20, ['종목코드', '종목명', '분류', 'f-score', 'GPA']].round(4) return result if __name__ == '__main__': date = datetime(2024, 12, 31).date() print(get_f_score(quantcommon.QuantCommon(), date).head(30))