Merge remote-tracking branch 'origin/master'
Some checks are pending
CI Build / build (push) Waiting to run

This commit is contained in:
ayuriel 2025-03-29 10:33:23 +09:00
commit 68615e2c53
28 changed files with 1079 additions and 19 deletions

View File

@ -1,2 +1,9 @@
# 실행
streamlit run .\streamlit-quant\app.py --server.port 20000
# pre-
Go to Build Tools for Visual Studio 2017
Select free download under Visual Studio Community 2017. This will download the installer. Run the installer.
Select what you need under workload tab:
a. Under Windows, there are three choices. Only check Desktop development with C++.
b. Under Web & Cloud, there are seven choices. Only check Python development (I believe this is optional, but I have done it).

View File

@ -8,6 +8,7 @@ from tqdm import tqdm
import quantcommon
# src/current-financial-statement.py 로 개선
# DB 연결
common = quantcommon.QuantCommon()
engine = common.create_engine()

View File

@ -10,6 +10,7 @@ from bs4 import BeautifulSoup
from dotenv import load_dotenv
import quantcommon
# src/current-stock.py 로 개선
load_dotenv()
GEN_OTP_URL = 'http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd'

View File

@ -11,6 +11,8 @@ from tqdm import tqdm
import quantcommon
# src/current-price.py 로 개선
# DB 연결
common = quantcommon.QuantCommon()
engine = common.create_engine()

View File

@ -0,0 +1,28 @@
import yfinance as yf
import pandas as pd
import statsmodels.api as sm
KOSPI_CODE = '^KS11'
KIIUM = '039490.KS'
# KOSPI 코드(^KS11), 전통적인 고베타주인 증권주 중 키움증권(039490.KS)
tickers = [KOSPI_CODE, KIIUM]
all_data = {}
for ticker in tickers:
all_data[ticker] = yf.download(ticker, start="2020-01-01", end="2024-12-31")
# print(all_data)
# for tic, data in all_data.items():
# print(f"{tic}: {type(data)} -> {type(data['Close'])}")
# print(data['Close'])
# 종가(Close)에 해당하는 열만 선택해서 데이터프레임으로 가공
prices = pd.DataFrame({tic: data['Close'].squeeze() for tic, data in all_data.items()})
# 수익률 계산(pct_change), NA 데이터 삭제(dropna)
ret = prices.pct_change().dropna()
ret['intercept'] = 1
reg = sm.OLS(ret[[KIIUM]], ret[[KOSPI_CODE, 'intercept']]).fit()
print(reg.summary())
print(reg.params)

View File

@ -5,6 +5,7 @@ import statsmodels.api as sm
import numpy as np
import quantcommon
# strategy/momentum에 구현
# 모멘텀 포트폴리오. 최근 12개월 수익률이 높은 주식
engine = quantcommon.QuantCommon().create_engine()

View File

@ -2,6 +2,7 @@ import pandas as pd
import numpy as np
import quantcommon
# strategy/value 에서 구현
#가치주 포트폴리오. PER, PBR이 낮은 회사 20개
# DB 연결

View File

@ -6,6 +6,8 @@ import matplotlib.pyplot as plt
import seaborn as sns
import quantcommon
# strategy/multi-factor에서 구현
# 멀티 팩터 포트폴리오.
# 퀄리티: 자기자본이익률(ROE), 매출총이익(GPA), 영업활동현금흐름(CFO)
# 밸류: PER, PBR, PSR, PCR, DY

View File

@ -4,6 +4,8 @@ import matplotlib.pyplot as plt
import seaborn as sns
import quantcommon
# strategy/quality에서 구현
# 퀄리티(우량주) 포트폴리오. 영업수익성이 높은 주식
engine = quantcommon.QuantCommon().create_engine()

View File

@ -1,6 +1,7 @@
import os
from urllib.parse import quote_plus
import pandas as pd
import pymysql
from dotenv import load_dotenv
from sqlalchemy import create_engine
@ -26,3 +27,114 @@ class QuantCommon:
port=self.port,
db=self.db,
charset='utf8')
def get_ticker_list(self):
engine = self.create_engine()
try:
ticker_list = pd.read_sql("""
select * from kor_ticker
where 기준일 = (select max(기준일) from kor_ticker)
and 종목구분 = '보통주';
""", con=engine)
finally:
engine.dispose()
return ticker_list
def get_value_list(self):
engine = self.create_engine()
try:
value_list = pd.read_sql("""
select * from kor_value
where 기준일 = (select max(기준일) from kor_value);
""", con=engine)
finally:
engine.dispose()
return value_list
def get_price_list(self, interval_month):
engine = self.create_engine()
try:
price_list = pd.read_sql(f"""
select 날짜, 종가, 종목코드
from kor_price
where 날짜 >= (select (select max(날짜) from kor_price) - interval {interval_month} month);
""", con=engine)
finally:
engine.dispose()
return price_list
def get_price_list_by_code(self, codes):
engine = self.create_engine()
try:
price_list = pd.read_sql(f"""
select * from kor_price
where 종목코드 in ({codes});
""", con=engine)
finally:
engine.dispose()
return price_list
def get_fs_list(self):
engine = self.create_engine()
try:
fs_list = pd.read_sql("""
select * from kor_fs
where 계정 in ('당기순이익', '매출총이익', '영업활동으로인한현금흐름', '자산', '자본')
and 공시구분 = 'q';
""", con=engine)
finally:
engine.dispose()
return fs_list
def get_fs_list_by_account_and_date(self, account, date):
engine = self.create_engine()
try:
fs_list = pd.read_sql(f"""
select * from kor_fs
where 계정 in ({account})
and 기준일 in ({date})
and 공시구분 = 'y';
""", con=engine)
finally:
engine.dispose()
return fs_list
def get_expanded_fs_list(self):
engine = self.create_engine()
try:
fs_list = pd.read_sql("""
select * from kor_fs
where 계정 in ('매출액', '당기순이익', '법인세비용', '이자비용', '현금및현금성자산',
'부채', '유동부채', '유동자산', '비유동자산', '감가상각비')
and 공시구분 = 'q';
""", con=engine)
finally:
engine.dispose()
return fs_list
def get_sector_list(self):
engine = self.create_engine()
try:
sector_list = pd.read_sql("""
select * from kor_sector
where 기준일 = (select max(기준일) from kor_sector);
""", con=engine)
finally:
engine.dispose()
return sector_list

