This commit is contained in:
Giulio De Pasquale 2025-10-12 18:05:31 +01:00
parent 6452802124
commit 0410f3aa42
4 changed files with 440 additions and 216 deletions

199
.gitignore vendored
View File

@ -9,3 +9,202 @@ devenv.local.nix
.pre-commit-config.yaml
.env
# Created by https://www.toptal.com/developers/gitignore/api/python,go
# Edit at https://www.toptal.com/developers/gitignore?templates=python,go
### Go ###
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python,go

View File

@ -1,208 +1,23 @@
#!/usr/bin/env python
import requests
import math
from sys import exit
from datetime import datetime, timedelta
from dotenv import load_dotenv
from typing import NoReturn, List
from os import environ
from enum import Enum
from dataclasses import dataclass
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn
from paperone.taapi import TaapiClient, IndicatorEnum
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
)
load_dotenv()
@dataclass(frozen=True)
class Indicator:
endpoint: str
params: dict[str, int]
@dataclass
class QueryResult:
datetime: datetime
value: float
class IndicatorEnum(Enum):
# Momentum Indicators
RSI = Indicator(endpoint="rsi", params={"period": 20})
STOCH = Indicator(endpoint="stoch", params={"fast_k": 14, "slow_k": 3, "slow_d": 3})
CCI = Indicator(endpoint="cci", params={"period": 20})
# Trend Indicators
MACD = Indicator(
endpoint="macd",
params={"fast_period": 12, "slow_period": 26, "signal_period": 9},
)
EMA_20 = Indicator(endpoint="ema", params={"period": 20})
EMA_50 = Indicator(endpoint="ema", params={"period": 50})
SMA_200 = Indicator(endpoint="sma", params={"period": 200})
ADX = Indicator(endpoint="adx", params={"period": 14})
# Volatility Indicators
BBANDS = Indicator(endpoint="bbands", params={"period": 20, "stddev": 2})
ATR = Indicator(endpoint="atr", params={"period": 14})
# Volume Indicators
OBV = Indicator(endpoint="obv", params={})
VOLUME = Indicator(endpoint="volume", params={})
class TaapiClient:
def __init__(self, api_key: str) -> None:
self._api_key: str = api_key
self._base_url: str = "https://api.taapi.io"
self._session: requests.Session = self._create_session_with_retries()
def __build_indicator_url__(self, indicator: Indicator) -> str:
return f"{self._base_url}/{indicator.endpoint}"
@staticmethod
def _create_session_with_retries() -> requests.Session:
session: requests.Session = requests.Session()
retry_strategy: Retry = Retry(
total=5, # Maximum 5 retry attempts
backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, 8s, 16s
status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP codes
allowed_methods=["GET"], # Only retry GET requests
raise_on_status=False, # Don't raise exceptions, return response
)
adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _do_get(self, url, params) -> requests.Response:
timeout = 5
return self._session.get(url, params=params, timeout=timeout)
def query_indicator(
self,
ticker: str,
indicator: Indicator,
target_date: datetime,
interval: str = "1d",
results: int = 14,
) -> List[QueryResult] | None:
ret: List[QueryResult] = []
backtrack_candles: int = self.__candles_to_target_date__(target_date, interval)
target_url: str = self.__build_indicator_url__(indicator)
params: dict[str, str | int | bool] = {
"secret": self._api_key,
"symbol": ticker,
"interval": interval,
"type": "stocks",
"gaps": "false",
"addResultTimestamp": "true",
"backtrack": backtrack_candles,
"results": str(results),
}
if indicator.params:
params = params | indicator.params
response = self._do_get(target_url, params)
if response.status_code != 200:
return None
data: dict[str, list[float] | list[int]] = response.json()
for val, ts in zip(data["value"], data["timestamp"]):
dt: datetime = datetime.fromtimestamp(ts)
ret.append(QueryResult(dt, val))
return ret
def query_price_on_day(
self,
ticker: str,
target_date: datetime,
) -> QueryResult | None:
backtrack_candles: int = self.__candles_to_target_date__(target_date, "1d")
target_url: str = f"{self._base_url}/price"
params: dict[str, str | int | bool] = {
"secret": self._api_key,
"symbol": ticker,
"interval": "1d",
"type": "stocks",
"gaps": "false",
"addResultTimestamp": "true",
"backtrack": backtrack_candles,
"results": "1",
}
response = self._do_get(target_url, params)
if response.status_code != 200:
return None
data = response.json()
dt: datetime = (
datetime.fromtimestamp(data["timestamp"][0])
if "timestamp" in data
else target_date
)
return QueryResult(dt, data["value"][0])
@staticmethod
def __candles_to_target_date__(
target_date: datetime,
interval: str = "1h",
current_time: datetime | None = None,
) -> int:
if current_time is None:
current_time = datetime.now()
# Calculate time difference
time_diff: datetime = current_time - target_date
time_diff_seconds: float = time_diff.total_seconds()
# Parse interval to get candle duration in seconds
interval_map: dict[str, int] = {
"1m": 60,
"5m": 300,
"15m": 900,
"30m": 1800,
"1h": 3600,
"2h": 7200,
"4h": 14400,
"12h": 43200,
"1d": 86400,
"1w": 604800,
}
candle_duration_seconds: int = interval_map[interval]
# Calculate number of candles (round up)
num_candles: int = math.ceil(time_diff_seconds / candle_duration_seconds)
return num_candles
def close(self) -> None:
self._session.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
def is_trading_day(date: datetime) -> bool:
return date.weekday() not in [5, 6]
@ -241,7 +56,7 @@ def main() -> NoReturn:
exit(0)
date = parse_date_yyyymmdd("20250821")
days_range = 14
days_range = 60
dates_range = days_range_from(date, days_range)
tickers = ["AAPL", "NVDA", "AMD", "META", "MSFT", "GOOG"]
indicators = list(IndicatorEnum)
@ -253,79 +68,85 @@ def main() -> NoReturn:
TaskProgressColumn(),
TimeRemainingColumn(),
) as progress:
# Overall ticker progress
ticker_task = progress.add_task(
"[cyan]Processing tickers...", total=len(tickers)
)
with TaapiClient(api_key) as client:
for ticker in tickers:
# Update ticker task
progress.update(
ticker_task,
description=f"[cyan]Processing {ticker}..."
ticker_task, description=f"[cyan]Processing {ticker}..."
)
# Price loading subtask
price_task = progress.add_task(
f"[green] └─ Loading prices for {ticker}...",
total=len(dates_range)
f"[green] └─ Loading prices for {ticker}...",
total=len(dates_range),
)
prices = {}
for d in dates_range:
progress.update(
price_task,
description=f"[green] └─ Loading {ticker} price for {format_date_readable(d)}...",
advance=1
advance=1,
)
result = client.query_price_on_day(ticker, d)
if result:
prices[d.day] = result.value
# Remove price task when done
progress.remove_task(price_task)
# Indicator loading subtask
indicator_task = progress.add_task(
f"[yellow] └─ Loading indicators for {ticker}...",
total=len(indicators)
f"[yellow] └─ Loading indicators for {ticker}...",
total=len(indicators),
)
for indicator_enum in indicators:
progress.update(
indicator_task,
description=f"[yellow] └─ Loading {ticker} indicator: {indicator_enum.name}...",
advance=1
advance=1,
)
try:
indicator_results = client.query_indicator(
ticker, indicator_enum.value, date, results=days_range
)
except Exception as e:
progress.console.print(f"[red]Error retrieving {indicator_enum.name}: {e}")
progress.console.print(
f"[red]Error retrieving {indicator_enum.name}: {e}"
)
continue
if not indicator_results:
continue
progress.console.print(f"\n[bold]{ticker} - {indicator_enum.name}:[/bold]")
progress.console.print(
f"\n[bold]{ticker} - {indicator_enum.name}:[/bold]"
)
trading_day_values = [
x for x in indicator_results if is_trading_day(x.datetime)
]
for r in trading_day_values:
price_str = f"${prices[r.datetime.day]:.2f}" if r.datetime.day in prices else "N/A"
price_str = (
f"${prices[r.datetime.day]:.2f}"
if r.datetime.day in prices
else "N/A"
)
progress.console.print(
f" {format_date_readable(r.datetime)} ({price_str}) - {indicator_enum.name}: {r.value:.2f}"
)
# Remove indicator task when done
progress.remove_task(indicator_task)
# Advance overall ticker progress
progress.advance(ticker_task)

