paperone/paperone.py
Giulio De Pasquale 6452802124 prettify
2025-10-12 14:33:37 +01:00

337 lines
10 KiB
Python
Executable File

#!/usr/bin/env python
import requests
import math
from sys import exit
from datetime import datetime, timedelta
from dotenv import load_dotenv
from typing import NoReturn, List
from os import environ
from enum import Enum
from dataclasses import dataclass
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn
# Load variables from a local .env file (if one exists) into the process
# environment at import time, so main() can read API_KEY from os.environ.
load_dotenv()
@dataclass(frozen=True)
class Indicator:
    """Immutable description of a single technical-indicator request.

    Attributes:
        endpoint: Path segment appended to the API base URL (for example
            ``"rsi"``).
        params: Indicator-specific query parameters merged into the request.
    """

    endpoint: str
    params: dict[str, int]
@dataclass
class QueryResult:
    """A single value paired with the timestamp it belongs to.

    Attributes:
        datetime: Moment the value refers to.
        value: The numeric reading (a price or an indicator value).
    """

    datetime: datetime
    value: float
class IndicatorEnum(Enum):
    """Catalogue of the indicators this script requests.

    Each member's value is an ``Indicator`` holding the endpoint name and the
    query parameters sent with it. Members are grouped by category below.
    """

    # --- Momentum ---
    RSI = Indicator(
        endpoint="rsi",
        params={"period": 20},
    )
    STOCH = Indicator(
        endpoint="stoch",
        params={"fast_k": 14, "slow_k": 3, "slow_d": 3},
    )
    CCI = Indicator(
        endpoint="cci",
        params={"period": 20},
    )

    # --- Trend ---
    MACD = Indicator(
        endpoint="macd",
        params={"fast_period": 12, "slow_period": 26, "signal_period": 9},
    )
    # The two EMA members share an endpoint and differ only in period.
    EMA_20 = Indicator(
        endpoint="ema",
        params={"period": 20},
    )
    EMA_50 = Indicator(
        endpoint="ema",
        params={"period": 50},
    )
    SMA_200 = Indicator(
        endpoint="sma",
        params={"period": 200},
    )
    ADX = Indicator(
        endpoint="adx",
        params={"period": 14},
    )

    # --- Volatility ---
    BBANDS = Indicator(
        endpoint="bbands",
        params={"period": 20, "stddev": 2},
    )
    ATR = Indicator(
        endpoint="atr",
        params={"period": 14},
    )

    # --- Volume (no extra parameters) ---
    OBV = Indicator(
        endpoint="obv",
        params={},
    )
    VOLUME = Indicator(
        endpoint="volume",
        params={},
    )
