Writing a Custom Data Adapter
Data adapters fetch market data from external sources (APIs, databases, files). All adapters implement the DataSourceInterface abstract base class.
DataSourceInterface
Location: src/core/interfaces.py
from abc import ABC, abstractmethod
from datetime import datetime
from src.core.models import OHLCV, FundamentalData
class DataSourceInterface(ABC):
    """Abstract contract for market-data providers (APIs, databases, files).

    Concrete adapters (e.g. a CSV or REST-API adapter) implement all four
    coroutines below.
    """

    @abstractmethod
    async def get_historical_bars(
        self, symbol: str, start: datetime, end: datetime, timeframe: str = "1Day"
    ) -> list[OHLCV]:
        """Fetch historical OHLCV bars for a symbol.

        Args:
            symbol: Ticker symbol, e.g. "AAPL".
            start: Beginning of the requested time window.
            end: End of the requested time window.
            timeframe: Bar size (default "1Day").

        Returns:
            Bars within the window; implementations return an empty list
            when no data is available.
        """
        ...

    @abstractmethod
    async def get_latest_bar(self, symbol: str) -> OHLCV | None:
        """Fetch the most recent OHLCV bar for a symbol, or None if unavailable."""
        ...

    @abstractmethod
    async def get_fundamentals(self, symbol: str) -> FundamentalData | None:
        """Fetch fundamental financial metrics for a symbol, or None if unsupported."""
        ...

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if the data source is reachable and healthy."""
        ...
Example: Custom CSV Data Adapter
Use Case
Load historical data from local CSV files (useful for offline development or custom datasets).
Implementation
Location: src/data/adapters/csv_adapter.py
from __future__ import annotations
import csv
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import structlog
from src.core.interfaces import DataSourceInterface
from src.core.models import OHLCV, FundamentalData
logger = structlog.get_logger(__name__)
class CSVDataAdapter(DataSourceInterface):
    """Data adapter for loading OHLCV data from CSV files.

    Expected directory structure:
        data/
            AAPL.csv
            MSFT.csv
            ...

    CSV format:
        timestamp,open,high,low,close,volume
        2024-01-02 00:00:00+00:00,180.50,182.00,180.00,181.50,50000000

    Rows are assumed to be sorted ascending by timestamp (get_latest_bar
    relies on this).
    """

    # Tail window read when locating the last row; must exceed the longest
    # possible data line.
    _TAIL_BYTES = 1024

    def __init__(self, data_dir: str | Path) -> None:
        """Store the data directory.

        Raises:
            ValueError: If ``data_dir`` does not exist.
        """
        self._data_dir = Path(data_dir)
        if not self._data_dir.exists():
            raise ValueError(f"Data directory does not exist: {data_dir}")
        logger.info("csv_adapter_initialized", data_dir=str(self._data_dir))

    def _csv_path(self, symbol: str) -> Path:
        """Return the CSV file path for a symbol."""
        return self._data_dir / f"{symbol}.csv"

    def _parse_timestamp(self, ts_str: str) -> datetime:
        """Parse a timestamp string into a timezone-aware datetime.

        Formats tried in order: ISO-8601, "YYYY-mm-dd HH:MM:SS", date-only
        "YYYY-mm-dd". Naive results are assumed UTC.
        """
        try:
            dt = datetime.fromisoformat(ts_str)
        except ValueError:
            try:
                dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                dt = datetime.strptime(ts_str, "%Y-%m-%d")
        # BUGFIX: fromisoformat() yields a naive datetime when the string has
        # no UTC offset; normalize so comparisons against timezone-aware
        # start/end bounds cannot raise TypeError.
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt

    def _row_to_bar(self, symbol: str, row: dict[str, str]) -> OHLCV:
        """Build an OHLCV model from one CSV row (keys come from the header).

        Falls back to ``close`` when the optional ``adjusted_close`` column is
        missing or empty.
        """
        close = float(row["close"])
        adjusted_raw = row.get("adjusted_close")
        return OHLCV(
            symbol=symbol,
            timestamp=self._parse_timestamp(row["timestamp"]),
            open=float(row["open"]),
            high=float(row["high"]),
            low=float(row["low"]),
            close=close,
            volume=int(row["volume"]),
            adjusted_close=float(adjusted_raw) if adjusted_raw else close,
            source="csv",
        )

    async def get_historical_bars(
        self, symbol: str, start: datetime, end: datetime, timeframe: str = "1Day"
    ) -> list[OHLCV]:
        """Load historical bars from the symbol's CSV file.

        Returns:
            Bars with ``start <= timestamp <= end``; an empty list when the
            file is missing ("no data" is not an error).

        Raises:
            Re-raises parse/IO errors after logging (malformed file is a
            real error, unlike a missing one).
        """
        csv_path = self._csv_path(symbol)
        if not csv_path.exists():
            logger.warning("csv_file_not_found", symbol=symbol, path=str(csv_path))
            return []
        bars: list[OHLCV] = []
        try:
            # newline="" is the csv-module-recommended open mode.
            with open(csv_path, "r", newline="") as f:
                for row in csv.DictReader(f):
                    timestamp = self._parse_timestamp(row["timestamp"])
                    # Keep only rows inside the requested window.
                    if start <= timestamp <= end:
                        bars.append(self._row_to_bar(symbol, row))
            logger.info("csv_bars_loaded", symbol=symbol, count=len(bars))
            return bars
        except Exception as exc:
            logger.error("csv_load_failed", symbol=symbol, error=str(exc))
            raise

    async def get_latest_bar(self, symbol: str) -> OHLCV | None:
        """Return the most recent bar (last data row, assuming ascending sort).

        Reads the header from the start of the file plus only the final
        ``_TAIL_BYTES``, so large files are never fully loaded. Returns None
        on any failure (missing, empty, or corrupt file).
        """
        csv_path = self._csv_path(symbol)
        if not csv_path.exists():
            return None
        try:
            with open(csv_path, "rb") as f:
                # BUGFIX: the header must come from the start of the file.
                # The previous implementation zipped against lines[0] of the
                # tail window, which is a *partial data line* (not the header)
                # once the file exceeds the tail size, silently pairing values
                # with wrong keys.
                header_line = f.readline().decode("utf-8").strip()
                if not header_line:
                    return None
                f.seek(0, 2)
                file_size = f.tell()
                f.seek(max(0, file_size - self._TAIL_BYTES))
                # Drop blank lines (e.g. trailing newline artifacts).
                tail_lines = [
                    ln for ln in f.read().decode("utf-8").splitlines() if ln.strip()
                ]
                if not tail_lines:
                    return None
                last_line = tail_lines[-1]
                if last_line == header_line:
                    # File contains only the header row — no data yet.
                    return None
                row = dict(zip(header_line.split(","), last_line.split(",")))
                return self._row_to_bar(symbol, row)
        except Exception as exc:
            logger.error("csv_latest_bar_failed", symbol=symbol, error=str(exc))
            return None

    async def get_fundamentals(self, symbol: str) -> FundamentalData | None:
        """CSV adapter does not provide fundamental data."""
        return None

    async def health_check(self) -> bool:
        """Check if the data directory is accessible and is a directory."""
        try:
            return self._data_dir.exists() and self._data_dir.is_dir()
        except Exception:
            return False
Rate Limiting
For API-based adapters, implement rate limiting:
from src.data.rate_limiter import RateLimiter
class APIDataAdapter(DataSourceInterface):
    """Example adapter skeleton showing rate-limited access to a remote API.

    NOTE(review): illustrative snippet — ``_fetch_from_api``,
    ``_parse_response``, and the remaining DataSourceInterface methods are
    omitted here.
    """

    def __init__(self, api_key: str) -> None:
        # Credential used by _fetch_from_api (not shown in this snippet).
        self._api_key = api_key
        # 5 requests per minute
        self._rate_limiter = RateLimiter(
            name="custom_api",
            max_tokens=5,
            refill_rate=5 / 60,  # 5 per minute = 0.083 per second
        )

    async def get_historical_bars(
        self, symbol: str, start: datetime, end: datetime, timeframe: str = "1Day"
    ) -> list[OHLCV]:
        """Fetch historical bars from the API, honoring the rate limit."""
        # Wait for rate limiter
        await self._rate_limiter.acquire()
        # Make API call
        response = await self._fetch_from_api(symbol, start, end)
        return self._parse_response(response)
RateLimiter usage:
- `await rate_limiter.acquire()` — blocks until a token is available
- Tokens refill automatically based on `refill_rate`
- Backed by Redis for distributed rate limiting
Error Handling
Transient Errors
Retry with exponential backoff:
import asyncio
async def get_historical_bars(self, symbol: str, start: datetime, end: datetime, timeframe: str = "1Day") -> list[OHLCV]:
    """Fetch bars, retrying transient network errors with exponential backoff.

    Makes up to 3 attempts; only ConnectionError/TimeoutError are retried —
    any other exception propagates immediately.
    """
    for attempt in range(3):
        try:
            await self._rate_limiter.acquire()
            response = await self._api_call(symbol, start, end)
            return self._parse_response(response)
        except (ConnectionError, TimeoutError) as exc:
            if attempt == 2:  # Last attempt
                raise
            # Backoff doubles per retry: sleeps 1s then 2s (the third
            # failure re-raises above before any sleep).
            wait = 2 ** attempt
            logger.warning("api_retry", symbol=symbol, attempt=attempt + 1, wait=wait, error=str(exc))
            await asyncio.sleep(wait)
Invalid Data
Return empty list rather than raising:
async def get_historical_bars(
    self, symbol: str, start: datetime, end: datetime, timeframe: str = "1Day"
) -> list[OHLCV]:
    """Fetch bars, treating a "no data" response as an empty result.

    Returns:
        [] when the provider returns nothing or an error payload.

    Raises:
        Re-raises unexpected exceptions after logging.
    """
    # The original example used the pseudocode signature `(...)`, which is
    # not valid Python; the full interface signature is spelled out above.
    try:
        response = await self._api_call(symbol, start, end)
        if not response or "error" in response:
            logger.warning("api_no_data", symbol=symbol)
            return []  # Empty list, not an error
        return self._parse_response(response)
    except Exception as exc:
        logger.error("api_call_failed", symbol=symbol, error=str(exc))
        raise  # Re-raise unexpected errors
Testing
Unit Test
Location: tests/unit/test_csv_adapter.py
import pytest
from datetime import datetime, timezone
from pathlib import Path
from src.data.adapters.csv_adapter import CSVDataAdapter
@pytest.fixture
def sample_csv(tmp_path: Path) -> Path:
    """Write a two-row AAPL CSV into a temp directory and return that directory."""
    rows = [
        "timestamp,open,high,low,close,volume",
        "2024-01-02,180.50,182.00,180.00,181.50,50000000",
        "2024-01-03,181.50,183.00,181.00,182.50,60000000",
    ]
    (tmp_path / "AAPL.csv").write_text("\n".join(rows) + "\n")
    return tmp_path
@pytest.mark.asyncio
async def test_csv_adapter_historical_bars(sample_csv):
    """Both January rows fall inside the requested window."""
    window_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    window_end = datetime(2024, 1, 31, tzinfo=timezone.utc)
    adapter = CSVDataAdapter(sample_csv)

    bars = await adapter.get_historical_bars(
        symbol="AAPL", start=window_start, end=window_end
    )

    assert len(bars) == 2
    assert bars[0].symbol == "AAPL"
    assert bars[0].close == 181.50
    assert bars[1].close == 182.50
@pytest.mark.asyncio
async def test_csv_adapter_latest_bar(sample_csv):
    """The latest bar is the final data row of the CSV."""
    latest = await CSVDataAdapter(sample_csv).get_latest_bar("AAPL")
    assert latest is not None
    assert latest.symbol == "AAPL"
    assert latest.close == 182.50  # Last row
@pytest.mark.asyncio
async def test_csv_adapter_health_check(sample_csv):
    """health_check reports True for an existing data directory."""
    assert await CSVDataAdapter(sample_csv).health_check() is True
Registration
Option 1: Configuration-Driven
Add to config/settings.yaml:
data:
sources:
- name: csv
adapter: src.data.adapters.csv_adapter.CSVDataAdapter
config:
data_dir: /path/to/csv/files
Option 2: Programmatic
from src.data.manager import DataManager
from src.data.adapters.csv_adapter import CSVDataAdapter
manager = DataManager()
csv_adapter = CSVDataAdapter("/path/to/csv/files")
# Register under the name later passed as `source=` in data requests.
manager.register_adapter("csv", csv_adapter)

# Use adapter
# NOTE(review): bare `await` assumes this snippet runs inside an async
# context (coroutine or async REPL).
bars = await manager.get_historical_bars(
    symbol="AAPL",
    start=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 12, 31, tzinfo=timezone.utc),
    source="csv",
)
Real-World Example: Alpha Vantage Adapter
Reference implementation at src/data/adapters/alpha_vantage_adapter.py:
Key features:
- Rate limiting (5 req/min free tier)
- Retry with exponential backoff
- API error handling
- Response parsing and normalization
- Health check via test API call
Study this adapter for best practices on API integration.
Best Practices
- Always implement rate limiting for API adapters (avoid bans)
- Retry transient errors (network issues, timeouts) with exponential backoff
- Return empty list for “no data” rather than raising exceptions
- Log all API calls for debugging and audit trail
- Normalize timestamps to UTC — use timezone-aware `datetime` objects
- Cache responses for frequently accessed data (use Redis with TTL)
- Test health check — ensure it’s fast (<1s) and reliable
- Handle API changes — version your adapter, don’t assume API stability
- Document data format — CSV schema, API response structure, units (USD, shares, etc.)
- Respect provider TOS — don’t exceed rate limits, don’t resell data
Advanced: WebSocket Feed Adapter
For real-time streaming data:
from src.core.interfaces import DataSourceInterface
class WebSocketFeedAdapter(DataSourceInterface):
    """Streams real-time bars over a WebSocket and fans them out to subscribers.

    NOTE(review): illustrative snippet — it references ``websockets``,
    ``asyncio``, ``json``, and ``_parse_bar`` without importing/defining them,
    and omits the DataSourceInterface abstract methods, so it is not
    instantiable as shown.
    """

    def __init__(self, ws_url: str) -> None:
        self._ws_url = ws_url
        self._ws = None  # set by connect()
        # symbol -> list of async callbacks invoked once per parsed bar
        self._subscribers: dict[str, list[callable]] = {}

    async def connect(self) -> None:
        """Establish WebSocket connection."""
        self._ws = await websockets.connect(self._ws_url)
        # Fire-and-forget listener. NOTE(review): the task handle is not
        # retained, so the listener cannot be cancelled or awaited later.
        asyncio.create_task(self._listen())

    async def _listen(self) -> None:
        """Listen for messages and dispatch to subscribers."""
        async for message in self._ws:
            data = json.loads(message)
            symbol = data["symbol"]
            # Messages for symbols with no subscribers are dropped silently.
            if symbol in self._subscribers:
                bar = self._parse_bar(data)
                for callback in self._subscribers[symbol]:
                    await callback(bar)

    async def subscribe(self, symbol: str, callback: callable) -> None:
        """Subscribe to real-time updates for a symbol.

        NOTE(review): requires connect() to have been called first —
        self._ws is None otherwise.
        """
        if symbol not in self._subscribers:
            self._subscribers[symbol] = []
        self._subscribers[symbol].append(callback)
        # Send subscription message to server
        await self._ws.send(json.dumps({"action": "subscribe", "symbol": symbol}))
See src/data/feeds/ibkr_feed.py for a complete WebSocket feed implementation.
Next Steps
- Read existing adapters in `src/data/adapters/` for examples
- Check Data Pipeline for integration points
- Review ADR-002 for data source selection criteria