View File

@ -1,19 +0,0 @@
import pandas as pd
import numpy as np
import quantcommon
# DB 연결
engine = quantcommon.QuantCommon().create_engine()
ticker_list = pd.read_sql("""
select * from kor_ticker
where 기준일 = (select max(기준일) from kor_ticker)
and 종목구분 = '보통주';
""", con=engine)
value_list = pd.read_sql("""
select * from kor_value
where 기준일 = (select max(기준일) from kor_value);
""", con=engine)
engine.dispose()

View File

@ -0,0 +1 @@
__all__ = ['backtest', 'strategy']

View File

@ -0,0 +1,48 @@
import bt
import matplotlib.pyplot as plt
import pandas as pd
import quantcommon
import streamlit_quant.strategy.multi_factor as multi_factor
import streamlit_quant.strategy.magic_formula as magic_formula
qc = quantcommon.QuantCommon()
mf = multi_factor.get_multi_factor_top(qc, 20)
magic_formula = magic_formula.get_magic_formula_top(20)
codes = ','.join(magic_formula['종목코드'].array)
price = qc.get_price_list_by_code(codes)
# price = price.set_index(['날짜'])
# price.rename(columns={"날짜": "Date"})
price["Date"] = pd.to_datetime(price["날짜"])
pivot_df = price.pivot(index="Date", columns="종목코드", values="종가")
# print(pivot_df.tail)
strategy = bt.Strategy("Asset_EW", [
bt.algos.SelectAll(), # 모든 데이터 사용
bt.algos.WeighEqually(), # 동일 비중 투자
bt.algos.RunMonthly(), # 매 월말 리밸런싱
bt.algos.Rebalance() # 계산된 비중에 따라 리밸런싱
])
# 가격 데이터 중 시작 시점이 모두 다르므로, dropna() 함수를 통해 NA를 모두 제거하여 시작 시점을 맞춤
pivot_df.dropna(inplace=True)
# 백테스트 생성
backtest = bt.Backtest(strategy, pivot_df)
# 백테스트 실행
result = bt.run(backtest)
# prices: 누적 수익률이 데이터프레임 형태로 나타나며, 시작 시점을 100으로 환산하여 계산
# to_returns: 수익률 계산
# print(result.prices.to_returns())
result.plot(figsize=(10, 6), legend=False)
plt.show()
result.display()

View File

@ -0,0 +1,130 @@
import re
import time
import pandas as pd
import requests as rq
from bs4 import BeautifulSoup
from tqdm import tqdm
import quantcommon
# 재무제표 크롤링
def get_ticker_list():
engine = quantcommon.QuantCommon().create_engine()
# 티커리스트 불러오기
ticker_list = {}
try:
ticker_list = pd.read_sql("""
select * from kor_ticker
where 기준일 = (select max(기준일) from kor_ticker)
and 종목구분 = '보통주';
""", con=engine)
finally:
engine.dispose()
return ticker_list
# 재무제표 클렌징 함수
def clean_fs(df, ticker, frequency):
df = df[~df.loc[:, ~df.columns.isin(['계정'])].isna().all(axis=1)]
df = df.drop_duplicates(['계정'], keep='first')
df = pd.melt(df, id_vars='계정', var_name='기준일', value_name='')
df = df[~pd.isnull(df[''])]
df['계정'] = df['계정'].replace({'계산에 참여한 계정 펼치기': ''}, regex=True)
df['기준일'] = pd.to_datetime(df['기준일'],
format='%Y/%m') + pd.tseries.offsets.MonthEnd()
df['종목코드'] = ticker
df['공시구분'] = frequency
return df
# ticker 별 재무제표 조회해서 DB에 저장
def process_for_fs(ticker_list):
# DB 연결
common = quantcommon.QuantCommon()
engine = common.create_engine()
con = common.connect()
mycursor = con.cursor()
# DB 저장 쿼리
query = """
insert into kor_fs (계정, 기준일, , 종목코드, 공시구분)
values (%s,%s,%s,%s,%s) as new
on duplicate key update
=new.
"""
# 오류 발생시 저장할 리스트 생성
error_list = []
# for loop
for i in tqdm(range(0, len(ticker_list))):
# 티커 선택
ticker = ticker_list['종목코드'][i]
# 오류 발생 시 이를 무시하고 다음 루프로 진행
try:
# url 생성
url = f'https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp?pGB=1&gicode=A{ticker}'
# 데이터 받아오기
data = pd.read_html(url, displayed_only=False)
# 연간 데이터
data_fs_y = pd.concat([
data[0].iloc[:, ~data[0].columns.str.contains('전년동기')], data[2],
data[4]
])
data_fs_y = data_fs_y.rename(columns={data_fs_y.columns[0]: "계정"})
# 결산년 찾기
page_data = rq.get(url)
page_data_html = BeautifulSoup(page_data.content, 'html.parser')
fiscal_data = page_data_html.select('div.corp_group1 > h2')
fiscal_data_text = fiscal_data[1].text
fiscal_data_text = re.findall('[0-9]+', fiscal_data_text)
# 결산년에 해당하는 계정만 남기기
data_fs_y = data_fs_y.loc[:, (data_fs_y.columns == '계정') | (
data_fs_y.columns.str[-2:].isin(fiscal_data_text))]
# 클렌징
data_fs_y_clean = clean_fs(data_fs_y, ticker, 'y')
# 분기 데이터
data_fs_q = pd.concat([
data[1].iloc[:, ~data[1].columns.str.contains('전년동기')], data[3],
data[5]
])
data_fs_q = data_fs_q.rename(columns={data_fs_q.columns[0]: "계정"})
data_fs_q_clean = clean_fs(data_fs_q, ticker, 'q')
# 두개 합치기
data_fs_bind = pd.concat([data_fs_y_clean, data_fs_q_clean])
# 재무제표 데이터를 DB에 저장
args = data_fs_bind.values.tolist()
mycursor.executemany(query, args)
con.commit()
except:
# 오류 발생시 해당 종목명을 저장하고 다음 루프로 이동
print(ticker)
error_list.append(ticker)
# 타임슬립 적용
time.sleep(2)
# DB 연결 종료
engine.dispose()
con.close()
if __name__ == '__main__':
tickers = get_ticker_list()
process_for_fs(tickers)

