Skip to content

asset-history Chunk 2 — Source Adapters, Reconcile, CLI, Notify

Section titled “asset-history Chunk 2 — Source Adapters, Reconcile, CLI, Notify”

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build the four data source adapters, conflict reconciliation engine, notification wrapper, and full Typer CLI — so asset-history fetch XIC.TO --source yahoo works end-to-end and pytest -m "not slow" passes.

Architecture: Source adapters implement a common BaseSource ABC. reconcile.py reads price_sources, detects conflicts, writes to prices, appends to data/conflicts.log. notify.py delegates to the existing notify_manager utility. cli.py wires all commands via Typer. Tests mock HTTP — no live API calls except @pytest.mark.slow tests.

Tech Stack: Python 3.11+, yfinance, requests, Typer, pytest-mock, sqlite3 (stdlib)

Prerequisite: Chunk 1 complete — pytest tests/test_db.py passes, virtual environment active.

Spec: /mnt/d/FSS/KB/Business/_WorkingOn/Projects/asset-history/asset-history-design.md


ActionFileResponsibility
Createsrc/asset_history/sources/__init__.pyPackage marker
Createsrc/asset_history/sources/base.pyPriceRecord dataclass + BaseSource ABC
Createsrc/asset_history/sources/yahoo.pyyfinance adapter — primary source
Createsrc/asset_history/sources/bank_of_canada.pyBoC valet API — CA price validation
Createsrc/asset_history/sources/tiingo.pyTiingo — US dividend cross-reference
Createsrc/asset_history/sources/fred.pyFRED API — US price validation
Createsrc/asset_history/reconcile.pyConflict detection + prices table writer
Createsrc/asset_history/notify.pynotify_manager wrapper with graceful fallback
Createsrc/asset_history/cli.pyAll Typer CLI commands
Createtests/fixtures/xic_sample.jsonSynthetic XIC.TO data for unit tests
Createtests/fixtures/spy_sample.jsonSynthetic SPY data for unit tests
Createtests/test_sources_mock.pyAdapter parsing tests (mocked HTTP)
Createtests/test_reconcile.pyConflict detection tests (synthetic data)
Createtests/test_cli.pyCLI command tests (temp DB, no network)
Createtests/test_sources_live.pyLive integration tests (@pytest.mark.slow)

Pre-Condition: Discover Bank of Canada TSX Series ID

Section titled “Pre-Condition: Discover Bank of Canada TSX Series ID”

Before writing bank_of_canada.py, identify the exact series ID for the TSX Composite index.

  • Run this command and find the TSX equity series:
Terminal window
curl -s "https://www.bankofcanada.ca/valet/lists/series/json" \
| python3 -m json.tool \
| grep -i -B1 -A3 "tsx\|equity\|stock\|composite"
  • Record the series ID — look for a daily equity/stock market series. It will be a code like FXVECTOR_STOX or similar. If you find multiple candidates, pick the daily closing value for the S&P/TSX Composite Index.

  • Verify the series returns data:

Terminal window
SERIES_ID="<series_id_you_found>"
curl -s "https://www.bankofcanada.ca/valet/observations/${SERIES_ID}/json?recent=5" \
| python3 -m json.tool

Expected: JSON with an observations array containing recent daily values.

  • If no suitable daily TSX series is found in BoC: Use ^GSPTSE from Stooq as the CA validation source instead. Stooq’s TSX data has been independently verified as not proxying Yahoo Finance for this index. Change the source name to 'stooq' in the adapter and update TICKER_TO_STOOQ_SYMBOL accordingly.

Files:

  • Create: tests/fixtures/xic_sample.json

  • Create: tests/fixtures/spy_sample.json

  • Step 1.1: Create tests/fixtures/xic_sample.json

Synthetic data — 10 trading days with one dividend event. Used by reconcile tests.

