feat(metadata): add context model and CRUD operations

Giulio De Pasquale 2025-10-18 13:06:05 +01:00
parent d13540757c
commit ac33da2412
4 changed files with 444 additions and 2 deletions

View File

@@ -2,7 +2,7 @@ from sqlmodel import SQLModel, Session, create_engine, select
from typing import List, Optional, Tuple
from datetime import datetime
from contextlib import contextmanager
from .models import TickerOHLCV, IndicatorsData, TickerContext
from .entities import TimeSeriesTickerData
@@ -34,7 +34,7 @@ class TradingDataCRUD:
    def get_session(self):
        """Context manager for database sessions with automatic cleanup."""
        session = Session(self.engine, expire_on_commit=False)
        try:
            yield session
            session.commit()
@@ -267,6 +267,7 @@ class TradingDataCRUD:
            TimeSeriesTickerData instance with OHLCV data
        """
        ohlcv_list = self.get_ohlcv_range(ticker, start_date, end_date)
        return TimeSeriesTickerData.build_time_series_ticker_data(ticker, ohlcv_list)

    # ========================================================================

@@ -475,3 +476,58 @@ class TradingDataCRUD:
            session.commit()
        return (ohlcv_count, indicators_count)
    def get_context(self, ticker: str, date: datetime) -> Optional[TickerContext]:
        """Get metadata for a specific ticker and date."""
        with self.get_session() as session:
            statement = select(TickerContext).where(
                TickerContext.ticker == ticker, TickerContext.date == date
            )
            return session.exec(statement).first()

    def upsert_context(self, metadata: TickerContext) -> TickerContext:
        """Insert or update metadata record."""
        existing = self.get_context(metadata.ticker, metadata.date)

        if existing:
            metadata_fields = {
                k: v
                for k, v in metadata.__dict__.items()
                if k not in ["ticker", "date", "ohlcv", "_sa_instance_state"]
            }
            return self.update_context(
                metadata.ticker, metadata.date, **metadata_fields
            )
        else:
            return self.create_context(metadata)

    def create_context(self, metadata: TickerContext) -> TickerContext:
        """Insert a single metadata record."""
        with self.get_session() as session:
            session.add(metadata)
            session.commit()
            session.refresh(metadata)
            return metadata

    def update_context(
        self, ticker: str, date: datetime, **kwargs
    ) -> Optional[TickerContext]:
        """Update metadata fields for a specific record."""
        with self.get_session() as session:
            statement = select(TickerContext).where(
                TickerContext.ticker == ticker, TickerContext.date == date
            )
            metadata = session.exec(statement).first()

            if not metadata:
                return None

            for key, value in kwargs.items():
                if hasattr(metadata, key):
                    setattr(metadata, key, value)

            session.add(metadata)
            session.commit()
            session.refresh(metadata)
            return metadata
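For reference, a minimal sketch of how these new CRUD helpers might be exercised from calling code; the database path, ticker, and date below are illustrative and not part of this commit:

from datetime import datetime
from paperone.database import TradingDataCRUD

crud = TradingDataCRUD("sqlite:///trading.db")  # illustrative path

# Fetch the context row for one ticker/date; returns None if it has not been
# calculated yet.
ctx = crud.get_context("AAPL", datetime(2025, 10, 17))
if ctx is not None:
    print(ctx.dist_sma_200_pct, ctx.volume_relative_pct)

# Patch individual fields in place; update_context ignores keys that are not
# attributes of TickerContext.
crud.update_context("AAPL", datetime(2025, 10, 17), consecutive_up_days=3)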

paperone/metadata.py (new file, 292 lines)
View File

@@ -0,0 +1,292 @@
import talib
import numpy as np
from typing import List
from datetime import datetime, timedelta
from scipy import stats

from .models import TickerOHLCV, TickerContext
from .database import TradingDataCRUD


class TickerContextService:
    """
    Service for calculating enriched metadata from OHLCV data.

    Provides multi-timeframe context, volume analysis, swing points,
    and other metrics that complement technical indicators.
    """

    def __init__(self, crud: TradingDataCRUD):
        self._crud = crud
    def calculate_metadata_for_date(
        self,
        ticker: str,
        target_date: datetime,
        lookback_days: int = 250,  # Need 200+ for 200-day MA
    ) -> TickerContext | None:
        """
        Calculate all metadata for a ticker on a specific date.

        Args:
            ticker: Stock ticker symbol
            target_date: Date to calculate metadata for
            lookback_days: Historical days needed (default: 250)

        Returns:
            TickerContext instance or None if insufficient data
        """
        # Fetch historical data
        start_date = target_date - timedelta(days=lookback_days * 2)
        ohlcv_records = self._crud.get_ohlcv_range(ticker, start_date, target_date)

        if len(ohlcv_records) < 200:  # Need at least 200 days for 200-day MA
            return None
        # Get current day data
        current = ohlcv_records[-1]

        closes = np.array([r.close for r in ohlcv_records])
        highs = np.array([r.high for r in ohlcv_records])
        lows = np.array([r.low for r in ohlcv_records])
        volumes = np.array([r.volume for r in ohlcv_records])

        # Calculate all components
        mas = self._calculate_moving_averages(closes, current.close)
        changes = self._calculate_price_changes(closes)
        volume_metrics = self._calculate_volume_metrics(volumes)
        swing_20d = self._calculate_swing_points(ohlcv_records, 20)
        swing_60d = self._calculate_swing_points(ohlcv_records, 60)
        volatility = self._calculate_volatility(closes, current.close)
        trend = self._calculate_trend_metrics(closes)
        consecutive = self._calculate_consecutive_days(closes)

        # Get ATR from indicators if available
        indicators = self._crud.get_indicators(ticker, target_date)
        atr = indicators.atr_14 if indicators else 0.0
        atr_pct = (atr / current.close * 100) if current.close > 0 else 0.0

        return TickerContext(
            ticker=ticker,
            date=target_date,
            # Moving averages
            sma_20=mas["sma_20"],
            sma_50=mas["sma_50"],
            sma_200=mas["sma_200"],
            ema_20=mas["ema_20"],
            ema_50=mas["ema_50"],
            ema_200=mas["ema_200"],
            dist_sma_20_pct=mas["dist_20"],
            dist_sma_50_pct=mas["dist_50"],
            dist_sma_200_pct=mas["dist_200"],
            # Price changes
            change_1d_pct=changes["1d"],
            change_5d_pct=changes["5d"],
            change_10d_pct=changes["10d"],
            change_20d_pct=changes["20d"],
            change_60d_pct=changes["60d"],
            # Volume
            volume_20d_avg=volume_metrics["avg_20d"],
            volume_relative_pct=volume_metrics["relative_pct"],
            volume_5d_trend=volume_metrics["trend_5d"],
            # Swing points (20-day)
            swing_high_20d=swing_20d["high"],
            swing_high_20d_date=swing_20d["high_date"],
            swing_low_20d=swing_20d["low"],
            swing_low_20d_date=swing_20d["low_date"],
            position_in_range_pct=swing_20d["position_pct"],
            # Swing points (60-day)
            swing_high_60d=swing_60d["high"],
            swing_low_60d=swing_60d["low"],
            # Volatility
            atr_pct_of_price=atr_pct,
            volatility_20d=volatility["vol_20d"],
            volatility_60d=volatility["vol_60d"],
            # Trend
            trend_strength_20d=trend["slope"],
            trend_r_squared_20d=trend["r_squared"],
            consecutive_up_days=consecutive["up"],
            consecutive_down_days=consecutive["down"],
        )
    def calculate_and_save_metadata(
        self, ticker: str, target_date: datetime, force_update: bool = False
    ) -> TickerContext | None:
        """Calculate and save metadata to database."""
        if not force_update:
            existing = self._crud.get_context(ticker, target_date)
            if existing:
                return existing

        metadata = self.calculate_metadata_for_date(ticker, target_date)
        if metadata is None:
            return None

        return self._crud.upsert_context(metadata)

    # ========================================================================
    # PRIVATE: Calculation Methods
    # ========================================================================
    @staticmethod
    def _calculate_moving_averages(closes: np.ndarray, current_price: float) -> dict:
        """Calculate SMAs, EMAs, and distances."""
        sma_20 = float(talib.SMA(closes, timeperiod=20)[-1])
        sma_50 = float(talib.SMA(closes, timeperiod=50)[-1])
        sma_200 = float(talib.SMA(closes, timeperiod=200)[-1])

        ema_20 = float(talib.EMA(closes, timeperiod=20)[-1])
        ema_50 = float(talib.EMA(closes, timeperiod=50)[-1])
        ema_200 = float(talib.EMA(closes, timeperiod=200)[-1])

        dist_20 = ((current_price - sma_20) / sma_20 * 100) if sma_20 > 0 else 0.0
        dist_50 = ((current_price - sma_50) / sma_50 * 100) if sma_50 > 0 else 0.0
        dist_200 = ((current_price - sma_200) / sma_200 * 100) if sma_200 > 0 else 0.0

        return {
            "sma_20": round(sma_20, 2),
            "sma_50": round(sma_50, 2),
            "sma_200": round(sma_200, 2),
            "ema_20": round(ema_20, 2),
            "ema_50": round(ema_50, 2),
            "ema_200": round(ema_200, 2),
            "dist_20": round(dist_20, 2),
            "dist_50": round(dist_50, 2),
            "dist_200": round(dist_200, 2),
        }
    @staticmethod
    def _calculate_price_changes(closes: np.ndarray) -> dict:
        """Calculate price changes over multiple periods."""
        current = closes[-1]

        def pct_change(days_ago):
            if len(closes) > days_ago:
                old_price = closes[-(days_ago + 1)]
                return (
                    ((current - old_price) / old_price * 100) if old_price > 0 else 0.0
                )
            return 0.0

        return {
            "1d": round(pct_change(1), 2),
            "5d": round(pct_change(5), 2),
            "10d": round(pct_change(10), 2),
            "20d": round(pct_change(20), 2),
            "60d": round(pct_change(60), 2),
        }
    @staticmethod
    def _calculate_volume_metrics(volumes: np.ndarray) -> dict:
        """Calculate volume analysis metrics."""
        current_vol = volumes[-1]
        avg_20d = float(np.mean(volumes[-20:]))
        relative_pct = (current_vol / avg_20d * 100) if avg_20d > 0 else 100.0

        # 5-day volume trend (linear regression slope)
        if len(volumes) >= 5:
            x = np.arange(5)
            y = volumes[-5:]
            slope, _, _, _, _ = stats.linregress(x, y)
            trend_5d = float(slope)
        else:
            trend_5d = 0.0

        return {
            "avg_20d": round(avg_20d, 0),
            "relative_pct": round(relative_pct, 2),
            "trend_5d": round(trend_5d, 2),
        }
    @staticmethod
    def _calculate_swing_points(
        ohlcv_records: List[TickerOHLCV], lookback: int
    ) -> dict:
        """Calculate swing highs and lows."""
        recent = ohlcv_records[-lookback:]

        highs = [(d.high, d.date) for d in recent]
        lows = [(d.low, d.date) for d in recent]

        swing_high, swing_high_date = max(highs, key=lambda x: x[0])
        swing_low, swing_low_date = min(lows, key=lambda x: x[0])

        current_price = ohlcv_records[-1].close
        swing_range = swing_high - swing_low
        position_pct = (
            ((current_price - swing_low) / swing_range * 100)
            if swing_range > 0
            else 50.0
        )

        return {
            "high": round(swing_high, 2),
            "high_date": swing_high_date,
            "low": round(swing_low, 2),
            "low_date": swing_low_date,
            "position_pct": round(position_pct, 2),
        }
    @staticmethod
    def _calculate_volatility(closes: np.ndarray, current_price: float) -> dict:
        """Calculate historical volatility."""
        returns = np.diff(closes) / closes[:-1]

        vol_20d = (
            float(np.std(returns[-20:]) * np.sqrt(252) * 100)
            if len(returns) >= 20
            else 0.0
        )
        vol_60d = (
            float(np.std(returns[-60:]) * np.sqrt(252) * 100)
            if len(returns) >= 60
            else 0.0
        )

        return {
            "vol_20d": round(vol_20d, 2),
            "vol_60d": round(vol_60d, 2),
        }
    @staticmethod
    def _calculate_trend_metrics(closes: np.ndarray) -> dict:
        """Calculate trend strength using linear regression."""
        if len(closes) < 20:
            return {"slope": 0.0, "r_squared": 0.0}

        x = np.arange(20)
        y = closes[-20:]
        slope, intercept, r_value, _, _ = stats.linregress(x, y)

        return {
            "slope": round(float(slope), 4),
            "r_squared": round(float(r_value**2), 4),
        }
    @staticmethod
    def _calculate_consecutive_days(closes: np.ndarray) -> dict:
        """Count consecutive up/down days."""
        if len(closes) < 2:
            return {"up": 0, "down": 0}

        changes = np.diff(closes)

        # Count consecutive days in the same direction, starting from the most
        # recent change and stopping at the first flat or opposite-direction day.
        up_days = 0
        down_days = 0

        for change in reversed(changes):
            if change > 0:
                if down_days > 0:
                    break
                up_days += 1
            elif change < 0:
                if up_days > 0:
                    break
                down_days += 1
            else:
                break

        return {
            "up": up_days if down_days == 0 else 0,
            "down": down_days if up_days == 0 else 0,
        }
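A short end-to-end sketch of driving the service; the ticker, date, and database path are placeholders, and the call returns None unless at least 200 days of OHLCV are already stored for the ticker:

from datetime import datetime
from paperone.database import TradingDataCRUD
from paperone.metadata import TickerContextService

crud = TradingDataCRUD("sqlite:///trading.db")  # illustrative path
service = TickerContextService(crud)

# Computes the full TickerContext for the date and upserts it via the CRUD
# layer; a second call returns the stored row unless force_update=True.
ctx = service.calculate_and_save_metadata("AAPL", datetime(2025, 10, 17))
if ctx is not None:
    print(ctx.sma_200, ctx.trend_r_squared_20d, ctx.consecutive_up_days)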

View File

@@ -22,6 +22,89 @@ class TickerOHLCV(SQLModel, table=True):
        sa_relationship_kwargs={"uselist": False},
    )

    context: Optional["TickerContext"] = Relationship(
        back_populates="ohlcv",
        sa_relationship_kwargs={"uselist": False},
    )
class TickerContext(SQLModel, table=True):
    """
    Enhanced metadata for each ticker/date providing multi-timeframe context,
    volume analysis, and price metrics that complement technical indicators.

    This table is calculated AFTER OHLCV data is saved and provides the
    missing context identified in the trading analysis feedback.
    """

    __tablename__ = "context"
    __table_args__ = (
        ForeignKeyConstraint(["ticker", "date"], ["ohlcv.ticker", "ohlcv.date"]),
    )

    ticker: str = Field(primary_key=True)
    date: datetime = Field(primary_key=True, index=True)

    # ========================================================================
    # MOVING AVERAGES (Multi-Timeframe Context)
    # ========================================================================
    sma_20: float  # 20-day Simple Moving Average
    sma_50: float  # 50-day Simple Moving Average
    sma_200: float  # 200-day Simple Moving Average
    ema_20: float  # 20-day Exponential Moving Average
    ema_50: float  # 50-day Exponential Moving Average
    ema_200: float  # 200-day Exponential Moving Average

    # Distance from moving averages (%)
    dist_sma_20_pct: float  # % distance from 20-day SMA
    dist_sma_50_pct: float  # % distance from 50-day SMA
    dist_sma_200_pct: float  # % distance from 200-day SMA

    # ========================================================================
    # PRICE CHANGES (Historical Performance)
    # ========================================================================
    change_1d_pct: float  # 1-day price change %
    change_5d_pct: float  # 5-day price change %
    change_10d_pct: float  # 10-day price change %
    change_20d_pct: float  # 20-day (1 month) price change %
    change_60d_pct: float  # 60-day (3 month) price change %

    # ========================================================================
    # VOLUME ANALYSIS
    # ========================================================================
    volume_20d_avg: float  # 20-day average volume
    volume_relative_pct: float  # Current volume vs 20-day avg (%)
    volume_5d_trend: float  # 5-day volume trend (slope)

    # ========================================================================
    # SWING POINTS (Support/Resistance from Price Action)
    # ========================================================================
    swing_high_20d: float  # Highest high in last 20 days
    swing_high_20d_date: datetime  # Date of swing high
    swing_low_20d: float  # Lowest low in last 20 days
    swing_low_20d_date: datetime  # Date of swing low
    position_in_range_pct: float  # Position between swing low/high (%)

    swing_high_60d: float  # Highest high in last 60 days
    swing_low_60d: float  # Lowest low in last 60 days

    # ========================================================================
    # VOLATILITY CONTEXT
    # ========================================================================
    atr_pct_of_price: float  # ATR as % of current price
    volatility_20d: float  # 20-day historical volatility (std dev of returns)
    volatility_60d: float  # 60-day historical volatility

    # ========================================================================
    # TREND STRENGTH
    # ========================================================================
    trend_strength_20d: float  # Linear regression slope (20-day)
    trend_r_squared_20d: float  # R² of trend line (how linear the trend is)
    consecutive_up_days: int  # Consecutive days closing higher
    consecutive_down_days: int  # Consecutive days closing lower

    ohlcv: TickerOHLCV = Relationship(back_populates="context")

class IndicatorsData(SQLModel, table=True):
    __tablename__ = "indicators"
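As a read-side illustration of what the new table enables, a minimal screening sketch; the engine path, date, and thresholds are made up for the example, while the field names come from TickerContext above:

from datetime import datetime
from sqlmodel import Session, create_engine, select
from paperone.models import TickerContext

engine = create_engine("sqlite:///trading.db")  # illustrative path
target = datetime(2025, 10, 17)

with Session(engine) as session:
    # Tickers closing above their 200-day SMA on volume at least 150% of the
    # 20-day average for the chosen date.
    statement = select(TickerContext).where(
        TickerContext.date == target,
        TickerContext.dist_sma_200_pct > 0,
        TickerContext.volume_relative_pct > 150,
    )
    for ctx in session.exec(statement):
        print(ctx.ticker, ctx.dist_sma_200_pct, ctx.volume_relative_pct)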

View File

@@ -9,6 +9,7 @@ from paperone.utils import (
from paperone.database import TradingDataCRUD
from paperone.indicators import IndicatorService
from paperone.client import Fetcher
from paperone.metadata import TickerContextService
from rich.progress import track
from datetime import datetime
@@ -21,6 +22,7 @@ def main() -> NoReturn:
    fetcher = Fetcher()
    crud = TradingDataCRUD(f"sqlite:///{DB_FILE}")
    ind = IndicatorService(crud)
    meta = TickerContextService(crud)

    date = datetime.now()
    days_range = 360 * 10
@@ -72,6 +74,15 @@ def main() -> NoReturn:
            ind.calculate_and_save_indicators(ticker=ticker, target_date=calc_date)

        for calc_date in track(ohlcv_dates, description=f"{ticker} Metadata"):
            existing_metadata = crud.get_context(ticker, calc_date)
            if existing_metadata:
                continue

            meta.calculate_and_save_metadata(ticker=ticker, target_date=calc_date)

    exit(0)