View File

@ -0,0 +1,101 @@
# 패키지 불러오기
import time
from datetime import date
from io import BytesIO
import pandas as pd
import requests as rq
from dateutil.relativedelta import relativedelta
from tqdm import tqdm
import quantcommon
# 주가 크롤링
def get_ticker_list():
engine = quantcommon.QuantCommon().create_engine()
# 티커리스트 불러오기
ticker_list = {}
try:
ticker_list = pd.read_sql("""
select * from kor_ticker
where 기준일 = (select max(기준일) from kor_ticker)
and 종목구분 = '보통주';
""", con=engine)
finally:
engine.dispose()
return ticker_list
def process_for_price(ticker_list):
# DB 저장 쿼리
query = """
insert into kor_price (날짜, 시가, 고가, 저가, 종가, 거래량, 종목코드)
values (%s,%s,%s,%s,%s,%s,%s) as new
on duplicate key update
시가 = new.시가, 고가 = new.고가, 저가 = new.저가,
종가 = new.종가, 거래량 = new.거래량;
"""
# DB 연결
common = quantcommon.QuantCommon()
engine = common.create_engine()
con = common.connect()
mycursor = con.cursor()
# 오류 발생시 저장할 리스트 생성
error_list = []
# 전종목 주가 다운로드 및 저장
for i in tqdm(range(0, len(ticker_list))):
# 티커 선택
ticker = ticker_list['종목코드'][i]
# 시작일과 종료일
# fr = (date.today() + relativedelta(years=-5)).strftime("%Y%m%d")
to = (date.today()).strftime("%Y%m%d")
fr = '20250125'
# 오류 발생 시 이를 무시하고 다음 루프로 진행
try:
# url 생성
url = f'''https://fchart.stock.naver.com/siseJson.nhn?symbol={ticker}&requestType=1
&startTime={fr}&endTime={to}&timeframe=day'''
# 데이터 다운로드
data = rq.get(url).content
data_price = pd.read_csv(BytesIO(data))
# 데이터 클렌징
price = data_price.iloc[:, 0:6]
price.columns = ['날짜', '시가', '고가', '저가', '종가', '거래량']
price = price.dropna()
price['날짜'] = price['날짜'].str.extract("(\d+)")
price['날짜'] = pd.to_datetime(price['날짜'])
price['종목코드'] = ticker
# 주가 데이터를 DB에 저장
args = price.values.tolist()
mycursor.executemany(query, args)
con.commit()
except:
# 오류 발생시 error_list에 티커 저장하고 넘어가기
print(ticker)
error_list.append(ticker)
# 타임슬립 적용
time.sleep(2)
# DB 연결 종료
engine.dispose()
con.close()
if __name__ == '__main__':
ticker_list = get_ticker_list()
process_for_price(ticker_list)

View File

@ -187,6 +187,7 @@ def save_sector(sector):
if __name__ == '__main__':
# sector와 ticker 갱신
latest_biz_day = get_latest_biz_day()
process_for_total_stock(latest_biz_day)
process_for_wics(latest_biz_day)

View File

@ -0,0 +1 @@
__all__ = ['multi_factor']

View File