{
"ticker": "XIC.TO",
"yahoo": [
{"date": "2024-01-02", "close": 28.00, "adj_close": 28.00, "dividend_amount": 0.0},
{"date": "2024-01-03", "close": 28.14, "adj_close": 28.14, "dividend_amount": 0.0},
{"date": "2024-01-04", "close": 28.07, "adj_close": 28.07, "dividend_amount": 0.0},
{"date": "2024-01-05", "close": 27.95, "adj_close": 27.95, "dividend_amount": 0.0},
{"date": "2024-01-08", "close": 28.10, "adj_close": 28.10, "dividend_amount": 0.0},
{"date": "2024-01-09", "close": 28.10, "adj_close": 27.84, "dividend_amount": 0.26},
{"date": "2024-01-10", "close": 27.90, "adj_close": 27.65, "dividend_amount": 0.0},
{"date": "2024-01-11", "close": 28.01, "adj_close": 27.76, "dividend_amount": 0.0},
{"date": "2024-01-12", "close": 28.15, "adj_close": 27.90, "dividend_amount": 0.0},
{"date": "2024-01-15", "close": 28.08, "adj_close": 27.83, "dividend_amount": 0.0}
],
"bank_of_canada": [
{"date": "2024-01-02", "close": 21100.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-03", "close": 21215.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-04", "close": 21160.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-05", "close": 21070.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-08", "close": 21180.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-09", "close": 21180.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-10", "close": 21030.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-11", "close": 21112.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-12", "close": 21220.0, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-15", "close": 21167.0, "adj_close": null, "dividend_amount": 0.0}
]
}
  • Step 1.2: Create tests/fixtures/spy_sample.json

Synthetic data — 10 trading days with one conflict case embedded.

{
"ticker": "SPY",
"yahoo": [
{"date": "2024-01-02", "close": 474.00, "adj_close": 474.00, "dividend_amount": 0.0},
{"date": "2024-01-03", "close": 470.00, "adj_close": 470.00, "dividend_amount": 0.0},
{"date": "2024-01-04", "close": 473.00, "adj_close": 473.00, "dividend_amount": 0.0},
{"date": "2024-01-05", "close": 471.00, "adj_close": 471.00, "dividend_amount": 0.0},
{"date": "2024-01-08", "close": 476.00, "adj_close": 476.00, "dividend_amount": 0.0},
{"date": "2024-01-09", "close": 475.00, "adj_close": 473.52, "dividend_amount": 1.48},
{"date": "2024-01-10", "close": 477.00, "adj_close": 475.52, "dividend_amount": 0.0},
{"date": "2024-01-11", "close": 478.00, "adj_close": 476.52, "dividend_amount": 0.0},
{"date": "2024-01-12", "close": 480.00, "adj_close": 478.52, "dividend_amount": 0.0},
{"date": "2024-01-15", "close": 479.00, "adj_close": 477.52, "dividend_amount": 0.0}
],
"tiingo": [
{"date": "2024-01-02", "close": 474.00, "adj_close": 474.00, "dividend_amount": 0.0},
{"date": "2024-01-03", "close": 470.00, "adj_close": 470.00, "dividend_amount": 0.0},
{"date": "2024-01-04", "close": 473.00, "adj_close": 473.00, "dividend_amount": 0.0},
{"date": "2024-01-05", "close": 471.00, "adj_close": 471.00, "dividend_amount": 0.0},
{"date": "2024-01-08", "close": 476.00, "adj_close": 476.00, "dividend_amount": 0.0},
{"date": "2024-01-09", "close": 475.00, "adj_close": 473.52, "dividend_amount": 1.48},
{"date": "2024-01-10", "close": 477.00, "adj_close": 475.52, "dividend_amount": 0.0},
{"date": "2024-01-11", "close": 478.00, "adj_close": 476.52, "dividend_amount": 0.0},
{"date": "2024-01-12", "close": 480.00, "adj_close": 478.52, "dividend_amount": 0.0},
{"date": "2024-01-15", "close": 479.00, "adj_close": 477.52, "dividend_amount": 0.0}
],
"tiingo_conflict": [
{"date": "2024-01-09", "close": 475.00, "adj_close": 473.52, "dividend_amount": 0.0}
],
"fred": [
{"date": "2024-01-02", "close": 4742.83, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-03", "close": 4704.81, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-04", "close": 4697.24, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-05", "close": 4697.24, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-08", "close": 4763.54, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-09", "close": 4756.50, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-10", "close": 4839.81, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-11", "close": 4783.83, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-12", "close": 4839.81, "adj_close": null, "dividend_amount": 0.0},
{"date": "2024-01-15", "close": 4765.98, "adj_close": null, "dividend_amount": 0.0}
]
}
  • Step 1.3: Commit fixtures
Terminal window
mkdir -p tests/fixtures
git add tests/fixtures/
git commit -m "test: add synthetic price fixtures for unit tests"

Task 2: Source adapter interface (base.py)

Section titled “Task 2: Source adapter interface (base.py)”

Files:

  • Create: src/asset_history/sources/__init__.py

  • Create: src/asset_history/sources/base.py

  • Step 2.1: Create src/asset_history/sources/__init__.py (empty)

  • Step 2.2: Create src/asset_history/sources/base.py

"""
Abstract base class for all data source adapters.
Each source adapter must implement fetch_full() and fetch_since().
Government index sources (BoC, FRED) may return adj_close=None — this is expected.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
@dataclass
class PriceRecord:
"""One row of price+dividend data from a single source."""
ticker: str
date: date
close: float | None # Raw closing price (or index level for gov sources)
adj_close: float | None # Dividend/split-adjusted price; None for gov index sources
dividend_amount: float # Distribution amount on this date; 0.0 if no dividend
class BaseSource(ABC):
"""All source adapters implement this interface."""
source_name: str # class attribute — must be set on each subclass
@abstractmethod
def fetch_full(self, ticker: str) -> list[PriceRecord]:
"""
Fetch all available historical data for ticker.
Used for initial backfill. May take minutes for long histories.
Raises ValueError if ticker is not supported by this source.
"""
...
@abstractmethod
def fetch_since(self, ticker: str, since: date) -> list[PriceRecord]:
"""
Fetch records from `since` date (inclusive) to today.
Used for incremental weekly updates.
Raises ValueError if ticker is not supported by this source.
"""
...
  • Step 2.3: Commit
Terminal window
git add src/asset_history/sources/
git commit -m "feat: source adapter interface — PriceRecord dataclass + BaseSource ABC"

Files:

  • Create: src/asset_history/sources/yahoo.py

  • Test in: tests/test_sources_mock.py (added in Task 7)

  • Step 3.1: Create src/asset_history/sources/yahoo.py

"""
Yahoo Finance source adapter using yfinance.
Primary source for all tickers. Returns adj_close and dividend_amount.
yfinance returns a DataFrame; this adapter converts it to PriceRecord list.
"""
import yfinance as yf
from datetime import date, timedelta
from asset_history.sources.base import BaseSource, PriceRecord
class YahooSource(BaseSource):
source_name = "yahoo"
def fetch_full(self, ticker: str) -> list[PriceRecord]:
"""Fetch all available history (yfinance default start: 1970-01-01)."""
return self._fetch(ticker, start="1970-01-01")
def fetch_since(self, ticker: str, since: date) -> list[PriceRecord]:
"""Fetch from `since` to today."""
return self._fetch(ticker, start=since.isoformat())
def _fetch(self, ticker: str, start: str) -> list[PriceRecord]:
t = yf.Ticker(ticker)
df = t.history(start=start, auto_adjust=False)
if df.empty:
return []
# Build dividends lookup: date -> amount
dividends = {}
for idx, row in t.dividends.items():
d = idx.date() if hasattr(idx, 'date') else idx
dividends[d] = float(row)
records = []
for idx, row in df.iterrows():
d = idx.date() if hasattr(idx, 'date') else idx
records.append(PriceRecord(
ticker=ticker,
date=d,
close=float(row["Close"]),
adj_close=float(row["Adj Close"]),
dividend_amount=dividends.get(d, 0.0),
))
return records
  • Step 3.2: Commit
Terminal window
git add src/asset_history/sources/yahoo.py
git commit -m "feat: Yahoo Finance source adapter"

Files:

  • Create: src/asset_history/sources/bank_of_canada.py

Note: Use the series ID you discovered in the Pre-Condition step.

  • Step 4.1: Create src/asset_history/sources/bank_of_canada.py
"""
Bank of Canada valet API source adapter.
Provides TSX Composite index level for CA price validation.
Returns close = index level, adj_close = None (index, not ETF).
API docs: https://www.bankofcanada.ca/valet/docs
"""
import requests
from datetime import date, datetime
from asset_history.sources.base import BaseSource, PriceRecord
# Map ETF ticker → BoC series ID
# Series ID found via: GET https://www.bankofcanada.ca/valet/lists/series/json
TICKER_TO_BOC_SERIES: dict[str, str] = {
"XIC.TO": "<REPLACE_WITH_DISCOVERED_SERIES_ID>",
# Add new CA tickers here as needed
}
_BASE_URL = "https://www.bankofcanada.ca/valet"
class BankOfCanadaSource(BaseSource):
source_name = "bank_of_canada"
def fetch_full(self, ticker: str) -> list[PriceRecord]:
series = self._get_series(ticker)
return self._fetch(ticker, series, start_date="1977-01-01")
def fetch_since(self, ticker: str, since: date) -> list[PriceRecord]:
series = self._get_series(ticker)
return self._fetch(ticker, series, start_date=since.isoformat())
def _get_series(self, ticker: str) -> str:
if ticker not in TICKER_TO_BOC_SERIES:
raise ValueError(
f"No Bank of Canada series mapping for ticker '{ticker}'. "
f"Supported: {list(TICKER_TO_BOC_SERIES.keys())}"
)
return TICKER_TO_BOC_SERIES[ticker]
def _fetch(self, ticker: str, series: str, start_date: str) -> list[PriceRecord]:
url = f"{_BASE_URL}/observations/{series}/json"
resp = requests.get(url, params={"start_date": start_date}, timeout=30)
resp.raise_for_status()
data = resp.json()
records = []
for obs in data.get("observations", []):
d_str = obs["d"]
val = obs.get(series, {}).get("v")
if val is None:
continue
records.append(PriceRecord(
ticker=ticker,
date=date.fromisoformat(d_str),
close=float(val),
adj_close=None,
dividend_amount=0.0,
))
return records
  • Step 4.2: Commit
Terminal window
git add src/asset_history/sources/bank_of_canada.py
git commit -m "feat: Bank of Canada source adapter (CA TSX index validation)"

Files:

  • Create: src/asset_history/sources/tiingo.py

  • Step 5.1: Create src/asset_history/sources/tiingo.py

"""
Tiingo source adapter.
Provides US ETF price + dividend data for cross-reference.
Free tier: 500 requests/day. Full SPY history to 1993.
Rate limit: on HTTP 429, wait 60s and retry once.
"""
import os
import time
import requests
from datetime import date
from asset_history.sources.base import BaseSource, PriceRecord
class TiingoSource(BaseSource):
source_name = "tiingo"
def __init__(self):
self._api_key = os.environ.get("TIINGO_API_KEY", "")
def fetch_full(self, ticker: str) -> list[PriceRecord]:
return self._fetch(ticker, start_date="1993-01-01")
def fetch_since(self, ticker: str, since: date) -> list[PriceRecord]:
return self._fetch(ticker, start_date=since.isoformat())
def _fetch(self, ticker: str, start_date: str) -> list[PriceRecord]:
url = f"https://api.tiingo.com/tiingo/daily/{ticker}/prices"
params = {"startDate": start_date, "token": self._api_key}
resp = self._get_with_retry(url, params)
if resp is None:
return []
data = resp.json()
records = []
for row in data:
d = date.fromisoformat(row["date"][:10])
records.append(PriceRecord(
ticker=ticker,
date=d,
close=float(row.get("close") or 0),
adj_close=float(row.get("adjClose") or 0),
dividend_amount=float(row.get("divCash") or 0.0),
))
return records
def _get_with_retry(self, url: str, params: dict):
resp = requests.get(url, params=params, timeout=30)
if resp.status_code == 429:
time.sleep(60)
resp = requests.get(url, params=params, timeout=30)
if not resp.ok:
print(f"[tiingo] Request failed: {resp.status_code} — skipping")
return None
return resp
  • Step 5.2: Commit
Terminal window
git add src/asset_history/sources/tiingo.py
git commit -m "feat: Tiingo source adapter (US dividend cross-reference)"

Files:

  • Create: src/asset_history/sources/fred.py

  • Step 6.1: Create src/asset_history/sources/fred.py

"""
FRED (St. Louis Fed) source adapter.
Provides S&P 500 index level for US price validation.
Series SP500: daily close, 2012-present.
Returns close = index level, adj_close = None.
"""
import os
import requests
from datetime import date
from asset_history.sources.base import BaseSource, PriceRecord
TICKER_TO_FRED_SERIES: dict[str, str] = {
"SPY": "SP500",
"IVV": "SP500",
# Add new US tickers here as needed
}
_BASE_URL = "https://api.stlouisfed.org/fred/series/observations"
class FREDSource(BaseSource):
source_name = "fred"
def __init__(self):
self._api_key = os.environ.get("FRED_API_KEY", "")
def fetch_full(self, ticker: str) -> list[PriceRecord]:
series = self._get_series(ticker)
return self._fetch(ticker, series, observation_start="2012-01-01")
def fetch_since(self, ticker: str, since: date) -> list[PriceRecord]:
series = self._get_series(ticker)
return self._fetch(ticker, series, observation_start=since.isoformat())
def _get_series(self, ticker: str) -> str:
if ticker not in TICKER_TO_FRED_SERIES:
raise ValueError(
f"No FRED series mapping for ticker '{ticker}'. "
f"Supported: {list(TICKER_TO_FRED_SERIES.keys())}"
)
return TICKER_TO_FRED_SERIES[ticker]
def _fetch(self, ticker: str, series: str, observation_start: str) -> list[PriceRecord]:
params = {
"series_id": series,
"api_key": self._api_key,
"file_type": "json",
"observation_start": observation_start,
}
resp = requests.get(_BASE_URL, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
records = []
for obs in data.get("observations", []):
val = obs.get("value", ".")
if val == ".": # FRED uses "." for missing values
continue
records.append(PriceRecord(
ticker=ticker,
date=date.fromisoformat(obs["date"]),
close=float(val),
adj_close=None,
dividend_amount=0.0,
))
return records
  • Step 6.2: Commit
Terminal window
git add src/asset_history/sources/fred.py
git commit -m "feat: FRED source adapter (US S&P 500 index validation)"

Task 7: Source adapter tests (mocked HTTP)

Section titled “Task 7: Source adapter tests (mocked HTTP)”

Files:

  • Create: tests/test_sources_mock.py

  • Step 7.1: Write tests/test_sources_mock.py

"""
Source adapter tests — all HTTP calls mocked.
Verifies that each adapter correctly parses its source's response format
into a list of PriceRecord objects.
"""
import json
from datetime import date
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from asset_history.sources.base import PriceRecord
from asset_history.sources.yahoo import YahooSource
from asset_history.sources.bank_of_canada import BankOfCanadaSource
from asset_history.sources.tiingo import TiingoSource
from asset_history.sources.fred import FREDSource
FIXTURES = Path(__file__).parent / "fixtures"
# ---------------------------------------------------------------------------
# Yahoo Finance
# ---------------------------------------------------------------------------
class TestYahooSource:
def test_source_name(self):
assert YahooSource.source_name == "yahoo"
def test_fetch_returns_price_records(self, mocker):
import pandas as pd
import numpy as np
dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
df = pd.DataFrame({
"Close": [28.00, 28.14],
"Adj Close": [28.00, 28.14],
"Open": [27.90, 28.05],
"High": [28.10, 28.20],
"Low": [27.85, 28.00],
"Volume": [1000000, 1100000],
}, index=dates)
mock_ticker = MagicMock()
mock_ticker.history.return_value = df
mock_ticker.dividends = pd.Series([], dtype=float)
mocker.patch("yfinance.Ticker", return_value=mock_ticker)
records = YahooSource().fetch_since("XIC.TO", date(2024, 1, 1))
assert len(records) == 2
assert all(isinstance(r, PriceRecord) for r in records)
assert records[0].ticker == "XIC.TO"
assert records[0].adj_close == 28.00
assert records[0].dividend_amount == 0.0
def test_dividend_date_extracted(self, mocker):
import pandas as pd
dates = pd.to_datetime(["2024-01-09"])
df = pd.DataFrame({
"Close": [28.10], "Adj Close": [27.84],
"Open": [28.05], "High": [28.15], "Low": [27.80], "Volume": [1200000],
}, index=dates)
div_index = pd.to_datetime(["2024-01-09"])
divs = pd.Series([0.26], index=div_index)
mock_ticker = MagicMock()
mock_ticker.history.return_value = df
mock_ticker.dividends = divs
mocker.patch("yfinance.Ticker", return_value=mock_ticker)
records = YahooSource().fetch_since("XIC.TO", date(2024, 1, 9))
assert records[0].dividend_amount == pytest.approx(0.26)
def test_empty_dataframe_returns_empty_list(self, mocker):
import pandas as pd
mock_ticker = MagicMock()
mock_ticker.history.return_value = pd.DataFrame()
mock_ticker.dividends = pd.Series([], dtype=float)
mocker.patch("yfinance.Ticker", return_value=mock_ticker)
records = YahooSource().fetch_full("XIC.TO")
assert records == []
# ---------------------------------------------------------------------------
# Bank of Canada
# ---------------------------------------------------------------------------
class TestBankOfCanadaSource:
def test_source_name(self):
assert BankOfCanadaSource.source_name == "bank_of_canada"
def test_unsupported_ticker_raises(self):
with pytest.raises(ValueError, match="No Bank of Canada series mapping"):
BankOfCanadaSource().fetch_full("UNKNOWN.TO")
def test_fetch_parses_observations(self, mocker):
# Get the actual series name from the adapter
from asset_history.sources.bank_of_canada import TICKER_TO_BOC_SERIES
series = TICKER_TO_BOC_SERIES.get("XIC.TO", "SERIES_ID")
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {
"observations": [
{"d": "2024-01-02", series: {"v": "21100.0"}},
{"d": "2024-01-03", series: {"v": "21215.0"}},
]
}
mock_resp.raise_for_status = MagicMock()
mocker.patch("requests.get", return_value=mock_resp)
records = BankOfCanadaSource().fetch_since("XIC.TO", date(2024, 1, 1))
assert len(records) == 2
assert records[0].close == pytest.approx(21100.0)
assert records[0].adj_close is None # government index — no adj_close
assert records[0].dividend_amount == 0.0
def test_skips_missing_values(self, mocker):
from asset_history.sources.bank_of_canada import TICKER_TO_BOC_SERIES
series = TICKER_TO_BOC_SERIES.get("XIC.TO", "SERIES_ID")
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {
"observations": [
{"d": "2024-01-02", series: {"v": "21100.0"}},
{"d": "2024-01-03"}, # missing value — skip
]
}
mock_resp.raise_for_status = MagicMock()
mocker.patch("requests.get", return_value=mock_resp)
records = BankOfCanadaSource().fetch_since("XIC.TO", date(2024, 1, 1))
assert len(records) == 1
# ---------------------------------------------------------------------------
# Tiingo
# ---------------------------------------------------------------------------
class TestTiingoSource:
def test_source_name(self):
assert TiingoSource.source_name == "tiingo"
def test_fetch_parses_response(self, mocker):
mock_resp = MagicMock()
mock_resp.ok = True
mock_resp.status_code = 200
mock_resp.json.return_value = [
{"date": "2024-01-02T00:00:00+00:00", "close": 474.0,
"adjClose": 474.0, "divCash": 0.0},
{"date": "2024-01-09T00:00:00+00:00", "close": 475.0,
"adjClose": 473.52, "divCash": 1.48},
]
mocker.patch("requests.get", return_value=mock_resp)
records = TiingoSource().fetch_since("SPY", date(2024, 1, 1))
assert len(records) == 2
assert records[1].dividend_amount == pytest.approx(1.48)
assert records[1].adj_close == pytest.approx(473.52)
def test_rate_limit_retries_once(self, mocker):
mock_429 = MagicMock()
mock_429.status_code = 429
mock_429.ok = False
mock_ok = MagicMock()
mock_ok.status_code = 200
mock_ok.ok = True
mock_ok.json.return_value = []
mocker.patch("requests.get", side_effect=[mock_429, mock_ok])
mocker.patch("time.sleep") # don't actually sleep in tests
records = TiingoSource().fetch_full("SPY")
assert records == []
def test_failed_request_returns_empty(self, mocker):
mock_resp = MagicMock()
mock_resp.status_code = 500
mock_resp.ok = False
mocker.patch("requests.get", return_value=mock_resp)
records = TiingoSource().fetch_full("SPY")
assert records == []
# ---------------------------------------------------------------------------
# FRED
# ---------------------------------------------------------------------------
class TestFREDSource:
def test_source_name(self):
assert FREDSource.source_name == "fred"
def test_unsupported_ticker_raises(self):
with pytest.raises(ValueError, match="No FRED series mapping"):
FREDSource().fetch_full("UNKNOWN")
def test_fetch_parses_observations(self, mocker):
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {
"observations": [
{"date": "2024-01-02", "value": "4742.83"},
{"date": "2024-01-03", "value": "4704.81"},
{"date": "2024-01-04", "value": "."}, # missing — skip
]
}
mock_resp.raise_for_status = MagicMock()
mocker.patch("requests.get", return_value=mock_resp)
records = FREDSource().fetch_since("SPY", date(2024, 1, 1))
assert len(records) == 2 # "." row skipped
assert records[0].close == pytest.approx(4742.83)
assert records[0].adj_close is None
  • Step 7.2: Run tests
Terminal window
pytest tests/test_sources_mock.py -v

Expected: All tests PASS.

  • Step 7.3: Commit
Terminal window
git add tests/test_sources_mock.py
git commit -m "test: source adapter mock tests — all adapters parse correctly"

Files:

  • Create: src/asset_history/reconcile.py

  • Test in: tests/test_reconcile.py

  • Step 8.1: Write failing tests first — tests/test_reconcile.py

"""
Tests for conflict detection and reconciliation.
Uses synthetic fixture data — no DB or network required.
"""
import json
import tempfile
from datetime import date
from pathlib import Path
import pytest
from asset_history.db import create_tables, get_connection, insert_ticker, insert_price_source
from asset_history.reconcile import reconcile_ticker, ConflictReport
FIXTURES = Path(__file__).parent / "fixtures"
@pytest.fixture
def conn(tmp_path):
con = get_connection(tmp_path / "test.db")
create_tables(con)
insert_ticker(con, "XIC.TO", "iShares TSX", "ca", "etf", ["yahoo", "bank_of_canada"])
insert_ticker(con, "SPY", "SPDR S&P 500", "us", "etf", ["yahoo", "tiingo", "fred"])
yield con
con.close()
class TestReconcileAgreement:
def test_two_sources_agree_gives_verified(self, conn, tmp_path):
data = json.loads((FIXTURES / "xic_sample.json").read_text())
for row in data["yahoo"]:
insert_price_source(conn, "XIC.TO", row["date"], "yahoo",
row["close"], row["adj_close"], row["dividend_amount"])
for row in data["bank_of_canada"]:
insert_price_source(conn, "XIC.TO", row["date"], "bank_of_canada",
row["close"], row["adj_close"], row["dividend_amount"])
report = reconcile_ticker(conn, "XIC.TO", log_path=tmp_path / "conflicts.log")
prices = conn.execute(
"SELECT confidence FROM prices WHERE ticker='XIC.TO'"
).fetchall()
confidences = {row[0] for row in prices}
assert "conflict" not in confidences
assert len(report.conflicts) == 0
def test_single_source_gives_single_source_confidence(self, conn, tmp_path):
data = json.loads((FIXTURES / "xic_sample.json").read_text())
for row in data["yahoo"]:
insert_price_source(conn, "XIC.TO", row["date"], "yahoo",
row["close"], row["adj_close"], row["dividend_amount"])
reconcile_ticker(conn, "XIC.TO", log_path=tmp_path / "conflicts.log")
prices = conn.execute(
"SELECT confidence FROM prices WHERE ticker='XIC.TO'"
).fetchall()
assert all(row[0] == "single-source" for row in prices)
class TestReconcileConflict:
def test_dividend_mismatch_flagged(self, conn, tmp_path):
"""Yahoo has dividend on 2024-01-09; tiingo_conflict has 0.0 on same date."""
data = json.loads((FIXTURES / "spy_sample.json").read_text())
# Insert yahoo rows
for row in data["yahoo"]:
insert_price_source(conn, "SPY", row["date"], "yahoo",
row["close"], row["adj_close"], row["dividend_amount"])
# Insert tiingo with missing dividend on 2024-01-09
for row in data["tiingo_conflict"]:
insert_price_source(conn, "SPY", row["date"], "tiingo",
row["close"], row["adj_close"], row["dividend_amount"])
report = reconcile_ticker(conn, "SPY", log_path=tmp_path / "conflicts.log")
assert len(report.conflicts) >= 1
conflict_dates = {c.date for c in report.conflicts}
assert "2024-01-09" in conflict_dates
# The conflict row should be written to prices table
row = conn.execute(
"SELECT confidence FROM prices WHERE ticker='SPY' AND date='2024-01-09'"
).fetchone()
assert row[0] == "conflict"
def test_conflict_appended_to_log(self, conn, tmp_path):
log_path = tmp_path / "conflicts.log"
data = json.loads((FIXTURES / "spy_sample.json").read_text())
for row in data["yahoo"]:
insert_price_source(conn, "SPY", row["date"], "yahoo",
row["close"], row["adj_close"], row["dividend_amount"])
for row in data["tiingo_conflict"]:
insert_price_source(conn, "SPY", row["date"], "tiingo",
row["close"], row["adj_close"], row["dividend_amount"])
reconcile_ticker(conn, "SPY", log_path=log_path)
assert log_path.exists()
content = log_path.read_text()
assert "SPY" in content
assert "2024-01-09" in content
class TestReconcileFirstRowSkipped:
def test_first_row_skipped_in_return_calculation(self, conn, tmp_path):
"""No error when only one row exists — can't compute return."""
insert_price_source(conn, "XIC.TO", "2024-01-02", "yahoo",
28.00, 28.00, 0.0)
# Should not raise
reconcile_ticker(conn, "XIC.TO", log_path=tmp_path / "conflicts.log")
  • Step 8.2: Run tests — verify FAIL with ImportError
Terminal window
pytest tests/test_reconcile.py -v

Expected: FAIL with ImportError: cannot import name 'reconcile_ticker'

  • Step 8.3: Implement src/asset_history/reconcile.py
"""
Conflict detection and reconciliation engine.
Reads price_sources rows, compares across sources, writes reconciled
rows to prices table, appends conflicts to log file.
"""
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime, timezone
import sqlite3
@dataclass
class Conflict:
ticker: str
date: str
conflict_type: str # 'dividend_mismatch' | 'return_divergence'
description: str
@dataclass
class ConflictReport:
ticker: str
conflicts: list[Conflict] = field(default_factory=list)
def is_clean(self) -> bool:
return len(self.conflicts) == 0
_RETURN_THRESHOLD = 0.005 # 0.5% single-day return divergence
_DIVIDEND_TOLERANCE = 0.01 # 1% dividend amount tolerance
def reconcile_ticker(
conn: sqlite3.Connection,
ticker: str,
log_path: Path | None = None,
primary_source: str = "yahoo",
) -> ConflictReport:
"""
Compare all source rows for ticker, write reconciled prices table,
flag conflicts. Returns ConflictReport.
"""
report = ConflictReport(ticker=ticker)
# Fetch all source rows grouped by date
rows = conn.execute(
"SELECT date, source, close, adj_close, dividend_amount "
"FROM price_sources WHERE ticker=? ORDER BY date ASC, source ASC",
(ticker,)
).fetchall()
if not rows:
return report
# Group by date: {date: {source: row}}
by_date: dict[str, dict] = {}
for row in rows:
d, src = row[0], row[1]
by_date.setdefault(d, {})[src] = row
# Build return series per source for return-rate comparison
return_series: dict[str, list[tuple[str, float]]] = {} # source -> [(date, return)]
prev_close: dict[str, float] = {}
sorted_dates = sorted(by_date.keys())
for d in sorted_dates:
sources_on_date = by_date[d]
for src, row in sources_on_date.items():
close = row[2] # raw close (or index level)
if close is None:
continue
if src in prev_close:
r = close / prev_close[src] - 1.0
return_series.setdefault(src, []).append((d, r))
prev_close[src] = close
# Check return-rate divergence between pairs of sources
source_list = list(return_series.keys())
for i in range(len(source_list)):
for j in range(i + 1, len(source_list)):
src_a, src_b = source_list[i], source_list[j]
ret_a = dict(return_series[src_a])
ret_b = dict(return_series[src_b])
common_dates = set(ret_a.keys()) & set(ret_b.keys())
for d in sorted(common_dates):
diff = abs(ret_a[d] - ret_b[d])
if diff > _RETURN_THRESHOLD:
report.conflicts.append(Conflict(
ticker=ticker,
date=d,
conflict_type="return_divergence",
description=(
f"{src_a} return={ret_a[d]:.4f}, "
f"{src_b} return={ret_b[d]:.4f}, "
f"diff={diff:.4f} > threshold={_RETURN_THRESHOLD}"
),
))
# Check dividend mismatches on each date
for d, sources_on_date in by_date.items():
divs = {
src: row[4]
for src, row in sources_on_date.items()
if row[4] is not None
}
if len(divs) < 2:
continue
div_values = list(divs.values())
has_nonzero = any(v > 0 for v in div_values)
has_zero = any(v == 0 for v in div_values)
if has_nonzero and has_zero:
report.conflicts.append(Conflict(
ticker=ticker,
date=d,
conflict_type="dividend_mismatch",
description=f"Sources disagree on dividend: {divs}",
))
elif has_nonzero:
max_v, min_v = max(div_values), min(div_values)
if max_v > 0 and abs(max_v - min_v) / max_v > _DIVIDEND_TOLERANCE:
report.conflicts.append(Conflict(
ticker=ticker,
date=d,
conflict_type="dividend_mismatch",
description=f"Dividend amounts differ: {divs}",
))
conflict_dates = {c.date for c in report.conflicts}
# Write reconciled prices rows
for d, sources_on_date in by_date.items():
primary = sources_on_date.get(primary_source)
if primary is None:
primary = next(iter(sources_on_date.values()))
used_source = next(iter(sources_on_date.keys()))
else:
used_source = primary_source
close = primary[2] if primary[2] is not None else 0.0
adj_close = primary[3] if primary[3] is not None else close
dividend_amount = primary[4] if primary[4] is not None else 0.0
if d in conflict_dates:
confidence = "conflict"
elif len(sources_on_date) > 1:
confidence = "verified"
else:
confidence = "single-source"
conn.execute(
"INSERT OR REPLACE INTO prices "
"(ticker, date, close, adj_close, dividend_amount, primary_source, confidence) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(ticker, d, close, adj_close, dividend_amount, used_source, confidence),
)
conn.commit()
# Append to log file if there are conflicts
if report.conflicts and log_path is not None:
Path(log_path).parent.mkdir(parents=True, exist_ok=True)
with open(log_path, "a") as f:
ts = datetime.now(timezone.utc).isoformat()
for c in report.conflicts:
f.write(
f"{ts} | CONFLICT | {c.ticker} | {c.date} | "
f"{c.conflict_type} | {c.description}\n"
)
return report
  • Step 8.4: Run tests
Terminal window
pytest tests/test_reconcile.py -v

Expected: All tests PASS.

  • Step 8.5: Commit
Terminal window
git add src/asset_history/reconcile.py tests/test_reconcile.py
git commit -m "feat: reconciliation engine — conflict detection, prices table writer, log"

Files:

  • Create: src/asset_history/notify.py

  • Step 9.1: Create src/asset_history/notify.py

"""
Notification wrapper for asset-history.
Delegates to the notify_manager utility (D:/FSS/Software/Utils/PythonUtils/notify_manager/).
Falls back to stdout if notify_manager is not importable (e.g., different machine).
Configure NOTIFY_MANAGER_PATH in .env to point to the parent directory of notify_manager/.
"""
import os
import sys
def _get_send_alert():
"""Import notify_manager.send_alert via sys.path. Returns None if unavailable."""
nm_path = os.getenv(
"NOTIFY_MANAGER_PATH",
"/mnt/d/FSS/Software/Utils/PythonUtils"
)
if nm_path not in sys.path:
sys.path.insert(0, nm_path)
try:
from notify_manager.notify_manager import send_alert # noqa: PLC0415
return send_alert
except ImportError:
return None
def notify_conflicts(ticker: str, conflicts: list[str]) -> None:
"""
Alert the manager about data conflicts requiring manual resolution.
Uses WARNING level → Email + Telegram per notify_manager level_defaults.
"""
send_alert = _get_send_alert()
if send_alert:
send_alert(
tool_name="asset-history",
errors=conflicts,
level="WARNING",
)
else:
# Graceful fallback — always visible even without notify_manager
print(f"[asset-history] WARNING: {len(conflicts)} conflict(s) for {ticker}")
for c in conflicts:
print(f" {c}")
  • Step 9.2: Commit
Terminal window
git add src/asset_history/notify.py
git commit -m "feat: notify_manager wrapper with graceful fallback"

Files:

  • Create: src/asset_history/cli.py

  • Test in: tests/test_cli.py

  • Step 10.1: Write failing CLI tests — tests/test_cli.py

"""CLI command tests — all run against a temp SQLite DB, no network."""
import json
import tempfile
from pathlib import Path
import pytest
from typer.testing import CliRunner
from asset_history.cli import app
from asset_history.db import create_tables, get_connection, seed_tickers
runner = CliRunner()
@pytest.fixture
def db_file(tmp_path):
db = tmp_path / "test.db"
conn = get_connection(db)
create_tables(conn)
seed_tickers(conn)
conn.close()
return db
@pytest.fixture
def env(db_file, monkeypatch):
monkeypatch.setenv("ASSET_HISTORY_DB", str(db_file))
class TestAddCommand:
def test_add_new_ticker(self, env, db_file):
result = runner.invoke(app, [
"add", "VFV.TO",
"--name", "Vanguard S&P 500",
"--country", "ca",
"--sources", "yahoo,tiingo",
])
assert result.exit_code == 0
conn = get_connection(db_file)
row = conn.execute("SELECT ticker FROM tickers WHERE ticker='VFV.TO'").fetchone()
assert row is not None
def test_add_duplicate_exits_with_error(self, env):
result = runner.invoke(app, [
"add", "XIC.TO", "--name", "Duplicate", "--country", "ca", "--sources", "yahoo"
])
assert result.exit_code != 0
class TestStatusCommand:
def test_status_shows_registered_tickers(self, env):
result = runner.invoke(app, ["status"])
assert result.exit_code == 0
assert "XIC.TO" in result.output
assert "SPY" in result.output
class TestConflictsCommand:
def test_conflicts_shows_none_when_clean(self, env, db_file):
result = runner.invoke(app, ["conflicts"])
assert result.exit_code == 0
assert "No conflicts" in result.output or result.output.strip() == ""
class TestExportCommand:
def test_export_empty_db_writes_valid_json(self, env, db_file, tmp_path):
out = tmp_path / "out.json"
result = runner.invoke(app, [
"export", "XIC.TO", "--format", "json", "--since", "2024-01-01", "--out", str(out)
])
assert result.exit_code == 0
data = json.loads(out.read_text())
assert data["ticker"] == "XIC.TO"
assert data["country"] == "ca"
assert isinstance(data["rows"], list)
  • Step 10.2: Run tests — verify FAIL
Terminal window
pytest tests/test_cli.py -v

Expected: FAIL with ImportError: cannot import name 'app' from 'asset_history.cli'

  • Step 10.3: Implement src/asset_history/cli.py
"""
Typer CLI for asset-history.
All commands read DB path from ASSET_HISTORY_DB env var,
defaulting to ~/projects/asset-history/data/asset_history.db.
"""
import json
import os
from datetime import date, datetime, timezone
from pathlib import Path
import typer
from dotenv import load_dotenv
from asset_history.db import (
create_tables,
get_connection,
insert_ticker,
insert_price_source,
upsert_price,
get_prices,
seed_tickers,
)
from asset_history.reconcile import reconcile_ticker
from asset_history import notify as _notify
load_dotenv()
app = typer.Typer(help="asset-history — historical EOD data tool")
def _get_db():
db_path = os.environ.get(
"ASSET_HISTORY_DB",
str(Path.home() / "projects" / "asset-history" / "data" / "asset_history.db"),
)
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
conn = get_connection(db_path)
create_tables(conn)
return conn
def _log_path() -> Path:
return Path(os.environ.get(
"ASSET_HISTORY_DB",
str(Path.home() / "projects" / "asset-history" / "data" / "asset_history.db"),
)).parent / "conflicts.log"
@app.command()
def add(
ticker: str,
name: str = typer.Option(...),
country: str = typer.Option(...),
asset_type: str = typer.Option("etf"),
sources: str = typer.Option(..., help="Comma-separated: yahoo,tiingo,fred"),
):
"""Register a ticker for tracking."""
conn = _get_db()
source_list = [s.strip() for s in sources.split(",")]
try:
insert_ticker(conn, ticker, name, country, asset_type, source_list)
typer.echo(f"✓ Added {ticker} ({', '.join(source_list)})")
except Exception as e:
typer.echo(f"✗ Failed to add {ticker}: {e}", err=True)
raise typer.Exit(1)
@app.command()
def fetch(
ticker: str,
source: str = typer.Option(...),
since: str = typer.Option(None, help="ISO date YYYY-MM-DD; omit for full backfill"),
):
"""Fetch data from a single source into price_sources table. Does not auto-reconcile."""
conn = _get_db()
adapter = _get_adapter(source)
if adapter is None:
typer.echo(f"Unknown source: {source}", err=True)
raise typer.Exit(1)
if since:
records = adapter.fetch_since(ticker, date.fromisoformat(since))
else:
records = adapter.fetch_full(ticker)
for r in records:
insert_price_source(conn, r.ticker, r.date.isoformat(), source,
r.close, r.adj_close, r.dividend_amount)
typer.echo(f"✓ {ticker}/{source}: {len(records)} rows written to price_sources")
@app.command()
def reconcile(ticker: str):
"""Compare all sources for ticker and write to prices table."""
conn = _get_db()
report = reconcile_ticker(conn, ticker, log_path=_log_path())
if report.is_clean():
typer.echo(f"✓ {ticker}: no conflicts")
else:
typer.echo(f"⚠ {ticker}: {len(report.conflicts)} conflict(s) — see {_log_path()}")
_notify.notify_conflicts(ticker, [c.description for c in report.conflicts])
@app.command()
def update():
"""Incremental update all registered tickers from all their sources."""
conn = _get_db()
tickers = conn.execute("SELECT ticker, sources FROM tickers").fetchall()
for t_row in tickers:
ticker = t_row[0]
sources = json.loads(t_row[1])
for src in sources:
adapter = _get_adapter(src)
if adapter is None:
continue
max_date = conn.execute(
"SELECT MAX(date) FROM price_sources WHERE ticker=? AND source=?",
(ticker, src)
).fetchone()[0]
if max_date is None:
records = adapter.fetch_full(ticker)
else:
from datetime import timedelta
since = date.fromisoformat(max_date) + timedelta(days=1)
records = adapter.fetch_since(ticker, since)
for r in records:
insert_price_source(conn, r.ticker, r.date.isoformat(), src,
r.close, r.adj_close, r.dividend_amount)
typer.echo(f" {ticker}/{src}: +{len(records)} rows")
report = reconcile_ticker(conn, ticker, log_path=_log_path())
if not report.is_clean():
typer.echo(f" ⚠ {ticker}: {len(report.conflicts)} conflict(s)")
_notify.notify_conflicts(ticker, [c.description for c in report.conflicts])
else:
typer.echo(f" ✓ {ticker}: 0 conflicts")
@app.command()
def status():
"""Show DB coverage, last fetch, and conflict counts per ticker."""
conn = _get_db()
tickers = conn.execute("SELECT ticker, country, sources FROM tickers").fetchall()
if not tickers:
typer.echo("No tickers registered. Run: asset-history add ...")
return
for row in tickers:
ticker, country, sources_json = row
sources = json.loads(sources_json)
min_date = conn.execute(
"SELECT MIN(date) FROM prices WHERE ticker=?", (ticker,)
).fetchone()[0]
max_date = conn.execute(
"SELECT MAX(date) FROM prices WHERE ticker=?", (ticker,)
).fetchone()[0]
conflicts = conn.execute(
"SELECT COUNT(*) FROM prices WHERE ticker=? AND confidence='conflict'", (ticker,)
).fetchone()[0]
typer.echo(
f"{ticker} [{country}] sources={','.join(sources)} "
f"coverage={min_date or 'none'}{max_date or 'none'} conflicts={conflicts}"
)
@app.command()
def conflicts():
"""List all unresolved conflict rows."""
conn = _get_db()
rows = conn.execute(
"SELECT ticker, date, primary_source FROM prices WHERE confidence='conflict' ORDER BY ticker, date"
).fetchall()
if not rows:
typer.echo("No conflicts.")
return
for row in rows:
typer.echo(f" {row[0]} {row[1]} (primary: {row[2]})")
@app.command()
def resolve(
ticker: str,
date_str: str = typer.Argument(..., metavar="DATE", help="YYYY-MM-DD"),
accept: str = typer.Option(..., help="Source to accept as canonical"),
):
"""Resolve a conflict by accepting one source's value."""
conn = _get_db()
row = conn.execute(
"SELECT close, adj_close, dividend_amount FROM price_sources "
"WHERE ticker=? AND date=? AND source=?",
(ticker, date_str, accept)
).fetchone()
if row is None:
typer.echo(f"No {accept} row found for {ticker} {date_str}", err=True)
raise typer.Exit(1)
close, adj_close, div = row
upsert_price(conn, ticker, date_str, close, adj_close or close, div or 0.0,
primary_source=accept, confidence="verified")
with open(_log_path(), "a") as f:
ts = datetime.now(timezone.utc).isoformat()
f.write(f"{ts} | RESOLVED | {ticker} | {date_str} | accepted={accept}\n")
typer.echo(f"✓ Resolved {ticker} {date_str} — accepted {accept}")
@app.command()
def export(
ticker: str,
format: str = typer.Option("json"),
since: str = typer.Option(...),
out: str = typer.Option(...),
include_conflicts: bool = typer.Option(False),
):
"""Export reconciled price data to JSON."""
conn = _get_db()
t_row = conn.execute(
"SELECT country FROM tickers WHERE ticker=?", (ticker,)
).fetchone()
country = t_row[0] if t_row else "unknown"
rows = get_prices(conn, ticker, since=since, include_conflicts=include_conflicts)
output = {
"ticker": ticker,
"country": country,
"exported_at": date.today().isoformat(),
"since": since,
"rows": [
{
"date": r["date"],
"adj_close": r["adj_close"],
"dividend_amount": r["dividend_amount"],
"confidence": r["confidence"],
}
for r in rows
],
}
Path(out).write_text(json.dumps(output, indent=2) + "\n")
typer.echo(f"✓ Exported {len(rows)} rows → {out}")
def _get_adapter(source: str):
from asset_history.sources.yahoo import YahooSource
from asset_history.sources.bank_of_canada import BankOfCanadaSource
from asset_history.sources.tiingo import TiingoSource
from asset_history.sources.fred import FREDSource
return {
"yahoo": YahooSource(),
"bank_of_canada": BankOfCanadaSource(),
"tiingo": TiingoSource(),
"fred": FREDSource(),
}.get(source)
if __name__ == "__main__":
app()
  • Step 10.4: Run CLI tests
Terminal window
pytest tests/test_cli.py -v

Expected: All tests PASS.

  • Step 10.5: Commit
Terminal window
git add src/asset_history/cli.py tests/test_cli.py
git commit -m "feat: Typer CLI — all commands implemented and tested"

Files:

  • Create: tests/test_sources_live.py

  • Step 11.1: Create tests/test_sources_live.py

"""
Live integration tests — require network access and API keys.
Run with: pytest -m slow
These tests verify that each source adapter can make real API calls
and returns schema-compliant PriceRecord objects. They do NOT assert
specific price values (those change daily).
"""
import os
from datetime import date, timedelta
import pytest
from asset_history.sources.base import PriceRecord
from asset_history.sources.yahoo import YahooSource
from asset_history.sources.tiingo import TiingoSource
from asset_history.sources.fred import FREDSource
pytestmark = pytest.mark.slow
THIRTY_DAYS_AGO = (date.today() - timedelta(days=30)).isoformat()
def _validate_records(records: list[PriceRecord], source_name: str):
assert len(records) > 0, f"{source_name}: returned no records"
for r in records:
assert isinstance(r, PriceRecord), f"{source_name}: not a PriceRecord"
assert r.date is not None
assert r.dividend_amount >= 0.0
if r.adj_close is not None:
assert r.adj_close > 0.0
class TestYahooLive:
def test_xic_to_fetch_since(self):
records = YahooSource().fetch_since("XIC.TO", date.fromisoformat(THIRTY_DAYS_AGO))
_validate_records(records, "yahoo/XIC.TO")
def test_spy_fetch_since(self):
records = YahooSource().fetch_since("SPY", date.fromisoformat(THIRTY_DAYS_AGO))
_validate_records(records, "yahoo/SPY")
class TestTiingoLive:
def test_spy_fetch_since(self):
if not os.environ.get("TIINGO_API_KEY"):
pytest.skip("TIINGO_API_KEY not set")
records = TiingoSource().fetch_since("SPY", date.fromisoformat(THIRTY_DAYS_AGO))
_validate_records(records, "tiingo/SPY")
class TestFREDLive:
def test_spy_fetch_since(self):
if not os.environ.get("FRED_API_KEY"):
pytest.skip("FRED_API_KEY not set")
records = FREDSource().fetch_since("SPY", date.fromisoformat(THIRTY_DAYS_AGO))
_validate_records(records, "fred/SPY")
  • Step 11.2: Commit
Terminal window
git add tests/test_sources_live.py
git commit -m "test: live integration tests (marked slow, require API keys)"

  • Step 12.1: Run all unit tests
Terminal window
pytest -m "not slow" -v

Expected: All tests PASS. Zero network calls.

  • Step 12.2: Smoke test CLI end-to-end
Terminal window
# Initialize DB with seeds
python -c "
from asset_history.db import get_connection, create_tables, seed_tickers
from pathlib import Path
db = Path('data/asset_history.db')
db.parent.mkdir(exist_ok=True)
conn = get_connection(db)
create_tables(conn)
seed_tickers(conn)
print('DB initialized')
"
# Check status
asset-history status
# Fetch 30 days of real data from Yahoo (free, no API key needed)
asset-history fetch XIC.TO --source yahoo --since 2024-01-01
# Reconcile
asset-history reconcile XIC.TO
# Status again — should show coverage
asset-history status
# Export
asset-history export XIC.TO --format json --since 2024-01-01 --out /tmp/xic-test.json
cat /tmp/xic-test.json | python3 -m json.tool | head -20

Expected: No errors. Status shows XIC.TO with coverage dates. Export JSON is valid.

  • Step 12.3: Run slow tests (optional — requires API keys)
Terminal window
# Only if TIINGO_API_KEY and FRED_API_KEY are configured in .env
pytest -m slow -v
  • Step 12.4: Final commit
Terminal window
git status
git add -A
git commit -m "feat: asset-history v0.1.0 — all adapters, reconcile, CLI, notify complete"

All of the following must be true before handing off:

  • pytest -m "not slow" -v — all tests PASS
  • asset-history fetch XIC.TO --source yahoo --since 2024-01-01 completes without error
  • asset-history status shows XIC.TO with date coverage
  • asset-history update completes (may take a minute for full history)
  • git log --oneline shows at least 10 commits

Completed: 2026-03-25 Session: asset-history Chunk 2 Adapters

Virtual environment creation (python3 -m venv) fails on this system due to missing python3.12-venv. All dependencies remain installed system-wide via pip3 install --break-system-packages from Chunk 1. No changes required.

Finding: Bank of Canada valet API does NOT have a suitable daily TSX Composite index level series. Their API only exposes:

  • MPR report chart data (periodic, normalized values like 33.9, short spans per publication)
  • Research paper one-off series (e.g. SAN_GUIL_20200921_C1_S1)

None of these are continuous daily index level data.

Resolution (per task spec fallback rule): Used Stooq ^GSPTSE as the CA validation source. Stooq is blocked from this WSL environment (returns “No data”), but the adapter is correctly implemented to:

  • Return [] gracefully when Stooq is unavailable
  • Parse Stooq CSV format correctly (tested via mocked HTTP)
  • Keep source_name = "bank_of_canada" for DB compatibility with Chunk 1 seed data
  • Use TICKER_TO_STOOQ_SYMBOL dict (with TICKER_TO_BOC_SERIES alias for legacy imports)

fred.py initially called raise_for_status() which crashes the update command when no FRED_API_KEY is set (HTTP 400). Fixed to match Tiingo’s pattern: log and return [] gracefully. All 44 tests still pass after this fix.

pytest -m "not slow" -v
============================================================
44 passed, 4 deselected in 2.47s
============================================================

Test breakdown:

  • test_db.py: 19 tests — schema, CRUD, constraints, seed (Chunk 1, unchanged)
  • test_sources_mock.py: 15 tests — Yahoo, BankOfCanada/Stooq, Tiingo, FRED adapters
  • test_reconcile.py: 5 tests — conflict detection, dividend mismatch, log appending
  • test_cli.py: 5 tests — add, status, conflicts, export commands
Terminal window
# DB initialized
asset-history status
XIC.TO [ca] sources=yahoo,bank_of_canada coverage=none>none conflicts=0
SPY [us] sources=yahoo,tiingo,fred coverage=none>none conflicts=0
# Fetch real data (Yahoo, no API key needed)
asset-history fetch XIC.TO --source yahoo --since 2024-01-01
XIC.TO/yahoo: 561 rows written to price_sources
# Reconcile (single source → single-source confidence)
asset-history reconcile XIC.TO
XIC.TO: no conflicts
# Status shows real coverage
asset-history status
XIC.TO [ca] sources=yahoo,bank_of_canada coverage=2024-01-02>2026-03-25 conflicts=0
SPY [us] sources=yahoo,tiingo,fred coverage=1993-01-29>2026-03-25 conflicts=0
# Export valid JSON — 561 rows, all single-source confidence
asset-history export XIC.TO --format json --since 2024-01-01 --out /tmp/xic-test.json
Exported 561 rows > /tmp/xic-test.json
# Incremental update completes (Tiingo 403 + FRED 400 without keys — handled gracefully)
asset-history update # exit code 0
GateResult
pytest -m "not slow" -v — all tests PASS✅ 44 passed
asset-history fetch XIC.TO --source yahoo --since 2024-01-01 completes✅ 561 rows
asset-history status shows XIC.TO with date coverage✅ 2024-01-02→2026-03-25
asset-history update completes✅ exit code 0
git log --oneline shows at least 10 commits✅ 11 commits
2157efd fix: FRED adapter handles HTTP errors gracefully instead of raising
de81e62 test: live integration tests (marked slow, require API keys)
c1a7f3c feat: Typer CLI — all commands implemented and tested
ead9bad feat: notify_manager wrapper with graceful fallback
b360b12 feat: reconciliation engine — conflict detection, prices table writer, log
ccc1397 test: source adapter mock tests — all adapters parse correctly
3d30ee3 feat: Yahoo Finance, Bank of Canada (Stooq), Tiingo, and FRED source adapters
06fb159 feat: source adapter interface — PriceRecord dataclass + BaseSource ABC
be6f4ce test: add synthetic price fixtures for unit tests
9dc9354 feat: SQLite schema, CRUD helpers, and seed data — all tests pass
cd0501f feat: project scaffold — pyproject.toml, package structure, gitignore
FileResponsibility
tests/fixtures/xic_sample.jsonSynthetic XIC.TO data (10 days, 1 dividend)
tests/fixtures/spy_sample.jsonSynthetic SPY data (10 days, 1 conflict case)
src/asset_history/sources/__init__.pyPackage marker
src/asset_history/sources/base.pyPriceRecord dataclass + BaseSource ABC
src/asset_history/sources/yahoo.pyyfinance adapter — primary source
src/asset_history/sources/bank_of_canada.pyStooq ^GSPTSE adapter (CA validation)
src/asset_history/sources/tiingo.pyTiingo adapter — US dividend cross-reference
src/asset_history/sources/fred.pyFRED adapter — US S&P 500 validation
src/asset_history/reconcile.pyConflict detection engine + prices table writer
src/asset_history/notify.pynotify_manager wrapper with graceful fallback
src/asset_history/cli.pyFull Typer CLI — 8 commands
tests/test_sources_mock.pyAdapter tests (mocked HTTP)
tests/test_reconcile.pyConflict detection tests (synthetic data)
tests/test_cli.pyCLI tests (temp DB, no network)
tests/test_sources_live.pyLive tests (@pytest.mark.slow)
  1. BankOfCanadaSource uses Stooq internally but keeps source_name="bank_of_canada" and TICKER_TO_BOC_SERIES alias. This preserves Chunk 1 DB seed data (XIC.TO sources=["yahoo","bank_of_canada"]) without requiring migration. When Stooq is accessible (non-WSL environment), asset-history fetch XIC.TO --source bank_of_canada will fetch real ^GSPTSE data.

  2. Dividend comparison skips index sources: reconcile.py only compares dividends between sources where adj_close IS NOT NULL. Government index sources (BoC/FRED) always have adj_close=None and dividend_amount=0.0 — comparing them to ETF sources would generate false positives (e.g., XIC.TO dividend on 2024-01-09 vs. BoC’s 0.0).

  3. FRED error handling made graceful: Aligned with Tiingo — HTTP errors log to stdout and return [] instead of raising. This allows asset-history update to complete when API keys are not configured.

  • Stooq blocked from WSL: bank_of_canada source returns 0 rows in this WSL environment. Fetch will work correctly on a proper server or Windows host where Stooq is accessible.
  • Tiingo 403 without key: Expected — returns 0 rows gracefully.
  • FRED 400 without key: Expected — returns 0 rows gracefully.
  • yfinance “possibly delisted” warning: Cosmetic — appears when since date is tomorrow (data already up-to-date). Returns 0 rows cleanly.