275 lines
7.6 KiB
Python
Executable File
275 lines
7.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import requests
|
|
import math
|
|
from sys import exit
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
from typing import NoReturn, List
|
|
from os import environ
|
|
from enum import Enum
|
|
from dataclasses import dataclass
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
load_dotenv()
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Indicator:
|
|
endpoint: str
|
|
params: dict[str, int]
|
|
|
|
@dataclass
|
|
class QueryResult:
|
|
datetime: datetime
|
|
value: float
|
|
|
|
|
|
class IndicatorEnum(Enum):
|
|
# Momentum Indicators
|
|
RSI = Indicator(endpoint="rsi", params={"period": 20})
|
|
STOCH = Indicator(endpoint="stoch", params={"fast_k": 14, "slow_k": 3, "slow_d": 3})
|
|
CCI = Indicator(endpoint="cci", params={"period": 20})
|
|
|
|
# Trend Indicators
|
|
MACD = Indicator(
|
|
endpoint="macd",
|
|
params={"fast_period": 12, "slow_period": 26, "signal_period": 9},
|
|
)
|
|
EMA_20 = Indicator(endpoint="ema", params={"period": 20})
|
|
EMA_50 = Indicator(endpoint="ema", params={"period": 50})
|
|
SMA_200 = Indicator(endpoint="sma", params={"period": 200})
|
|
ADX = Indicator(endpoint="adx", params={"period": 14})
|
|
|
|
# Volatility Indicators
|
|
BBANDS = Indicator(endpoint="bbands", params={"period": 20, "stddev": 2})
|
|
ATR = Indicator(endpoint="atr", params={"period": 14})
|
|
|
|
# Volume Indicators
|
|
OBV = Indicator(endpoint="obv", params={})
|
|
VOLUME = Indicator(endpoint="volume", params={})
|
|
|
|
|
|
class Interval(Enum):
|
|
OneMinute = 60
|
|
FiveMinutes = 300
|
|
FifteenMinutes = 900
|
|
ThirtyMinutes = 1800
|
|
OneHour = 3600
|
|
TwoHours = 7200
|
|
FourHours = 14400
|
|
TwelveHours = 43200
|
|
OneDay = 86400
|
|
OneWeek = 604800
|
|
|
|
|
|
class TaapiClient:
|
|
def __init__(self, api_key: str) -> None:
|
|
self._api_key: str = api_key
|
|
self._base_url: str = "https://api.taapi.io"
|
|
self._session: requests.Session = self._create_session_with_retries()
|
|
|
|
def __build_indicator_url__(self, indicator: Indicator) -> str:
|
|
return f"{self._base_url}/{indicator.endpoint}"
|
|
|
|
@staticmethod
|
|
def _create_session_with_retries() -> requests.Session:
|
|
session: requests.Session = requests.Session()
|
|
|
|
retry_strategy: Retry = Retry(
|
|
total=5, # Maximum 5 retry attempts
|
|
backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, 8s, 16s
|
|
status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP codes
|
|
allowed_methods=["GET"], # Only retry GET requests
|
|
raise_on_status=False, # Don't raise exceptions, return response
|
|
)
|
|
|
|
adapter: HTTPAdapter = HTTPAdapter(max_retries=retry_strategy)
|
|
|
|
session.mount("https://", adapter)
|
|
session.mount("http://", adapter)
|
|
|
|
return session
|
|
|
|
def _do_get(self, url, params) -> requests.Response:
|
|
timeout = 5
|
|
|
|
return self._session.get(url, params=params, timeout=timeout)
|
|
|
|
def query_indicator(
|
|
self,
|
|
ticker: str,
|
|
indicator: Indicator,
|
|
target_date: datetime,
|
|
interval: str = "1d",
|
|
results: int = 14,
|
|
) -> List[QueryResult] | None:
|
|
ret: List[QueryResult] = []
|
|
backtrack_candles: int = self.__candles_to_target_date__(target_date, interval)
|
|
target_url: str = self.__build_indicator_url__(indicator)
|
|
|
|
params: dict[str, str | int | bool] = {
|
|
"secret": self._api_key,
|
|
"symbol": ticker,
|
|
"interval": interval,
|
|
"type": "stocks",
|
|
"gaps": "false",
|
|
"addResultTimestamp": "true",
|
|
"backtrack": backtrack_candles,
|
|
"results": str(results),
|
|
}
|
|
|
|
if indicator.params:
|
|
params = params | indicator.params
|
|
|
|
response = self._do_get(target_url, params)
|
|
|
|
if response.status_code != 200:
|
|
return None
|
|
|
|
data: dict[str, list[float] | list[int]] = response.json()
|
|
for val, ts in zip(data["value"], data["timestamp"]):
|
|
dt: datetime = datetime.fromtimestamp(ts)
|
|
|
|
ret.append(QueryResult(dt, val))
|
|
|
|
return ret
|
|
|
|
def query_price_on_day(
|
|
self,
|
|
ticker: str,
|
|
target_date: datetime,
|
|
) -> QueryResult | None:
|
|
backtrack_candles: int = self.__candles_to_target_date__(target_date, "1d")
|
|
target_url: str = f"{self._base_url}/price"
|
|
|
|
params: dict[str, str | int | bool] = {
|
|
"secret": self._api_key,
|
|
"symbol": ticker,
|
|
"interval": "1d",
|
|
"type": "stocks",
|
|
"gaps": "false",
|
|
"addResultTimestamp": "true",
|
|
"backtrack": backtrack_candles,
|
|
"results": "1",
|
|
}
|
|
|
|
response = self._do_get(target_url, params)
|
|
|
|
if response.status_code != 200:
|
|
return None
|
|
|
|
data = response.json()
|
|
|
|
dt: datetime = (
|
|
datetime.fromtimestamp(data["timestamp"][0])
|
|
if "timestamp" in data
|
|
else target_date
|
|
)
|
|
|
|
return QueryResult(dt, data["value"][0])
|
|
|
|
@staticmethod
|
|
def __candles_to_target_date__(
|
|
target_date: datetime,
|
|
interval: str = "1h",
|
|
current_time: datetime | None = None,
|
|
) -> int:
|
|
if current_time is None:
|
|
current_time = datetime.now()
|
|
|
|
# Calculate time difference
|
|
time_diff: datetime = current_time - target_date
|
|
time_diff_seconds: float = time_diff.total_seconds()
|
|
|
|
# Parse interval to get candle duration in seconds
|
|
interval_map: dict[str, int] = {
|
|
"1m": 60,
|
|
"5m": 300,
|
|
"15m": 900,
|
|
"30m": 1800,
|
|
"1h": 3600,
|
|
"2h": 7200,
|
|
"4h": 14400,
|
|
"12h": 43200,
|
|
"1d": 86400,
|
|
"1w": 604800,
|
|
}
|
|
|
|
candle_duration_seconds: int = interval_map[interval]
|
|
|
|
# Calculate number of candles (round up)
|
|
num_candles: int = math.ceil(time_diff_seconds / candle_duration_seconds)
|
|
|
|
return num_candles
|
|
|
|
def close(self) -> None:
|
|
self._session.close()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
|
|
self.close()
|
|
|
|
|
|
def is_trading_day(date: datetime) -> bool:
|
|
return date.weekday() not in [5, 6]
|
|
|
|
|
|
def parse_date_yyyymmdd(date_str: str) -> datetime:
|
|
return datetime.strptime(date_str, "%Y%m%d")
|
|
|
|
|
|
def format_date_readable(date: datetime) -> str:
|
|
return date.strftime("%B %d, %Y")
|
|
|
|
|
|
def main() -> NoReturn:
|
|
api_key = environ.get("API_KEY")
|
|
|
|
if not api_key:
|
|
print("API_KEY not set")
|
|
|
|
exit(0)
|
|
|
|
date = parse_date_yyyymmdd("20250821")
|
|
|
|
with TaapiClient(api_key) as client:
|
|
# for t in ["AAPL", "NVDA", "AMD", "META", "MSFT", "GOOG"]:
|
|
for t in ["AAPL"]:
|
|
print(f"TICKER: {t}\n")
|
|
for i in IndicatorEnum:
|
|
try:
|
|
indicator_results = client.query_indicator(t, i.value, date)
|
|
except Exception as e:
|
|
# print(f"Could not retrieve data: {e}")
|
|
|
|
continue
|
|
|
|
if not indicator_results:
|
|
# print("Could not retrieve data")
|
|
|
|
continue
|
|
|
|
print(f"Indicator: {i}")
|
|
|
|
trading_day_values = [
|
|
x for x in indicator_results if is_trading_day(x.datetime)
|
|
]
|
|
|
|
for r in trading_day_values:
|
|
price = client.query_price_on_day(t, r.datetime)
|
|
print(
|
|
f"{format_date_readable(r.datetime)} (${price.value:.2f}) - {i.name}: {r.value:.2f}"
|
|
)
|
|
|
|
print("---------------")
|
|
|
|
exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|