@ -0,0 +1,36 @@
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import quantcommon
#가치주 포트폴리오. PER, PBR, PCR, PSR, DY
def get_all_value_top(count):
qc = quantcommon.QuantCommon()
ticker_list = qc.get_ticker_list()
value_list = qc.get_value_list()
# 가치 지표가 0이하인 경우 nan으로 변경
value_list.loc[value_list[''] <= 0, ''] = np.nan
# 가치지표 테이블을 가로로 긴 형태로 변경
value_pivot = value_list.pivot(index='종목코드', columns='지표', values='')
# 티커 테이블과 가치 지표 테이블을 합침
data_bind = ticker_list[['종목코드', '종목명']].merge(value_pivot,
how='left',
on='종목코드')
value_list_copy = data_bind.copy()
# DY(배당수익률)만 높을수록 좋은 지표라서 역수
value_list_copy['DY'] = 1 / value_list_copy['DY']
value_list_copy = value_list_copy[['PER', 'PBR', 'PCR', 'PSR', 'DY']]
value_rank_all = value_list_copy.rank(axis=0)
mask = np.triu(value_rank_all.corr())
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(value_rank_all.corr(),annot=True,mask=mask, annot_kws={"size": 16}, vmin=0, vmax=1, center=0.5,cmap='coolwarm',square=True)
ax.invert_yaxis()
# plt.show()
value_sum_all = value_rank_all.sum(axis=1, skipna=False).rank()
return data_bind.loc[value_sum_all <= count]
if __name__ == '__main__':
print(get_all_value_top(20))

View File

@ -0,0 +1,103 @@
from datetime import datetime
import pandas as pd
import quantcommon
# 흑자 기업이면 1점(당기순이익)
def calc_net_income(qc, base_date):
net_income_list = qc.get_fs_list_by_account_and_date("'당기순이익'", f"'{base_date}'")
net_income_list['score1'] = (net_income_list[''] > 0).astype(int)
return net_income_list[['종목코드', 'score1']]
# CFO(영업활동현금흐름) 흑자 기업이면 1점
def calc_cfo(qc, base_date):
cfo_list = qc.get_fs_list_by_account_and_date("'*영업에서창출된현금흐름'", f"'{base_date}'")
cfo_list['score2'] = (cfo_list[''] > 0).astype(int)
return cfo_list[['종목코드', 'score2']]
# 신규 주식 발행(유상증사): 전년 없음인 경우 1점
# 제작년과 작년 자본금 변화가 없는 경우로 체크
def calc_capital(qc, base_date):
last_year = datetime(base_date.year - 1, base_date.month, base_date.day).date()
capital_date = f"'{last_year}', '{base_date}'"
# 자본금
capital_list = qc.get_fs_list_by_account_and_date("'자본금'", capital_date)
# 기준일별로 피벗 테이블 생성
pivot_df = capital_list.pivot_table(
values='',
index='종목코드',
columns='기준일',
aggfunc='first'
)
pivot_df = pivot_df.dropna()
# 값 차이 계산 및 score 부여
pivot_df['diff'] = pivot_df[base_date] - pivot_df[last_year]
pivot_df['score3'] = (pivot_df['diff'] == 0).astype(int)
# 결과 정리
return pivot_df.reset_index()[['종목코드', 'score3']]
def calc_gpa(qc, base_date):
fs_list = qc.get_fs_list_by_account_and_date("'매출총이익', '자산'", f"'{base_date}'")
fs_list_pivot = fs_list.pivot(index='종목코드', columns='계정', values='')
fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산']
# 결과 정리
return fs_list_pivot.reset_index()[['종목코드', 'GPA']]
def get_ticker_list(qc):
ticker_list = qc.get_ticker_list()
# 시가총액을 기준으로 정렬
ticker_list['분류'] = pd.qcut(ticker_list['시가총액'],
q=[0, 0.2, 0.8, 1.0], # 0-20%, 20-80%, 80-100% 구간
labels=['소형주', '중형주', '대형주'])
return ticker_list[['종목코드', '종목명', '분류', '종가']]
def get_f_score(qc, base_date):
ticker_list = get_ticker_list(qc)
score1_list = calc_net_income(qc, base_date)
score2_list = calc_cfo(qc, base_date)
score3_list = calc_capital(qc, base_date)
gpa_list = calc_gpa(qc, base_date)
# score 1 병합 + NaN인 경우 기본값 0
merge_score1 = ticker_list.merge(score1_list, on='종목코드', how='left')
merge_score1['score1'] = merge_score1['score1'].fillna(0).astype(int)
# score 2 병합 + NaN인 경우 기본값 0
merge_score2 = merge_score1.merge(score2_list, on='종목코드', how='left')
merge_score2['score2'] = merge_score2['score2'].fillna(0).astype(int)
# score 3 병합 + NaN인 경우 기본값 0
merge_score3 = merge_score2.merge(score3_list, on='종목코드', how='left')
merge_score3['score3'] = merge_score3['score3'].fillna(0).astype(int)
# 개별 점수들로 신f-score 계산
merge_score3['f-score'] = merge_score3['score1'] + merge_score3['score2'] + merge_score3['score3']
# GPA 병합 + NaN인 경우 기본 값 -1(내림차순 정렬 시에 하위 순위를 받게 하려고)
final_df = merge_score3.merge(gpa_list, on='종목코드', how='left')
final_df['GPA'] = final_df['GPA'].fillna(-1).astype(float)
f_score3 = final_df[final_df['f-score'] == 3].round(4)
result = f_score3[f_score3['분류'] == '소형주'].sort_values('GPA', ascending=False)
# print(f_score3)
# fs_list_copy = f_score3[['GPA']].copy()
# # print(fs_list_copy)
# fs_rank = fs_list_copy.rank(ascending=False, axis=0)
# # print(fs_rank)
# return f_score3.loc[fs_rank['GPA'] <= 20, ['종목코드', '종목명', '분류', 'f-score', 'GPA']].round(4)
return result
if __name__ == '__main__':
date = datetime(2024, 12, 31).date()
print(get_f_score(quantcommon.QuantCommon(), date).head(30))

