"""Synchronous order/balance client for the KIS (koreainvestment.com) Open API."""

import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, List, Optional

import httpx

logger = logging.getLogger(__name__)


def _to_float(value: Any, default: float = 0.0) -> float:
    """Convert *value* to ``float``, returning *default* for None/empty/garbage."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _to_int(value: Any, default: int = 0) -> int:
    """Convert *value* to ``int``, returning *default* for None/empty/garbage.

    Decimal-looking strings such as ``"10.0"`` are accepted and truncated.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


@dataclass
class OrderResult:
    """Outcome of a single order request.

    On failure only ``success``, ``ticker``, ``order_type`` and ``message``
    are populated; the remaining fields keep their defaults.
    """

    success: bool
    order_no: str = ""
    message: str = ""
    ticker: str = ""
    order_type: str = ""  # buy/sell
    qty: int = 0
    price: int = 0


@dataclass
class Position:
    """One holding as parsed from an ``output1`` row of the balance response."""

    ticker: str
    name: str
    qty: int
    avg_price: float
    current_price: float
    pnl_amount: float
    pnl_rate: float


@dataclass
class AccountBalance:
    """Account summary (from ``output2``) plus the list of open positions."""

    total_amount: float
    available_amount: float
    stock_amount: float
    pnl_amount: float
    positions: List[Position] = field(default_factory=list)


