103 lines
4.1 KiB
Python
103 lines
4.1 KiB
Python
from datetime import datetime
|
|
import pandas as pd
|
|
import quantcommon
|
|
|
|
# 흑자 기업이면 1점(당기순이익)
|
|
def calc_net_income(qc, base_date):
|
|
net_income_list = qc.get_fs_list_by_account_and_date("'당기순이익'", f"'{base_date}'")
|
|
net_income_list['score1'] = (net_income_list['값'] > 0).astype(int)
|
|
|
|
return net_income_list[['종목코드', 'score1']]
|
|
|
|
|
|
# CFO(영업활동현금흐름) 흑자 기업이면 1점
|
|
def calc_cfo(qc, base_date):
|
|
cfo_list = qc.get_fs_list_by_account_and_date("'*영업에서창출된현금흐름'", f"'{base_date}'")
|
|
cfo_list['score2'] = (cfo_list['값'] > 0).astype(int)
|
|
|
|
return cfo_list[['종목코드', 'score2']]
|
|
|
|
|
|
# 신규 주식 발행(유상증사): 전년 없음인 경우 1점
|
|
# 제작년과 작년 자본금 변화가 없는 경우로 체크
|
|
def calc_capital(qc, base_date):
|
|
last_year = datetime(base_date.year - 1, base_date.month, base_date.day).date()
|
|
capital_date = f"'{last_year}', '{base_date}'"
|
|
|
|
# 자본금
|
|
capital_list = qc.get_fs_list_by_account_and_date("'자본금'", capital_date)
|
|
|
|
# 기준일별로 피벗 테이블 생성
|
|
pivot_df = capital_list.pivot_table(
|
|
values='값',
|
|
index='종목코드',
|
|
columns='기준일',
|
|
aggfunc='first'
|
|
)
|
|
pivot_df = pivot_df.dropna()
|
|
|
|
# 값 차이 계산 및 score 부여
|
|
pivot_df['diff'] = pivot_df[base_date] - pivot_df[last_year]
|
|
pivot_df['score3'] = (pivot_df['diff'] == 0).astype(int)
|
|
|
|
# 결과 정리
|
|
return pivot_df.reset_index()[['종목코드', 'score3']]
|
|
|
|
|
|
def calc_gpa(qc, base_date):
|
|
fs_list = qc.get_fs_list_by_account_and_date("'매출총이익', '자산'", f"'{base_date}'")
|
|
fs_list_pivot = fs_list.pivot(index='종목코드', columns='계정', values='값')
|
|
fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산']
|
|
|
|
# 결과 정리
|
|
return fs_list_pivot.reset_index()[['종목코드', 'GPA']]
|
|
|
|
def get_ticker_list(qc):
|
|
ticker_list = qc.get_ticker_list()
|
|
# 시가총액을 기준으로 정렬
|
|
ticker_list['분류'] = pd.qcut(ticker_list['시가총액'],
|
|
q=[0, 0.2, 0.8, 1.0], # 0-20%, 20-80%, 80-100% 구간
|
|
labels=['소형주', '중형주', '대형주'])
|
|
|
|
return ticker_list[['종목코드', '종목명', '분류', '종가']]
|
|
|
|
def get_f_score(qc, base_date):
|
|
ticker_list = get_ticker_list(qc)
|
|
score1_list = calc_net_income(qc, base_date)
|
|
score2_list = calc_cfo(qc, base_date)
|
|
score3_list = calc_capital(qc, base_date)
|
|
gpa_list = calc_gpa(qc, base_date)
|
|
|
|
# score 1 병합 + NaN인 경우 기본값 0
|
|
merge_score1 = ticker_list.merge(score1_list, on='종목코드', how='left')
|
|
merge_score1['score1'] = merge_score1['score1'].fillna(0).astype(int)
|
|
|
|
# score 2 병합 + NaN인 경우 기본값 0
|
|
merge_score2 = merge_score1.merge(score2_list, on='종목코드', how='left')
|
|
merge_score2['score2'] = merge_score2['score2'].fillna(0).astype(int)
|
|
|
|
# score 3 병합 + NaN인 경우 기본값 0
|
|
merge_score3 = merge_score2.merge(score3_list, on='종목코드', how='left')
|
|
merge_score3['score3'] = merge_score3['score3'].fillna(0).astype(int)
|
|
|
|
# 개별 점수들로 신f-score 계산
|
|
merge_score3['f-score'] = merge_score3['score1'] + merge_score3['score2'] + merge_score3['score3']
|
|
|
|
# GPA 병합 + NaN인 경우 기본 값 -1(내림차순 정렬 시에 하위 순위를 받게 하려고)
|
|
final_df = merge_score3.merge(gpa_list, on='종목코드', how='left')
|
|
final_df['GPA'] = final_df['GPA'].fillna(-1).astype(float)
|
|
|
|
f_score3 = final_df[final_df['f-score'] == 3].round(4)
|
|
result = f_score3[f_score3['분류'] == '소형주'].sort_values('GPA', ascending=False)
|
|
|
|
# print(f_score3)
|
|
# fs_list_copy = f_score3[['GPA']].copy()
|
|
# # print(fs_list_copy)
|
|
# fs_rank = fs_list_copy.rank(ascending=False, axis=0)
|
|
# # print(fs_rank)
|
|
# return f_score3.loc[fs_rank['GPA'] <= 20, ['종목코드', '종목명', '분류', 'f-score', 'GPA']].round(4)
|
|
return result
|
|
|
|
if __name__ == '__main__':
|
|
date = datetime(2024, 12, 31).date()
|
|
print(get_f_score(quantcommon.QuantCommon(), date).head(30)) |