View File

@ -0,0 +1,79 @@
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import quantcommon
# 마법 공식 포트폴리오. 밸류와 퀄리티의 조합. 조엘 그린블라트의 '마법공식'
def get_magic_formula_top(count):
qc = quantcommon.QuantCommon()
ticker_list = qc.get_ticker_list()
fs_list = qc.get_expanded_fs_list()
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
# TTM 값을 구하기 위해서 rolling() 메소드를 통해 4분기 합 구함. 4분기 데이터가 없는 경우 제외하기 위해서 min_periods=4
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4).sum()['']
fs_list_clean = fs_list.copy()
# 재무상태표 현황(부채, 유동부채, 유동자산, 비유동자산)은 평균값 사용
fs_list_clean['ttm'] = np.where(
fs_list_clean['계정'].isin(['부채', '유동부채', '유동자산', '비유동자산']),
fs_list_clean['ttm'] / 4, fs_list_clean['ttm'])
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
fs_list_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
data_bind = ticker_list[['종목코드', '종목명', '시가총액']].merge(fs_list_pivot,
how='left',
on='종목코드')
# 티커 테이블에 있는 시가총액은 원, 제무제표 테이블은 억 단위이므로 단위를 맞춤
data_bind['시가총액'] = data_bind['시가총액'] / 100000000
# 이익수익률 계산식: 이자 및 법인세 차감전 이익(EBIT) / 가입가치(시가총액 + 순차입금)
# 분자(EBIT)
magic_ebit = data_bind['당기순이익'] + data_bind['법인세비용'] + data_bind['이자비용']
# 분모
magic_cap = data_bind['시가총액']
magic_debt = data_bind['부채']
## 분모: 여유자금
magic_excess_cash = data_bind['유동부채'] - data_bind['유동자산'] + data_bind[
'현금및현금성자산']
magic_excess_cash[magic_excess_cash < 0] = 0
magic_excess_cash_final = data_bind['현금및현금성자산'] - magic_excess_cash
magic_ev = magic_cap + magic_debt - magic_excess_cash_final
# 이익수익률
magic_ey = magic_ebit / magic_ev
# 투하자본 수익률
magic_ic = (data_bind['유동자산'] - data_bind['유동부채']) + (data_bind['비유동자산'] -
data_bind['감가상각비'])
magic_roc = magic_ebit / magic_ic
# 열 입력하기
data_bind['이익 수익률'] = magic_ey
data_bind['투하자본 수익률'] = magic_roc
magic_rank = (magic_ey.rank(ascending=False, axis=0) +
magic_roc.rank(ascending=False, axis=0)).rank(axis=0)
return data_bind.loc[magic_rank <= 20, ['종목코드', '종목명', '이익 수익률', '투하자본 수익률']].round(4)
# data_bind['투자구분'] = np.where(magic_rank <= 20, '마법공식', '기타')
#
# plt.rc('font', family='Malgun Gothic')
# plt.subplots(1, 1, figsize=(10, 6))
# sns.scatterplot(data=data_bind,
# x='이익 수익률',
# y='투하자본 수익률',
# hue='투자구분',
# style='투자구분',
# s=200)
# plt.xlim(0, 1)
# plt.ylim(0, 1)
# plt.show()
if __name__ == '__main__':
print(get_magic_formula_top(20))

View File

@ -0,0 +1,88 @@
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
import numpy as np
import quantcommon
def print_graph(values):
plt.rc('font', family='Malgun Gothic')
g = sns.relplot(data=values,
x='날짜',
y='종가',
col='종목코드',
col_wrap=5,
kind='line',
facet_kws={
'sharey': False,
'sharex': True
})
g.set(xticklabels=[])
g.set(xlabel=None)
g.set(ylabel=None)
g.fig.set_figwidth(15)
g.fig.set_figheight(8)
plt.subplots_adjust(wspace=0.5, hspace=0.2)
plt.show()
# strategy/momentum에 구현
# 모멘텀 포트폴리오. 최근 12개월 수익률이 높은 주식
def get_momentum_top(count):
qc = quantcommon.QuantCommon()
ticker_list = qc.get_ticker_list()
price_list = qc.get_price_list(interval_month=12)
price_pivot = price_list.pivot(index='날짜', columns='종목코드', values='종가')
# 가격 테이블에서 (가장 끝 행 / 가장 첫 행)으로 각 종목의 12개월 수익률을 구함
ret_list = pd.DataFrame(data=(price_pivot.iloc[-1] / price_pivot.iloc[0]) - 1,
columns=['return'])
data_bind = ticker_list[['종목코드', '종목명']].merge(ret_list, how='left', on='종목코드')
# 12개월 수익률 열 순위를 구함. 지표가 높을 수록 좋으니 ascending=False
momentum_rank = data_bind['return'].rank(axis=0, ascending=False)
# 모멘텀만 가지고 순위 측정
price_momentum = price_list[price_list['종목코드'].isin(
data_bind.loc[momentum_rank <= count, '종목코드'])]
# 해당 종목들(모멘텀 상위 count 개)의 가격 그래프 확인
# print_graph(price_momentum)
# k-ratio(모멘텀의 꾸준함 지표)
# pct_change() 함수로 각 종목의 수익률 계산하고 수익률이 곗나되지 않는 첫 번째 행은 제외
ret = price_pivot.pct_change().iloc[1:]
# 로그 누적 수익률 계산
ret_cum = np.log(1 + ret).cumsum()
# x축은 기간
x = np.array(range(len(ret)))
k_ratio = {}
for i in range(0, len(ticker_list)):
ticker = data_bind.loc[i, '종목코드']
try:
y = ret_cum.loc[:, price_pivot.columns == ticker]
reg = sm.OLS(y, x).fit()
res = float(reg.params / reg.bse)
except:
res = np.nan
k_ratio[ticker] = res
k_ratio_bind = pd.DataFrame.from_dict(k_ratio, orient='index').reset_index()
k_ratio_bind.columns = ['종목코드', 'K_ratio']
k_ratio_bind.head()
data_bind = data_bind.merge(k_ratio_bind, how='left', on='종목코드')
k_ratio_rank = data_bind['K_ratio'].rank(axis=0, ascending=False)
momentum_top = data_bind[k_ratio_rank <= count]
k_ratio_momentum = price_list[price_list['종목코드'].isin(data_bind.loc[k_ratio_rank <= count, '종목코드'])]
print_graph(k_ratio_momentum)
return momentum_top
if __name__ == '__main__':
print(get_momentum_top(20))