class TaapiClient:
def __init__(self, api_key: str) -> None:
self._api_key: str = api_key
self._base_url: str = "https://api.taapi.io"
self._session: requests.Session = self._create_session_with_retries()
def __build_indicator_url__(self, indicator: Indicator) -> str:
return f"{self._base_url}/{indicator.endpoint}"
@staticmethod
def _create_session_with_retries() -> requests.Session:
session: requests.Session = requests.Session()
retry_strategy: Retry = Retry(
total=5, # Maximum 5 retry attempts
backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, 8s, 16s
status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP codes
allowed_methods=["GET"], # Only retry GET requests
raise_on_status=False, # Don't raise exceptions, return response
)
adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _do_get(self, url, params) -> requests.Response:
timeout = 5
return self._session.get(url, params=params, timeout=timeout)
def query_indicator(
self,
ticker: str,
indicator: Indicator,
target_date: datetime,
interval: str = "1d",
results: int = 14,
) -> List[QueryResult] | None:
ret: List[QueryResult] = []
backtrack_candles: int = self.__candles_to_target_date__(target_date, interval)
target_url: str = self.__build_indicator_url__(indicator)
params: dict[str, str | int | bool] = {
"secret": self._api_key,
"symbol": ticker,
"interval": interval,
"type": "stocks",
"gaps": "false",
"addResultTimestamp": "true",
"backtrack": backtrack_candles,
"results": str(results),
}
if indicator.params:
params = params | indicator.params
response = self._do_get(target_url, params)
if response.status_code != 200:
return None
data: dict[str, list[float] | list[int]] = response.json()
for val, ts in zip(data["value"], data["timestamp"]):
dt: datetime = datetime.fromtimestamp(ts)
ret.append(QueryResult(dt, val))
return ret
def query_price_on_day(
self,
ticker: str,
target_date: datetime,
) -> QueryResult | None:
backtrack_candles: int = self.__candles_to_target_date__(target_date, "1d")
target_url: str = f"{self._base_url}/price"
params: dict[str, str | int | bool] = {
"secret": self._api_key,
"symbol": ticker,
"interval": "1d",
"type": "stocks",
"gaps": "false",
"addResultTimestamp": "true",
"backtrack": backtrack_candles,
"results": "1",
}
response = self._do_get(target_url, params)
if response.status_code != 200:
return None
data = response.json()
dt: datetime = (
datetime.fromtimestamp(data["timestamp"][0])
if "timestamp" in data
else target_date
)
return QueryResult(dt, data["value"][0])
@staticmethod
def __candles_to_target_date__(
target_date: datetime,
interval: str = "1h",
current_time: datetime | None = None,
) -> int:
if current_time is None:
current_time = datetime.now()
# Calculate time difference
time_diff: datetime = current_time - target_date
time_diff_seconds: float = time_diff.total_seconds()
# Parse interval to get candle duration in seconds
interval_map: dict[str, int] = {
"1m": 60,
"5m": 300,
"15m": 900,
"30m": 1800,
"1h": 3600,
"2h": 7200,
"4h": 14400,
"12h": 43200,
"1d": 86400,
"1w": 604800,
}
candle_duration_seconds: int = interval_map[interval]
# Calculate number of candles (round up)
num_candles: int = math.ceil(time_diff_seconds / candle_duration_seconds)
return num_candles
def close(self) -> None:
self._session.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
def is_trading_day(date: datetime) -> bool:
    """Return True for Monday through Friday, False on weekends.

    Only the day of the week is checked; market holidays are not considered.
    """
    # weekday() numbers Monday as 0, so Saturday and Sunday are 5 and 6.
    return date.weekday() < 5
def parse_date_yyyymmdd(date_str: str) -> datetime:
    """Parse a compact ``YYYYMMDD`` string into a datetime at midnight.

    Raises:
        ValueError: If *date_str* does not match the ``YYYYMMDD`` layout.
    """
    compact_format = "%Y%m%d"
    return datetime.strptime(date_str, compact_format)
def days_range_from(date: datetime, n: int) -> List[datetime]:
    """
    Build the run of consecutive days that ends on ``date`` and starts
    ``n`` days before it, oldest first.

    Example: date=Jan 3, n=2 → [Jan 1, Jan 2, Jan 3]

    Args:
        date: The last day of the range
        n: How many days to reach back from ``date``

    Returns:
        ``n + 1`` datetime objects, from (date - n days) through ``date``
    """
    one_day = timedelta(days=1)
    first_day = date - timedelta(days=n)
    return [first_day + offset * one_day for offset in range(n + 1)]
def format_date_readable(date: datetime) -> str:
    """Render *date* as full month name, zero-padded day, and year."""
    # A format spec on a datetime is forwarded to strftime().
    return f"{date:%B %d, %Y}"