class KISTradeExecutor:
    """Places cash orders and reads the balance through the KIS REST API.

    All network methods are blocking and open a short-lived ``httpx.Client``
    per request. The OAuth access token is fetched lazily and cached on the
    instance until five minutes before its reported expiry.
    """

    BASE_URL_REAL = "https://openapi.koreainvestment.com:9443"
    BASE_URL_PAPER = "https://openapivts.koreainvestment.com:29443"

    _ORDER_TYPES = ("buy", "sell")

    def __init__(self, app_key: str, app_secret: str, account_no: str, paper_trade: bool = True):
        """Store credentials and split the account number.

        Args:
            app_key: API key sent as the ``appkey`` header / token body field.
            app_secret: API secret sent as ``appsecret``.
            account_no: Either ``"12345678-01"`` or the same digits without
                the dash. The part before the dash (or the first 8
                characters) becomes ``CANO``; the rest becomes
                ``ACNT_PRDT_CD`` and defaults to ``"01"`` when empty.
            paper_trade: Use the paper-trading host and TR ids when true.
        """
        self._app_key = app_key
        self._app_secret = app_secret
        self._account_no = account_no
        self._paper_trade = paper_trade
        self._base_url = self.BASE_URL_PAPER if paper_trade else self.BASE_URL_REAL
        self._access_token: Optional[str] = None
        self._token_expires_at: Optional[datetime] = None

        # account_no format: "12345678-01" -> CANO="12345678", ACNT_PRDT_CD="01"
        if "-" in account_no:
            cano, product_code, *_ = account_no.split("-")
        else:
            cano, product_code = account_no[:8], account_no[8:]
        self._cano = cano
        # A trailing dash ("12345678-") must not produce an empty product code.
        self._acnt_prdt_cd = product_code or "01"

    @property
    def _is_token_valid(self) -> bool:
        """True when a cached token exists and is more than 5 minutes from expiry."""
        if not self._access_token or not self._token_expires_at:
            return False
        return datetime.now() < self._token_expires_at - timedelta(minutes=5)

    def _ensure_token(self) -> None:
        """Fetch and cache a new access token unless the cached one is still valid.

        Raises:
            httpx.HTTPError: On transport failure or a non-2xx response.
            KeyError: If the response JSON has no ``access_token``.
        """
        if self._is_token_valid:
            return

        url = f"{self._base_url}/oauth2/tokenP"
        body = {
            "grant_type": "client_credentials",
            "appkey": self._app_key,
            "appsecret": self._app_secret,
        }
        with httpx.Client() as client:
            resp = client.post(url, json=body)
            resp.raise_for_status()
            data = resp.json()

        self._access_token = data["access_token"]
        # Fall back to 24h when the server does not report a lifetime.
        expires_in = int(data.get("expires_in", 86400))
        self._token_expires_at = datetime.now() + timedelta(seconds=expires_in)
        logger.info("KIS access token refreshed, expires in %d seconds", expires_in)

    def _headers(self, tr_id: str) -> dict:
        """Return request headers for *tr_id*, refreshing the token if needed."""
        self._ensure_token()
        return {
            "content-type": "application/json; charset=utf-8",
            "authorization": f"Bearer {self._access_token}",
            "appkey": self._app_key,
            "appsecret": self._app_secret,
            "tr_id": tr_id,
        }

    def _order_tr_id(self, order_type: str) -> str:
        """Map ``"buy"``/``"sell"`` to the TR id for the current environment.

        Raises:
            ValueError: For any other *order_type*. Previously every value
                other than ``"buy"`` was silently treated as a sell.
        """
        if order_type not in self._ORDER_TYPES:
            raise ValueError(f"Unsupported order type: {order_type!r}")
        is_buy = order_type == "buy"
        if self._paper_trade:
            return "VTTC0802U" if is_buy else "VTTC0801U"
        return "TTTC0802U" if is_buy else "TTTC0801U"

    def _place_order(self, ticker: str, qty: int, price: Optional[int], order_type: str) -> OrderResult:
        """Submit one cash order and translate the response into an OrderResult.

        A falsy *price* (``None`` or ``0``) sends a market order; any other
        value sends a limit order at that price. This method never raises:
        validation, transport and API errors all come back as
        ``OrderResult(success=False, message=...)``.
        """
        # Reject obviously invalid requests locally instead of sending them.
        if order_type not in self._ORDER_TYPES:
            msg = f"Unsupported order type: {order_type!r}"
            logger.error("KIS %s order exception: %s - %s", order_type, ticker, msg)
            return OrderResult(success=False, ticker=ticker, order_type=order_type, message=msg)
        if qty <= 0:
            msg = f"Order quantity must be positive, got {qty}"
            logger.error("KIS %s order exception: %s - %s", order_type, ticker, msg)
            return OrderResult(success=False, ticker=ticker, order_type=order_type, message=msg)

        tr_id = self._order_tr_id(order_type)
        # ORD_DVSN: "00" = limit order, "01" = market order
        ord_dvsn = "00" if price else "01"
        body = {
            "CANO": self._cano,
            "ACNT_PRDT_CD": self._acnt_prdt_cd,
            "PDNO": ticker,
            "ORD_DVSN": ord_dvsn,
            "ORD_QTY": str(qty),
            "ORD_UNPR": str(price or 0),
        }
        url = f"{self._base_url}/uapi/domestic-stock/v1/trading/order-cash"

        try:
            headers = self._headers(tr_id)
            with httpx.Client() as client:
                resp = client.post(url, headers=headers, json=body)
                resp.raise_for_status()
                data = resp.json()

            if data.get("rt_cd") != "0":
                msg = data.get("msg1", "Unknown error")
                logger.warning("KIS %s order failed: %s - %s", order_type, ticker, msg)
                return OrderResult(success=False, ticker=ticker, order_type=order_type, message=msg)

            # "output" may be null; an accepted order must not be reported
            # as failed just because the order number could not be read.
            order_no = (data.get("output") or {}).get("ODNO", "")
            logger.info(
                "KIS %s order success: %s qty=%d price=%s order_no=%s",
                order_type, ticker, qty, price, order_no,
            )
            return OrderResult(
                success=True,
                order_no=order_no,
                ticker=ticker,
                order_type=order_type,
                qty=qty,
                price=price or 0,
                message=data.get("msg1", ""),
            )
        except Exception as e:
            logger.exception("KIS %s order exception: %s - %s", order_type, ticker, e)
            return OrderResult(success=False, ticker=ticker, order_type=order_type, message=str(e))

    def place_buy_order(self, ticker: str, qty: int, price: Optional[int] = None) -> OrderResult:
        """Place a buy order; market order when *price* is omitted (or 0)."""
        return self._place_order(ticker, qty, price, "buy")

    def place_sell_order(self, ticker: str, qty: int, price: Optional[int] = None) -> OrderResult:
        """Place a sell order; market order when *price* is omitted (or 0)."""
        return self._place_order(ticker, qty, price, "sell")

    @staticmethod
    def _empty_balance() -> AccountBalance:
        """Return the all-zero balance used to signal a failed inquiry."""
        return AccountBalance(total_amount=0, available_amount=0, stock_amount=0, pnl_amount=0)

    def get_account_balance(self) -> AccountBalance:
        """Fetch the account summary and the positions with a positive quantity.

        Only a single request is made; the ``CTX_AREA_*`` continuation keys
        are sent empty and no follow-up page is requested.

        Returns:
            The parsed balance. On any transport, HTTP or API-level failure
            an all-zero ``AccountBalance`` with no positions is returned and
            the failure is logged; this method never raises. Individual
            numeric fields that are missing or unparsable are read as 0.
        """
        tr_id = "VTTC8434R" if self._paper_trade else "TTTC8434R"
        params = {
            "CANO": self._cano,
            "ACNT_PRDT_CD": self._acnt_prdt_cd,
            "AFHR_FLPR_YN": "N",
            "OFL_YN": "",
            "INQR_DVSN": "02",
            "UNPR_DVSN": "01",
            "FUND_STTL_ICLD_YN": "N",
            "FNCG_AMT_AUTO_RDPT_YN": "N",
            "PRCS_DVSN": "01",
            "CTX_AREA_FK100": "",
            "CTX_AREA_NK100": "",
        }
        url = f"{self._base_url}/uapi/domestic-stock/v1/trading/inquire-balance"

        try:
            headers = self._headers(tr_id)
            with httpx.Client() as client:
                resp = client.get(url, headers=headers, params=params)
                resp.raise_for_status()
                data = resp.json()

            # An HTTP 200 can still carry an API-level rejection; surface it
            # in the log instead of silently reporting a zero balance.
            rt_cd = data.get("rt_cd")
            if rt_cd is not None and rt_cd != "0":
                logger.warning("KIS balance inquiry rejected: %s", data.get("msg1", "Unknown error"))
                return self._empty_balance()

            positions = []
            for item in data.get("output1") or []:
                held = _to_int(item.get("hldg_qty"))
                if held <= 0:
                    continue
                positions.append(Position(
                    ticker=item.get("pdno", ""),
                    name=item.get("prdt_name", ""),
                    qty=held,
                    avg_price=_to_float(item.get("pchs_avg_pric")),
                    current_price=_to_float(item.get("prpr")),
                    pnl_amount=_to_float(item.get("evlu_pfls_amt")),
                    pnl_rate=_to_float(item.get("evlu_pfls_rt")),
                ))

            output2 = data.get("output2")
            if isinstance(output2, dict):
                summary = output2
            else:
                summary = output2[0] if output2 else {}

            return AccountBalance(
                total_amount=_to_float(summary.get("tot_evlu_amt")),
                available_amount=_to_float(summary.get("dnca_tot_amt")),
                stock_amount=_to_float(summary.get("scts_evlu_amt")),
                pnl_amount=_to_float(summary.get("evlu_pfls_smtl_amt")),
                positions=positions,
            )
        except Exception as e:
            logger.exception("KIS balance inquiry failed: %s", e)
            return self._empty_balance()

    def get_positions(self) -> List[Position]:
        """Return the positions from a fresh balance inquiry (empty on failure)."""
        balance = self.get_account_balance()
        return balance.positions