View File

@ -0,0 +1,251 @@
import pandas as pd
import numpy as np
import statsmodels.api as sm
from scipy.stats import zscore
import matplotlib.pyplot as plt
import seaborn as sns
# 멀티 팩터 포트폴리오.
# 퀄리티: 자기자본이익률(ROE), 매출총이익(GPA), 영업활동현금흐름(CFO)
# 밸류: PER, PBR, PSR, PCR, DY
# 모멘텀: 12개월 수익률, K-Ratio
# 각 섹터별 아웃라이어를 제거한 후 순위와 z-score를 구하는 함수
def col_clean(df, cutoff=0.01, asc=False):
q_low = df.quantile(cutoff)
q_hi = df.quantile(1 - cutoff)
# 이상치 데이터 제거
df_trim = df[(df > q_low) & (df < q_hi)]
df_z_score = df_trim.rank(axis=0, ascending=asc).apply(
zscore, nan_policy='omit')
return df_z_score
def plot_rank(df):
ax = sns.relplot(data=df,
x='rank',
y=1,
col='variable',
hue='invest',
size='size',
sizes=(10, 100),
style='invest',
markers={'Y': 'X','N': 'o'},
palette={'Y': 'red','N': 'grey'},
kind='scatter')
ax.set(xlabel=None)
ax.set(ylabel=None)
plt.show()
def get_multi_factor_top(qc, count):
ticker_list = qc.get_ticker_list()
fs_list = qc.get_fs_list()
value_list = qc.get_value_list()
price_list = qc.get_price_list(12)
sector_list = qc.get_sector_list()
# 퀄리티 지표 계산
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4).sum()['']
fs_list_clean = fs_list.copy()
fs_list_clean['ttm'] = np.where(fs_list_clean['계정'].isin(['자산', '자본']),
fs_list_clean['ttm'] / 4, fs_list_clean['ttm'])
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
fs_list_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
fs_list_pivot['ROE'] = fs_list_pivot['당기순이익'] / fs_list_pivot['자본']
fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산']
fs_list_pivot['CFO'] = fs_list_pivot['영업활동으로인한현금흐름'] / fs_list_pivot['자산']
fs_list_pivot.round(4).head()
value_list.loc[value_list[''] <= 0, ''] = np.nan
value_pivot = value_list.pivot(index='종목코드', columns='지표', values='')
value_pivot.head()
# 가치 지표 계산
# 음수를 제거하고 행으로 긴 형태로 변경
price_pivot = price_list.pivot(index='날짜', columns='종목코드', values='종가')
ret_list = pd.DataFrame(data=(price_pivot.iloc[-1] / price_pivot.iloc[0]) - 1,
columns=['12M'])
ret = price_pivot.pct_change().iloc[1:]
ret_cum = np.log(1 + ret).cumsum()
x = np.array(range(len(ret)))
k_ratio = {}
for i in range(0, len(ticker_list)):
ticker = ticker_list.loc[i, '종목코드']
try:
y = ret_cum.loc[:, price_pivot.columns == ticker]
reg = sm.OLS(y, x).fit()
res = float(reg.params / reg.bse)
except:
res = np.nan
k_ratio[ticker] = res
k_ratio_bind = pd.DataFrame.from_dict(k_ratio, orient='index').reset_index()
k_ratio_bind.columns = ['종목코드', 'K_ratio']
k_ratio_bind.head()
# 가격 테이블을 이용해서 최근 12개월 수익률을 구하고
# 로그 누적 수익률을 통해 각 종목별 K-Ratio를 계산
data_bind = ticker_list[['종목코드', '종목명']].merge(
sector_list[['CMP_CD', 'SEC_NM_KOR']],
how='left',
left_on='종목코드',
right_on='CMP_CD').merge(
fs_list_pivot[['ROE', 'GPA', 'CFO']], how='left',
on='종목코드').merge(value_pivot, how='left',
on='종목코드').merge(ret_list, how='left',
on='종목코드').merge(k_ratio_bind,
how='left',
on='종목코드')
data_bind.loc[data_bind['SEC_NM_KOR'].isnull(), 'SEC_NM_KOR'] = '기타'
data_bind = data_bind.drop(['CMP_CD'], axis=1)
data_bind.round(4).head()
# 종목코드와 섹터정보(SEC_NM_KOR)를 인덱스로 설정한 후, 섹터에 따른 그룹을 묶어준다.
data_bind_group = data_bind.set_index(['종목코드',
'SEC_NM_KOR']).groupby('SEC_NM_KOR', as_index=False)
data_bind_group.head(1).round(4)
# 퀄리티 지표의 z-score를 계산
# 퀄리티 지표에 해당하는 열(ROE, GPA, CFO) 선택해서 col_clean() 적용한 후 순위의 z-score 계산
# sum() 함수를 통해 z-score의 합을 구하며, to_frame() 메소드를 통해 데이터프레임 형태로 변경
z_quality = data_bind_group[['ROE', 'GPA', 'CFO'
]].apply(lambda x: col_clean(x, 0.01, False)).sum(
axis=1, skipna=False).to_frame('z_quality')
# data_bind 테이블과 합치며, z_quality 열에는 퀄리티 지표의 z-score가 표시
data_bind = data_bind.merge(z_quality, how='left', on=['종목코드', 'SEC_NM_KOR'])
data_bind.round(4).head()
# 가치 지표의 z-score 계산
# 가치 지표에 해당하는 열(PBR, PCR, PER, PSR) 선택해서 col_clean() 적용, 오름차순
value_1 = data_bind_group[['PBR', 'PCR', 'PER',
'PSR']].apply(lambda x: col_clean(x, 0.01, True))
# DY(배당수익률)의 경우 내림차순으로 계산
value_2 = data_bind_group[['DY']].apply(lambda x: col_clean(x, 0.01, False))
# 두 결과를 합쳐 z-score의 합을 구한 후, 데이터프레임 형태로 변경
z_value = value_1.merge(value_2, on=['종목코드', 'SEC_NM_KOR'
]).sum(axis=1,
skipna=False).to_frame('z_value')
# data_bind 테이블과 합치며, z_value 열에는 밸류 지표의 z-score가 표시
data_bind = data_bind.merge(z_value, how='left', on=['종목코드', 'SEC_NM_KOR'])
data_bind.round(4).head()
# 모멘텀 지표의 z-score 계산
# 모멘텀 지표에 해당하는 열(12M, K_ratio) 선택 후 col_clean() 적용
z_momentum = data_bind_group[[
'12M', 'K_ratio'
]].apply(lambda x: col_clean(x, 0.01, False)).sum(
axis=1, skipna=False).to_frame('z_momentum')
# data_bind 테이블과 합치며, z_momentum 열에는 모멘텀 지표의 z-score가 표시
data_bind = data_bind.merge(z_momentum, how='left', on=['종목코드', 'SEC_NM_KOR'])
data_bind.round(4).head()
# 팩터 분포를 시각화
data_z = data_bind[['z_quality', 'z_value', 'z_momentum']].copy()
fig, axes = plt.subplots(3, 1, figsize=(10, 6), sharex=True, sharey=True)
for n, ax in enumerate(axes.flatten()):
ax.hist(data_z.iloc[:, n])
ax.set_title(data_z.columns[n], size=12)
fig.tight_layout()
# plt.show()
# 팩터 분포가 동일하지 않으니 z-score를 다시 계산해서 분포의 넓이를 비슷하게 맞춤
# 종목 코드와 각 팩터의 z-score만 선택한 후, 종목 코드를 인덱스로 설정
# apply()를 통해서 z-score 다시 계산
data_bind_final = data_bind[['종목코드', 'z_quality', 'z_value', 'z_momentum'
]].set_index('종목코드').apply(zscore,
nan_policy='omit')
# 열 이름 설정
data_bind_final.columns = ['quality', 'value', 'momentum']
plt.rc('font', family='Malgun Gothic')
plt.rc('axes', unicode_minus=False)
fig, axes = plt.subplots(3, 1, figsize=(10, 6), sharex=True, sharey=True)
for n, ax in enumerate(axes.flatten()):
ax.hist(data_bind_final.iloc[:, n])
ax.set_title(data_bind_final.columns[n], size=12)
fig.tight_layout()
# plt.show()
# 각 팩터간 상관 관계 확인
mask = np.triu(data_bind_final.corr())
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(data_bind_final.corr(),
annot=True,
mask=mask,
annot_kws={"size": 16},
vmin=0,
vmax=1,
center=0.5,
cmap='coolwarm',
square=True)
ax.invert_yaxis()
# plt.show()
# 각 팩터간 상관관계가 매우 낮으며, 여러 팩터를 동시에 고려함으로써 분산효과를 기대할 수 있다.
# 이제 계산된 팩터들을 토대로 최종 포트폴리오를 구성해 보자.
# 각 팩터를 동일 비중으로 설정. 0.2, 0.4, 0.4 등 중요하다고 생각되는 팩터에 비중을 다르게도 지정할 수 있음
wts = [0.3, 0.3, 0.3]
# 위에서 설정한 비율을 반영해서 데이터프레임 형태로 변경
data_bind_final_sum = (data_bind_final * wts).sum(axis=1,
skipna=False).to_frame()
data_bind_final_sum.columns = ['qvm']
# 기본 테이블(data_bind)과 합침
port_qvm = data_bind.merge(data_bind_final_sum, on='종목코드')
# 최종 z-score의 합(qvm) 기준 순위가 20위 이내인 경우 투자 종목에 해당하니 Y로 표시, 나머진 N
port_qvm['invest'] = np.where(port_qvm['qvm'].rank() <= count, 'Y', 'N')
# round()는 DataFrame 객체 내의 요소를 반올림하는 메서드
return port_qvm[port_qvm['invest'] == 'Y'].round(4)
# 이하 선택된 종목과 선택되지 않은 종목들 간의 특성을 그리기 위한 코드
# data_melt = port_qvm.melt(id_vars='invest',
# value_vars=[
# 'ROE', 'GPA', 'CFO', 'PER', 'PBR', 'PCR', 'PSR',
# 'DY', '12M', 'K_ratio'
# ])
#
# data_melt['size'] = data_melt['invest'].map({'Y': 100, 'N': 10})
# data_melt.head()
#
# hist_quality = data_melt[data_melt['variable'].isin(['ROE', 'GPA',
# 'CFO'])].copy()
# hist_quality['rank'] = hist_quality.groupby('variable')['value'].rank(
# ascending=False)
# plot_rank(hist_quality)
#
# hist_value = data_melt[data_melt['variable'].isin(
# ['PER', 'PBR', 'PCR', 'PSR', 'DY'])].copy()
# hist_value['value'] = np.where(hist_value['variable'] == 'DY',
# 1 / hist_value['value'], hist_value['value'])
# hist_value['rank'] = hist_value.groupby('variable')['value'].rank()
# plot_rank(hist_value)
#
# hist_momentum = data_melt[data_melt['variable'].isin(['12M', 'K_ratio'])].copy()
# hist_momentum['rank'] = hist_momentum.groupby('variable')['value'].rank(ascending = False)
# plot_rank(hist_momentum)
#
# port_qvm[port_qvm['invest'] == 'Y']['종목코드'].to_excel('model.xlsx', index=False)