def _load_prices(
    client: "TaapiClient",
    progress: "Progress",
    ticker: str,
    dates: List[datetime],
) -> dict[int, float]:
    """Fetch the daily price of *ticker* for every trading day in *dates*.

    Weekend dates are skipped: only trading-day rows are ever printed, so a
    weekend price could never be displayed.

    Returns:
        Prices keyed by ``datetime.toordinal()`` of the requested day. The
        ordinal is unique per calendar day, so ranges that cross a month
        boundary cannot collide the way a day-of-month key would.
    """
    trading_dates = [d for d in dates if is_trading_day(d)]
    price_task = progress.add_task(
        f"[green] └─ Loading prices for {ticker}...",
        total=len(trading_dates),
    )
    prices: dict[int, float] = {}
    for d in trading_dates:
        progress.update(
            price_task,
            description=f"[green] └─ Loading {ticker} price for {format_date_readable(d)}...",
            advance=1,
        )
        try:
            result = client.query_price_on_day(ticker, d)
        except Exception as e:
            # Same best-effort policy as the indicator queries: report the
            # failure and carry on rather than aborting the whole run.
            progress.console.print(
                f"[red]Error retrieving {ticker} price for {format_date_readable(d)}: {e}"
            )
            continue
        if result is not None:
            prices[d.toordinal()] = result.value
    # Remove price task when done
    progress.remove_task(price_task)
    return prices


def _report_indicators(
    client: "TaapiClient",
    progress: "Progress",
    ticker: str,
    indicators: "List[IndicatorEnum]",
    target_date: datetime,
    results: int,
    prices: dict[int, float],
) -> None:
    """Query each indicator for *ticker* and print its trading-day values."""
    indicator_task = progress.add_task(
        f"[yellow] └─ Loading indicators for {ticker}...",
        total=len(indicators),
    )
    for indicator_enum in indicators:
        progress.update(
            indicator_task,
            description=f"[yellow] └─ Loading {ticker} indicator: {indicator_enum.name}...",
            advance=1,
        )
        try:
            indicator_results = client.query_indicator(
                ticker, indicator_enum.value, target_date, results=results
            )
        except Exception as e:
            progress.console.print(
                f"[red]Error retrieving {indicator_enum.name}: {e}"
            )
            continue
        if not indicator_results:
            continue

        progress.console.print(f"\n[bold]{ticker} - {indicator_enum.name}:[/bold]")
        for r in indicator_results:
            if not is_trading_day(r.datetime):
                continue
            price = prices.get(r.datetime.toordinal())
            price_str = f"${price:.2f}" if price is not None else "N/A"
            progress.console.print(
                f" {format_date_readable(r.datetime)} ({price_str}) - {indicator_enum.name}: {r.value:.2f}"
            )
    # Remove indicator task when done
    progress.remove_task(indicator_task)


def main() -> NoReturn:
    """Print prices and indicator values for a fixed set of tickers.

    Reads the API secret from the ``API_KEY`` environment variable. Exits with
    status 1 and a message on stderr when it is missing, and with status 0
    after all tickers have been processed.
    """
    api_key = environ.get("API_KEY")
    if not api_key:
        # A string argument makes exit() print to stderr and use status 1, so
        # a missing key is reported to the shell as a failure.
        exit("API_KEY not set")

    date = parse_date_yyyymmdd("20250821")
    days_range = 14
    dates_range = days_range_from(date, days_range)
    tickers = ["AAPL", "NVDA", "AMD", "META", "MSFT", "GOOG"]
    indicators = list(IndicatorEnum)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
    ) as progress:
        # Overall ticker progress
        ticker_task = progress.add_task(
            "[cyan]Processing tickers...", total=len(tickers)
        )
        with TaapiClient(api_key) as client:
            for ticker in tickers:
                progress.update(
                    ticker_task,
                    description=f"[cyan]Processing {ticker}...",
                )
                prices = _load_prices(client, progress, ticker, dates_range)
                _report_indicators(
                    client,
                    progress,
                    ticker,
                    indicators,
                    date,
                    days_range,
                    prices,
                )
                # Advance overall ticker progress
                progress.advance(ticker_task)
    exit(0)
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()