3
paperone/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from beartype.claw import beartype_this_package
beartype_this_package()

201
paperone/taapi.py Normal file
View File

@ -0,0 +1,201 @@
import requests
import math
from dataclasses import dataclass
from typing import List
from enum import Enum
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime, timedelta
@dataclass(frozen=True)
class Indicator:
endpoint: str
params: dict[str, int]
@dataclass
class QueryResult:
datetime: datetime
value: float
class IndicatorEnum(Enum):
# Momentum Indicators
RSI = Indicator(endpoint="rsi", params={"period": 20})
STOCH = Indicator(endpoint="stoch", params={"fast_k": 14, "slow_k": 3, "slow_d": 3})
CCI = Indicator(endpoint="cci", params={"period": 20})
# Trend Indicators
MACD = Indicator(
endpoint="macd",
params={"fast_period": 12, "slow_period": 26, "signal_period": 9},
)
EMA_20 = Indicator(endpoint="ema", params={"period": 20})
EMA_50 = Indicator(endpoint="ema", params={"period": 50})
SMA_200 = Indicator(endpoint="sma", params={"period": 200})
ADX = Indicator(endpoint="adx", params={"period": 14})
# Volatility Indicators
BBANDS = Indicator(endpoint="bbands", params={"period": 20, "stddev": 2})
ATR = Indicator(endpoint="atr", params={"period": 14})
# Volume Indicators
OBV = Indicator(endpoint="obv", params={})
VOLUME = Indicator(endpoint="volume", params={})
class TaapiClient:
def __init__(self, api_key: str) -> None:
self._api_key: str = api_key
self._base_url: str = "https://api.taapi.io"
self._session: requests.Session = self._create_session_with_retries()
def __build_indicator_url__(self, indicator: Indicator) -> str:
return f"{self._base_url}/{indicator.endpoint}"
@staticmethod
def _create_session_with_retries() -> requests.Session:
session: requests.Session = requests.Session()
retry_strategy: Retry = Retry(
total=5, # Maximum 5 retry attempts
backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, 8s, 16s
status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP codes
allowed_methods=["GET"], # Only retry GET requests
raise_on_status=False, # Don't raise exceptions, return response
)
adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _do_get(self, url: str, params: dict) -> requests.Response:
timeout = 5
return self._session.get(url, params=params, timeout=timeout)
def query_indicator(
self,
ticker: str,
indicator: Indicator,
target_date: datetime,
interval: str = "1d",
results: int = 14,
) -> List[QueryResult] | None:
ret: List[QueryResult] = []
backtrack_candles: int = self.__candles_to_target_date__(target_date, interval)
target_url: str = self.__build_indicator_url__(indicator)
params: dict[str, str | int | bool] = {
"secret": self._api_key,
"symbol": ticker,
"interval": interval,
"type": "stocks",
"gaps": "false",
"addResultTimestamp": "true",
"backtrack": backtrack_candles,
"results": str(results),
}
if indicator.params:
params = params | indicator.params
response = self._do_get(target_url, params)
if response.status_code != 200:
return None
data: dict[str, list[float] | list[int]] = response.json()
for val, ts in zip(data["value"], data["timestamp"]):
dt: datetime = datetime.fromtimestamp(ts)
ret.append(QueryResult(dt, float(val)))
return ret
def query_price_on_day(
self,
ticker: str,
target_date: datetime,
) -> QueryResult | None:
backtrack_candles: int = self.__candles_to_target_date__(target_date, "1d")
target_url: str = f"{self._base_url}/price"
params: dict[str, str | int | bool] = {
"secret": self._api_key,
"symbol": ticker,
"interval": "1d",
"type": "stocks",
"gaps": "false",
"addResultTimestamp": "true",
"backtrack": backtrack_candles,
"results": "1",
}
response = self._do_get(target_url, params)
if response.status_code != 200:
return None
data = response.json()
dt: datetime = (
datetime.fromtimestamp(data["timestamp"][0])
if "timestamp" in data
else target_date
)
if "value" not in data:
raise Exception("Invalid value")
if len(data["value"]) != 1:
raise Exception("Multiple values returned")
return QueryResult(dt, float(data["value"][0]))
@staticmethod
def __candles_to_target_date__(
target_date: datetime,
interval: str = "1h",
current_time: datetime | None = None,
) -> int:
if current_time is None:
current_time = datetime.now()
# Calculate time difference
time_diff: timedelta = current_time - target_date
time_diff_seconds: float = time_diff.total_seconds()
# Parse interval to get candle duration in seconds
interval_map: dict[str, int] = {
"1m": 60,
"5m": 300,
"15m": 900,
"30m": 1800,
"1h": 3600,
"2h": 7200,
"4h": 14400,
"12h": 43200,
"1d": 86400,
"1w": 604800,
}
candle_duration_seconds: int = interval_map[interval]
# Calculate number of candles (round up)
num_candles: int = math.ceil(time_diff_seconds / candle_duration_seconds)
return num_candles
def close(self) -> None:
self._session.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()