View File

@ -0,0 +1,57 @@
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import quantcommon
# 퀄리티(우량주) 포트폴리오. 영업수익성이 높은 주식
def get_quality_top(count):
qc = quantcommon.QuantCommon()
ticker_list = qc.get_ticker_list()
fs_list = qc.get_fs_list()
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
# TTM 값을 구하기 위해서 rolling() 메소드를 통해 4분기 합 구함. 4분기 데이터가 없는 경우 제외하기 위해서 min_periods=4
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4).sum()['']
fs_list_clean = fs_list.copy()
# 자산과 자본은 재무상태표 항목이므로 합이 아닌 평균을 구하며, 나머지 항목은 합을 그대로 사용
fs_list_clean['ttm'] = np.where(fs_list_clean['계정'].isin(['자산', '자본']),
fs_list_clean['ttm'] / 4, fs_list_clean['ttm'])
# tail(1)을 통해 종목코드와 계정별 최근 데이터만 선택
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
fs_list_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
# 수익성 지표에 해당하는 ROE, GPA, CFO 계산
fs_list_pivot['ROE'] = fs_list_pivot['당기순이익'] / fs_list_pivot['자본']
fs_list_pivot['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산']
fs_list_pivot['CFO'] = fs_list_pivot['영업활동으로인한현금흐름'] / fs_list_pivot['자산']
# 티커 테이블과 합침
quality_list = ticker_list[['종목코드', '종목명']].merge(fs_list_pivot,
how='left',
on='종목코드')
# quality_list.round(count).head()
quality_list_copy = quality_list[['ROE', 'GPA', 'CFO']].copy()
quality_rank = quality_list_copy.rank(ascending=False, axis=0)
mask = np.triu(quality_rank.corr())
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(quality_rank.corr(),
annot=True,
mask=mask,
annot_kws={"size": 16},
vmin=0,
vmax=1,
center=0.5,
cmap='coolwarm',
square=True)
ax.invert_yaxis()
# 위에서 구한 3개 지표들의 순위를 더한 후 다시 순위를 매김
quality_sum = quality_rank.sum(axis=1, skipna=False).rank()
# 최종 순위가 낮은 종목 선택
return quality_list.loc[quality_sum <= count, ['종목코드', '종목명', 'ROE', 'GPA', 'CFO']].round(4)
if __name__ == '__main__':
print(get_quality_top(20))

