Scheduling Module
The src/scheduling/ module orchestrates automated data refresh and model retraining via APScheduler cron jobs: daily bars (5pm ET weekdays), weekly fundamentals (6am Saturday), biweekly alternative data (7am 1st/15th), and weekly XGBoost retraining (2am Sunday). All jobs are idempotent via Redis tracking keys.
Purpose
The scheduler provides:
- APScheduler integration with 4 cron jobs for automated data pipeline
- Idempotency via Redis keys (prevents duplicate work on retries)
- Manual triggers via API endpoints (for on-demand execution)
- Lazy imports in job functions (avoids load-time errors)
- Market hours safety (prevents intraday IBKR requests outside 9:30am-5:30pm ET)
Key Components
Scheduler Setup
TradingScheduler (src/scheduling/scheduler.py)
APScheduler instance with 4 jobs.
Initialization:
from src.scheduling.scheduler import TradingScheduler
scheduler = TradingScheduler()
scheduler.setup() # Register jobs
scheduler.start() # Start background thread
# Jobs run automatically per cron schedules
# On shutdown:
scheduler.stop()
Job Registration:
scheduler.add_job(
    daily_bars_job,
    trigger="cron",
    hour=17,
    minute=0,
    day_of_week="mon-fri",
    timezone="America/New_York",
    id="daily_bars",
    replace_existing=True,
)
Configuration (config/settings.yaml):
scheduler:
  enabled: true
  daily_bars_cron: "0 17 * * 1-5"         # 5pm ET Mon-Fri
  weekly_fundamentals_cron: "0 6 * * 6"   # 6am ET Saturday
  biweekly_altdata_cron: "0 7 1,15 * *"   # 7am ET 1st and 15th
  weekly_retrain_cron: "0 2 * * 0"        # 2am ET Sunday
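These cron strings have to be converted into APScheduler triggers somewhere. A minimal sketch of how TradingScheduler.setup() might do this, assuming a settings.scheduler object with the fields shown above, an async-capable scheduler, and APScheduler's CronTrigger.from_crontab helper (the real setup() may differ):

# Sketch only: registering the four jobs from the cron strings in settings.yaml.
# The settings object shape and choice of AsyncIOScheduler are assumptions.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from src.scheduling.jobs import (
    daily_bars_job,
    weekly_fundamentals_job,
    biweekly_altdata_job,
    weekly_retrain_job,
)

def register_jobs(scheduler: AsyncIOScheduler, settings) -> None:
    job_specs = [
        ("daily_bars", settings.scheduler.daily_bars_cron, daily_bars_job),
        ("weekly_fundamentals", settings.scheduler.weekly_fundamentals_cron, weekly_fundamentals_job),
        ("biweekly_altdata", settings.scheduler.biweekly_altdata_cron, biweekly_altdata_job),
        ("weekly_retrain", settings.scheduler.weekly_retrain_cron, weekly_retrain_job),
    ]
    for job_id, cron_expr, func in job_specs:
        trigger = CronTrigger.from_crontab(cron_expr, timezone="America/New_York")
        scheduler.add_job(func, trigger=trigger, id=job_id, replace_existing=True)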
Job Implementations
All jobs in src/scheduling/jobs.py.
1. daily_bars_job
Schedule: 5pm ET Monday-Friday (after US market close at 4pm ET).
Purpose: Fetch daily OHLCV bars for all S&P 500 symbols.
Data Source: AlphaVantageAdapter (primary) or IBKRDataAdapter (fallback).
Logic:
async def daily_bars_job() -> None:
    logger.info("job.daily_bars.start")

    # Lazy imports to avoid load-time errors
    from src.core.redis import get_redis
    from src.data.adapters.alpha_vantage_adapter import AlphaVantageAdapter
    from src.data.storage import TimeSeriesStorage
    from src.data.universe import SymbolUniverse

    universe = SymbolUniverse()
    symbols = await universe.get_symbols()

    today = date.today()
    end_dt = datetime(today.year, today.month, today.day, 23, 59, 59, tzinfo=timezone.utc)
    start_dt = end_dt - timedelta(days=7)  # 7-day lookback (covers weekends + holidays)

    redis = await get_redis()
    done_key = f"jobs:daily_bars:{today.isoformat()}:done"

    av = AlphaVantageAdapter()
    storage = TimeSeriesStorage()
    fetched = skipped = errors = 0

    for symbol in symbols:
        # Idempotency check
        if await redis.sismember(done_key, symbol):
            skipped += 1
            continue
        try:
            bars = await av.get_historical_bars(symbol, start_dt, end_dt)
            if bars:
                await storage.store_ohlcv(bars)
                await redis.sadd(done_key, symbol)
                await redis.expire(done_key, 90_000)  # 25-hour TTL
                fetched += 1
        except Exception:
            logger.warning("job.daily_bars.symbol_error", symbol=symbol, exc_info=True)
            errors += 1

    logger.info("job.daily_bars.complete", symbols=len(symbols), fetched=fetched, skipped=skipped, errors=errors)
Idempotency Key: jobs:daily_bars:{date}:done (Redis set containing completed symbols).
TTL: 25 hours (keys are date-scoped, so each day's run uses a fresh key; the TTL simply cleans up the previous day's key).
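The job body above fetches from Alpha Vantage only; the IBKR fallback mentioned under Data Source is not shown. A minimal sketch of a per-symbol fallback, assuming IBKRDataAdapter lives under src/data/adapters/ and exposes the same get_historical_bars signature (both assumptions; the log event name is illustrative):

# Hypothetical helper: try Alpha Vantage first, fall back to IBKR for daily bars.
from src.data.adapters.ibkr_adapter import IBKRDataAdapter  # import path is an assumption

async def fetch_daily_bars_with_fallback(av, ibkr, symbol, start_dt, end_dt):
    try:
        bars = await av.get_historical_bars(symbol, start_dt, end_dt)
        if bars:
            return bars
    except Exception:
        logger.warning("job.daily_bars.av_failed", symbol=symbol, exc_info=True)
    # Daily bars are safe to request from IBKR at any time of day
    return await ibkr.get_historical_bars(symbol, start_dt, end_dt, timeframe="1Day")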
2. weekly_fundamentals_job
Schedule: 6am ET Saturday (after market close Friday, before Sunday retraining).
Purpose: Fetch fundamental data (P/E, ROE, debt-to-equity, etc.) for all S&P 500 symbols.
Data Source: AlphaVantageAdapter (Company Overview + Income Statement).
Logic:
async def weekly_fundamentals_job() -> None:
    logger.info("job.weekly_fundamentals.start")

    from src.core.redis import get_redis
    from src.data.adapters.alpha_vantage_adapter import AlphaVantageAdapter
    from src.data.storage import TimeSeriesStorage
    from src.data.universe import SymbolUniverse

    universe = SymbolUniverse()
    symbols = await universe.get_symbols()

    today = date.today()
    redis = await get_redis()
    week_key = f"{today.year}-W{today.isocalendar()[1]}"  # e.g. "2024-W12"
    done_key = f"jobs:weekly_fundamentals:{week_key}:done"

    # Check if job already ran this week
    if await redis.get(done_key):
        logger.info("job.weekly_fundamentals.already_done", week=week_key)
        return

    av = AlphaVantageAdapter()
    storage = TimeSeriesStorage()
    fetched = errors = 0

    for symbol in symbols:
        try:
            fundamentals = await av.get_fundamentals(symbol)
            if fundamentals:
                await storage.store_fundamentals([fundamentals])
                fetched += 1
        except Exception:
            logger.warning("job.weekly_fundamentals.symbol_error", symbol=symbol, exc_info=True)
            errors += 1

    await redis.set(done_key, "1", ex=8 * 86400)  # 8-day TTL
    logger.info("job.weekly_fundamentals.complete", symbols=len(symbols), fetched=fetched, errors=errors)
Idempotency Key: jobs:weekly_fundamentals:{year}-W{week_number}:done (Redis string).
TTL: 8 days (keys are week-scoped, so each week's run uses a fresh key; the TTL simply cleans up the previous week's key).
3. biweekly_altdata_job
Schedule: 7am ET on 1st and 15th of each month.
Purpose: Fetch alternative data: SEC Form 4 (insider trades) and FINRA short interest.
Data Sources:
- EdgarAdapter for Form 4 filings (insider transactions)
- FinraAdapter for short interest reports
Logic:
async def biweekly_altdata_job() -> None:
    logger.info("job.biweekly_altdata.start")

    from src.core.redis import get_redis
    from src.data.adapters.edgar_adapter import EdgarAdapter
    from src.data.adapters.finra_adapter import FinraAdapter
    from src.data.storage import TimeSeriesStorage
    from src.data.universe import SymbolUniverse

    redis = await get_redis()
    last_run = await redis.get("jobs:altdata:last_run")

    # Only run if > 12 days since last run (prevents double-trigger)
    if last_run:
        last_run_dt = datetime.fromisoformat(last_run)
        if (datetime.now(timezone.utc) - last_run_dt).days < 12:
            logger.info("job.biweekly_altdata.skipped_recent_run")
            return

    universe = SymbolUniverse()
    symbols = await universe.get_symbols()

    end = datetime.now(timezone.utc)
    start = end - timedelta(days=30)  # 30-day lookback

    edgar = EdgarAdapter()
    finra = FinraAdapter()
    storage = TimeSeriesStorage()
    insider_count = short_count = 0

    for symbol in symbols:
        try:
            # Insider transactions (Form 4)
            insider_txns = await edgar.get_insider_transactions(symbol, start, end)
            if insider_txns:
                await storage.store_insider_transactions(insider_txns)
                insider_count += len(insider_txns)

            # Short interest
            short_interest = await finra.get_short_interest(symbol, start, end)
            if short_interest:
                await storage.store_short_interest(short_interest)
                short_count += len(short_interest)
        except Exception:
            logger.warning("job.biweekly_altdata.symbol_error", symbol=symbol, exc_info=True)

    await redis.set("jobs:altdata:last_run", datetime.now(timezone.utc).isoformat())
    logger.info("job.biweekly_altdata.complete", insider_txns=insider_count, short_reports=short_count)
Idempotency Key: jobs:altdata:last_run (Redis string with ISO timestamp).
Frequency Guard: Only runs if > 12 days since last run (prevents accidental double-trigger on 1st/15th).
4. weekly_retrain_job
Schedule: 2am ET Sunday (after weekly fundamentals on Saturday).
Purpose: Retrain XGBoost model with latest feature data.
Logic:
async def weekly_retrain_job() -> None:
    logger.info("job.weekly_retrain.start")

    from src.signals.feature_store import FeatureStore
    from src.signals.ml.pipeline import MLPipeline
    from src.signals.ml.registry import ModelRegistry
    from src.data.universe import SymbolUniverse

    universe = SymbolUniverse()
    symbols = await universe.get_symbols()

    # 1. Load features (last 2 years)
    feature_store = FeatureStore()
    end = datetime.now(timezone.utc).date().isoformat()
    start = (datetime.now(timezone.utc) - timedelta(days=730)).date().isoformat()
    features = feature_store.get_features(symbols, start, end)

    if features.empty or len(features) < 500:
        logger.warning("job.weekly_retrain.insufficient_data", rows=len(features))
        return

    # 2. Prepare target (forward returns)
    pipeline = MLPipeline()
    target = pipeline.prepare_target(features, close_col="close")

    # 3. Train model
    metrics = pipeline.train(features, target)

    # 4. Save model to disk
    version_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    model_path = f"models/xgb_{version_id}.pkl"
    pipeline.save(model_path)

    # 5. Register with model registry
    registry = ModelRegistry(models_dir="models")
    version = registry.register(version_id, model_path, metrics)

    # 6. Promote to active if accuracy > 0.55
    if metrics.get("accuracy", 0.0) > 0.55:
        registry.promote(version_id)
        logger.info("job.weekly_retrain.promoted", version=version_id, accuracy=metrics["accuracy"])
    else:
        logger.warning("job.weekly_retrain.low_accuracy", version=version_id, accuracy=metrics["accuracy"])

    logger.info("job.weekly_retrain.complete", version=version_id, metrics=metrics)
Idempotency: Models are versioned by timestamp. If retrain runs twice, two versions are created (no data corruption).
Auto-Promotion: Model is promoted to active if accuracy > 0.55 (configurable threshold).
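The 0.55 cutoff is hard-coded in the snippet above. If the threshold is meant to come from configuration, one possible wiring is sketched below; both the get_settings import path and the ml.promotion_accuracy_threshold key are hypothetical and not confirmed by the configuration shown in this document:

# Sketch only: sourcing the promotion threshold from settings instead of the
# literal 0.55. The settings import path and key name are hypothetical.
from src.core.settings import get_settings

settings = get_settings()
threshold = getattr(settings.ml, "promotion_accuracy_threshold", 0.55)

if metrics.get("accuracy", 0.0) > threshold:
    registry.promote(version_id)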
Market Hours Safety
is_market_hours_request_safe (src/scheduling/jobs.py)
Prevents intraday IBKR historical data requests outside US equity market hours.
Rationale:
- IBKR rejects intraday bar requests (1Min, 5Min, 15Min, 1Hour) outside market hours (9:30am-5:30pm ET Mon-Fri).
- Daily/weekly/monthly bars are always safe to request.
Usage:
from src.scheduling.jobs import is_market_hours_request_safe
if not is_market_hours_request_safe(timeframe="5Min"):
logger.warning("intraday_request_skipped_outside_market_hours")
return # Skip request
# Safe to request intraday bars
bars = await ibkr_adapter.get_historical_bars(symbol, start, end, timeframe="5Min")
Logic:
- If timeframe is "1Day", "1Week", or "1Month": return True (always safe).
- If timeframe is "1Min", "5Min", "15Min", or "1Hour":
  - Check whether the current time (US/Eastern) is within 9:30am-5:30pm Mon-Fri.
  - Return True if within hours, False otherwise (see the sketch below).
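A minimal sketch of this check, assuming zoneinfo is used for the US/Eastern conversion; the actual implementation in src/scheduling/jobs.py may differ:

# Sketch of the market-hours guard described above.
from datetime import datetime, time
from zoneinfo import ZoneInfo

DAILY_OR_LONGER = {"1Day", "1Week", "1Month"}

def is_market_hours_request_safe(timeframe: str) -> bool:
    # Daily/weekly/monthly bars are always safe to request.
    if timeframe in DAILY_OR_LONGER:
        return True
    # Intraday bars: only safe during 9:30am-5:30pm ET, Monday-Friday.
    now_et = datetime.now(ZoneInfo("America/New_York"))
    if now_et.weekday() >= 5:  # Saturday=5, Sunday=6
        return False
    return time(9, 30) <= now_et.time() <= time(17, 30)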
Manual Triggers
Jobs can be triggered manually via API endpoints (useful for testing, backfills, urgent updates).
API Endpoints:
POST /api/system/scheduler/trigger/daily_bars
POST /api/system/scheduler/trigger/weekly_fundamentals
POST /api/system/scheduler/trigger/biweekly_altdata
POST /api/system/scheduler/trigger/weekly_retrain
Example:
curl -X POST http://localhost:8000/api/system/scheduler/trigger/daily_bars
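The endpoint acknowledges immediately and runs the job in the background (per the implementation below), returning a response of the form:

{"status": "triggered", "job": "daily_bars"}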
Implementation (src/api/routes/system.py):
@router.post("/scheduler/trigger/{job_name}")
async def trigger_job(job_name: str, request: Request):
    # `request: Request` is needed so request.app.state is available below
    scheduler = request.app.state.scheduler
    if not scheduler:
        raise HTTPException(500, "Scheduler not running")

    job_map = {
        "daily_bars": daily_bars_job,
        "weekly_fundamentals": weekly_fundamentals_job,
        "biweekly_altdata": biweekly_altdata_job,
        "weekly_retrain": weekly_retrain_job,
    }
    if job_name not in job_map:
        raise HTTPException(404, f"Job '{job_name}' not found")

    # Run job in background task (non-blocking)
    asyncio.create_task(job_map[job_name]())
    return {"status": "triggered", "job": job_name}
Configuration
Settings (config/settings.yaml):
scheduler:
enabled: true
daily_bars_cron: "0 17 * * 1-5" # 5pm ET Mon-Fri
weekly_fundamentals_cron: "0 6 * * 6" # 6am ET Saturday
biweekly_altdata_cron: "0 7 1,15 * *" # 7am ET 1st and 15th
weekly_retrain_cron: "0 2 * * 0" # 2am ET Sunday
Environment Variable Overrides:
export SA_SCHEDULER__ENABLED=false # Disable scheduler (manual triggers only)
export SA_SCHEDULER__DAILY_BARS_CRON="0 18 * * 1-5" # Change to 6pm ET
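The SA_ prefix and double-underscore nesting suggest the overrides map onto the nested scheduler block in settings. A minimal sketch of how this could be modeled with pydantic-settings (an assumption; the project's actual settings loader is not shown):

# Sketch only: nested env overrides via pydantic-settings.
# Assumes SA_SCHEDULER__ENABLED maps to settings.scheduler.enabled.
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict

class SchedulerSettings(BaseModel):
    enabled: bool = True
    daily_bars_cron: str = "0 17 * * 1-5"
    weekly_fundamentals_cron: str = "0 6 * * 6"
    biweekly_altdata_cron: str = "0 7 1,15 * *"
    weekly_retrain_cron: str = "0 2 * * 0"

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="SA_", env_nested_delimiter="__")
    scheduler: SchedulerSettings = SchedulerSettings()

# SA_SCHEDULER__DAILY_BARS_CRON="0 18 * * 1-5" would override daily_bars_cron.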
Integration with Other Modules
- Data Ingestion (src/data/): Jobs fetch data via adapters (AlphaVantageAdapter, EdgarAdapter, FinraAdapter).
- ML Pipeline (src/signals/): weekly_retrain_job trains the XGBoost model with the latest features.
- API (src/api/routes/system.py): Exposes manual trigger endpoints.
- Monitoring (src/monitoring/): Prometheus metrics for job success/failure rates and execution time.
Critical Patterns
- Idempotency: All jobs track completion in Redis. Safe to retry on failure.
- Lazy imports: Adapters and storage imported inside job functions, not at module top-level.
- 7-day lookback for daily bars: Covers weekends and market holidays (e.g., a Monday run still captures the prior Friday's bar if Friday's run was missed).
- Market hours safety: Intraday IBKR requests only during 9:30am-5:30pm ET Mon-Fri.
- TTL on Redis keys: Keys expire after next expected run (prevents stale data).
- Graceful degradation: If AlphaVantage fails, jobs log warnings but don’t crash the system.
- Timezone-aware: All cron schedules use the America/New_York timezone.