diff --git a/.gitignore b/.gitignore index 37053ea..e741ea3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,202 @@ devenv.local.nix .pre-commit-config.yaml .env + +# Created by https://www.toptal.com/developers/gitignore/api/python,go +# Edit at https://www.toptal.com/developers/gitignore?templates=python,go + +### Go ### +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +### Python Patch ### +# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration +poetry.toml + +# ruff +.ruff_cache/ + +# LSP config files +pyrightconfig.json + +# End of https://www.toptal.com/developers/gitignore/api/python,go diff --git a/paperone.py b/paperone.py index 3b8a6bf..1ba401c 100755 --- a/paperone.py +++ b/paperone.py @@ -1,208 +1,23 @@ #!/usr/bin/env python -import requests -import math from sys import exit from datetime import datetime, timedelta from dotenv import load_dotenv from typing import NoReturn, List from os import environ -from enum import Enum -from dataclasses import dataclass -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn +from paperone.taapi import TaapiClient, IndicatorEnum +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TaskProgressColumn, + TimeRemainingColumn, +) load_dotenv() -@dataclass(frozen=True) -class Indicator: - endpoint: str - params: dict[str, int] - - -@dataclass -class QueryResult: - datetime: datetime - value: float - - -class IndicatorEnum(Enum): - # Momentum Indicators - RSI = Indicator(endpoint="rsi", params={"period": 20}) - STOCH = Indicator(endpoint="stoch", params={"fast_k": 14, "slow_k": 3, "slow_d": 3}) - CCI = Indicator(endpoint="cci", params={"period": 20}) - - # Trend Indicators - MACD = Indicator( - endpoint="macd", - params={"fast_period": 12, "slow_period": 26, "signal_period": 9}, - ) - EMA_20 = Indicator(endpoint="ema", params={"period": 20}) - EMA_50 = Indicator(endpoint="ema", params={"period": 50}) - SMA_200 = Indicator(endpoint="sma", params={"period": 200}) - ADX = Indicator(endpoint="adx", params={"period": 14}) - - # Volatility Indicators - BBANDS = Indicator(endpoint="bbands", params={"period": 20, "stddev": 2}) - ATR = Indicator(endpoint="atr", params={"period": 14}) - - # Volume Indicators - OBV = Indicator(endpoint="obv", params={}) - VOLUME = Indicator(endpoint="volume", params={}) - - -class TaapiClient: - def __init__(self, api_key: str) -> None: - self._api_key: str = api_key - self._base_url: str = "https://api.taapi.io" - self._session: requests.Session = self._create_session_with_retries() - - def __build_indicator_url__(self, indicator: Indicator) -> str: - return f"{self._base_url}/{indicator.endpoint}" - - @staticmethod - def _create_session_with_retries() -> requests.Session: - session: requests.Session = requests.Session() - - retry_strategy: Retry = Retry( - total=5, # Maximum 5 retry attempts - backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, 8s, 16s - status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP codes - allowed_methods=["GET"], # Only retry GET requests - raise_on_status=False, # Don't raise exceptions, return response - ) - - adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy) - - session.mount("https://", adapter) - session.mount("http://", adapter) - - return session - - def _do_get(self, url, params) -> requests.Response: - timeout = 5 - - return self._session.get(url, params=params, timeout=timeout) - - def query_indicator( - self, - ticker: str, - indicator: Indicator, - target_date: datetime, - interval: str = "1d", - results: int = 14, - ) -> List[QueryResult] | None: - ret: List[QueryResult] = [] - backtrack_candles: int = self.__candles_to_target_date__(target_date, interval) - target_url: str = self.__build_indicator_url__(indicator) - - params: dict[str, str | int | bool] = { - "secret": self._api_key, - "symbol": ticker, - "interval": interval, - "type": "stocks", - "gaps": "false", - "addResultTimestamp": "true", - "backtrack": backtrack_candles, - "results": str(results), - } - - if indicator.params: - params = params | indicator.params - - response = self._do_get(target_url, params) - - if response.status_code != 200: - return None - - data: dict[str, list[float] | list[int]] = response.json() - for val, ts in zip(data["value"], data["timestamp"]): - dt: datetime = datetime.fromtimestamp(ts) - - ret.append(QueryResult(dt, val)) - - return ret - - def query_price_on_day( - self, - ticker: str, - target_date: datetime, - ) -> QueryResult | None: - backtrack_candles: int = self.__candles_to_target_date__(target_date, "1d") - target_url: str = f"{self._base_url}/price" - - params: dict[str, str | int | bool] = { - "secret": self._api_key, - "symbol": ticker, - "interval": "1d", - "type": "stocks", - "gaps": "false", - "addResultTimestamp": "true", - "backtrack": backtrack_candles, - "results": "1", - } - - response = self._do_get(target_url, params) - - if response.status_code != 200: - return None - - data = response.json() - - dt: datetime = ( - datetime.fromtimestamp(data["timestamp"][0]) - if "timestamp" in data - else target_date - ) - - return QueryResult(dt, data["value"][0]) - - @staticmethod - def __candles_to_target_date__( - target_date: datetime, - interval: str = "1h", - current_time: datetime | None = None, - ) -> int: - if current_time is None: - current_time = datetime.now() - - # Calculate time difference - time_diff: datetime = current_time - target_date - time_diff_seconds: float = time_diff.total_seconds() - - # Parse interval to get candle duration in seconds - interval_map: dict[str, int] = { - "1m": 60, - "5m": 300, - "15m": 900, - "30m": 1800, - "1h": 3600, - "2h": 7200, - "4h": 14400, - "12h": 43200, - "1d": 86400, - "1w": 604800, - } - - candle_duration_seconds: int = interval_map[interval] - - # Calculate number of candles (round up) - num_candles: int = math.ceil(time_diff_seconds / candle_duration_seconds) - - return num_candles - - def close(self) -> None: - self._session.close() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - self.close() - - def is_trading_day(date: datetime) -> bool: return date.weekday() not in [5, 6] @@ -241,7 +56,7 @@ def main() -> NoReturn: exit(0) date = parse_date_yyyymmdd("20250821") - days_range = 14 + days_range = 60 dates_range = days_range_from(date, days_range) tickers = ["AAPL", "NVDA", "AMD", "META", "MSFT", "GOOG"] indicators = list(IndicatorEnum) @@ -253,79 +68,85 @@ def main() -> NoReturn: TaskProgressColumn(), TimeRemainingColumn(), ) as progress: - # Overall ticker progress ticker_task = progress.add_task( "[cyan]Processing tickers...", total=len(tickers) ) - + with TaapiClient(api_key) as client: for ticker in tickers: # Update ticker task progress.update( - ticker_task, - description=f"[cyan]Processing {ticker}..." + ticker_task, description=f"[cyan]Processing {ticker}..." ) - + # Price loading subtask price_task = progress.add_task( - f"[green] └─ Loading prices for {ticker}...", - total=len(dates_range) + f"[green] └─ Loading prices for {ticker}...", + total=len(dates_range), ) - + prices = {} for d in dates_range: progress.update( price_task, description=f"[green] └─ Loading {ticker} price for {format_date_readable(d)}...", - advance=1 + advance=1, ) result = client.query_price_on_day(ticker, d) if result: prices[d.day] = result.value - + # Remove price task when done progress.remove_task(price_task) - + # Indicator loading subtask indicator_task = progress.add_task( - f"[yellow] └─ Loading indicators for {ticker}...", - total=len(indicators) + f"[yellow] └─ Loading indicators for {ticker}...", + total=len(indicators), ) - + for indicator_enum in indicators: progress.update( indicator_task, description=f"[yellow] └─ Loading {ticker} indicator: {indicator_enum.name}...", - advance=1 + advance=1, ) - + try: indicator_results = client.query_indicator( ticker, indicator_enum.value, date, results=days_range ) except Exception as e: - progress.console.print(f"[red]Error retrieving {indicator_enum.name}: {e}") + progress.console.print( + f"[red]Error retrieving {indicator_enum.name}: {e}" + ) continue if not indicator_results: continue - progress.console.print(f"\n[bold]{ticker} - {indicator_enum.name}:[/bold]") + progress.console.print( + f"\n[bold]{ticker} - {indicator_enum.name}:[/bold]" + ) trading_day_values = [ x for x in indicator_results if is_trading_day(x.datetime) ] for r in trading_day_values: - price_str = f"${prices[r.datetime.day]:.2f}" if r.datetime.day in prices else "N/A" + price_str = ( + f"${prices[r.datetime.day]:.2f}" + if r.datetime.day in prices + else "N/A" + ) progress.console.print( f" {format_date_readable(r.datetime)} ({price_str}) - {indicator_enum.name}: {r.value:.2f}" ) - + # Remove indicator task when done progress.remove_task(indicator_task) - + # Advance overall ticker progress progress.advance(ticker_task) diff --git a/paperone/__init__.py b/paperone/__init__.py new file mode 100644 index 0000000..a1655f6 --- /dev/null +++ b/paperone/__init__.py @@ -0,0 +1,3 @@ +from beartype.claw import beartype_this_package + +beartype_this_package() diff --git a/paperone/taapi.py b/paperone/taapi.py new file mode 100644 index 0000000..aa69a7b --- /dev/null +++ b/paperone/taapi.py @@ -0,0 +1,201 @@ +import requests +import math +from dataclasses import dataclass +from typing import List +from enum import Enum +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from datetime import datetime, timedelta + + +@dataclass(frozen=True) +class Indicator: + endpoint: str + params: dict[str, int] + + +@dataclass +class QueryResult: + datetime: datetime + value: float + + +class IndicatorEnum(Enum): + # Momentum Indicators + RSI = Indicator(endpoint="rsi", params={"period": 20}) + STOCH = Indicator(endpoint="stoch", params={"fast_k": 14, "slow_k": 3, "slow_d": 3}) + CCI = Indicator(endpoint="cci", params={"period": 20}) + + # Trend Indicators + MACD = Indicator( + endpoint="macd", + params={"fast_period": 12, "slow_period": 26, "signal_period": 9}, + ) + EMA_20 = Indicator(endpoint="ema", params={"period": 20}) + EMA_50 = Indicator(endpoint="ema", params={"period": 50}) + SMA_200 = Indicator(endpoint="sma", params={"period": 200}) + ADX = Indicator(endpoint="adx", params={"period": 14}) + + # Volatility Indicators + BBANDS = Indicator(endpoint="bbands", params={"period": 20, "stddev": 2}) + ATR = Indicator(endpoint="atr", params={"period": 14}) + + # Volume Indicators + OBV = Indicator(endpoint="obv", params={}) + VOLUME = Indicator(endpoint="volume", params={}) + + +class TaapiClient: + def __init__(self, api_key: str) -> None: + self._api_key: str = api_key + self._base_url: str = "https://api.taapi.io" + self._session: requests.Session = self._create_session_with_retries() + + def __build_indicator_url__(self, indicator: Indicator) -> str: + return f"{self._base_url}/{indicator.endpoint}" + + @staticmethod + def _create_session_with_retries() -> requests.Session: + session: requests.Session = requests.Session() + + retry_strategy: Retry = Retry( + total=5, # Maximum 5 retry attempts + backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, 8s, 16s + status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP codes + allowed_methods=["GET"], # Only retry GET requests + raise_on_status=False, # Don't raise exceptions, return response + ) + + adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy) + + session.mount("https://", adapter) + session.mount("http://", adapter) + + return session + + def _do_get(self, url: str, params: dict) -> requests.Response: + timeout = 5 + + return self._session.get(url, params=params, timeout=timeout) + + def query_indicator( + self, + ticker: str, + indicator: Indicator, + target_date: datetime, + interval: str = "1d", + results: int = 14, + ) -> List[QueryResult] | None: + ret: List[QueryResult] = [] + backtrack_candles: int = self.__candles_to_target_date__(target_date, interval) + target_url: str = self.__build_indicator_url__(indicator) + + params: dict[str, str | int | bool] = { + "secret": self._api_key, + "symbol": ticker, + "interval": interval, + "type": "stocks", + "gaps": "false", + "addResultTimestamp": "true", + "backtrack": backtrack_candles, + "results": str(results), + } + + if indicator.params: + params = params | indicator.params + + response = self._do_get(target_url, params) + + if response.status_code != 200: + return None + + data: dict[str, list[float] | list[int]] = response.json() + for val, ts in zip(data["value"], data["timestamp"]): + dt: datetime = datetime.fromtimestamp(ts) + + ret.append(QueryResult(dt, float(val))) + + return ret + + def query_price_on_day( + self, + ticker: str, + target_date: datetime, + ) -> QueryResult | None: + backtrack_candles: int = self.__candles_to_target_date__(target_date, "1d") + target_url: str = f"{self._base_url}/price" + + params: dict[str, str | int | bool] = { + "secret": self._api_key, + "symbol": ticker, + "interval": "1d", + "type": "stocks", + "gaps": "false", + "addResultTimestamp": "true", + "backtrack": backtrack_candles, + "results": "1", + } + + response = self._do_get(target_url, params) + + if response.status_code != 200: + return None + + data = response.json() + + dt: datetime = ( + datetime.fromtimestamp(data["timestamp"][0]) + if "timestamp" in data + else target_date + ) + + if "value" not in data: + raise Exception("Invalid value") + + if len(data["value"]) != 1: + raise Exception("Multiple values returned") + + return QueryResult(dt, float(data["value"][0])) + + @staticmethod + def __candles_to_target_date__( + target_date: datetime, + interval: str = "1h", + current_time: datetime | None = None, + ) -> int: + if current_time is None: + current_time = datetime.now() + + # Calculate time difference + time_diff: timedelta = current_time - target_date + time_diff_seconds: float = time_diff.total_seconds() + + # Parse interval to get candle duration in seconds + interval_map: dict[str, int] = { + "1m": 60, + "5m": 300, + "15m": 900, + "30m": 1800, + "1h": 3600, + "2h": 7200, + "4h": 14400, + "12h": 43200, + "1d": 86400, + "1w": 604800, + } + + candle_duration_seconds: int = interval_map[interval] + + # Calculate number of candles (round up) + num_candles: int = math.ceil(time_diff_seconds / candle_duration_seconds) + + return num_candles + + def close(self) -> None: + self._session.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close()