View File

@ -0,0 +1,26 @@
import numpy as np
import quantcommon
#가치주 포트폴리오. PER, PBR이 낮은 회사 20개
def get_value_top(count):
qc = quantcommon.QuantCommon()
ticker_list = qc.get_ticker_list()
value_list = qc.get_value_list()
# 가치 지표가 0이하인 경우 nan으로 변경
value_list.loc[value_list[''] <= 0, ''] = np.nan
# 가치지표 테이블을 가로로 긴 형태로 변경
value_pivot = value_list.pivot(index='종목코드', columns='지표', values='')
# 티커 테이블과 가치 지표 테이블을 합침
data_bind = ticker_list[['종목코드', '종목명']].merge(value_pivot,
how='left',
on='종목코드')
# rank() 함수로 PER, PBR 열의 순위를 구함. axis=0을 입력하여 순위는 열 방향으로 구함.(PER, PBR 각각 순위)
value_rank = data_bind[['PER', 'PBR']].rank(axis = 0)
# axis=1을 통해서 위에서 구한 순위 랭크를 합침. 합친 것을 다시 rank()
value_sum = value_rank.sum(axis = 1, skipna = False).rank()
return data_bind.loc[value_sum <= count, ['종목코드', '종목명', 'PER', 'PBR']]
if __name__ == '__main__':
print(get_value_top(20))