From 8d07939527001ac4f2e5e789d27f7c70656373ce Mon Sep 17 00:00:00 2001 From: Bastian Wagner Date: Tue, 5 May 2026 19:26:43 +0200 Subject: [PATCH] Init --- .dockerignore | 13 + .env.example | 35 + .gitignore | 13 + Dockerfile | 41 ++ README.md | 152 +++++ docker-compose.debug.yml | 25 + docker-compose.yml | 11 + pyproject.toml | 26 + requirements-dev.txt | 3 + requirements.txt | 4 + scripts/browser-debug.sh | 16 + src/mywhoosh_garmin_sync/__init__.py | 4 + src/mywhoosh_garmin_sync/__main__.py | 5 + src/mywhoosh_garmin_sync/cli.py | 65 ++ src/mywhoosh_garmin_sync/config.py | 174 +++++ src/mywhoosh_garmin_sync/fit_crc.py | 39 ++ src/mywhoosh_garmin_sync/fit_device.py | 381 +++++++++++ src/mywhoosh_garmin_sync/garmin.py | 136 ++++ src/mywhoosh_garmin_sync/logging_setup.py | 11 + src/mywhoosh_garmin_sync/models.py | 21 + src/mywhoosh_garmin_sync/mywhoosh.py | 759 ++++++++++++++++++++++ src/mywhoosh_garmin_sync/service.py | 133 ++++ src/mywhoosh_garmin_sync/state.py | 219 +++++++ tests/conftest.py | 45 ++ tests/test_config.py | 35 + tests/test_fit_device.py | 119 ++++ tests/test_garmin.py | 59 ++ tests/test_mywhoosh.py | 58 ++ tests/test_state.py | 44 ++ 29 files changed, 2646 insertions(+) create mode 100644 .dockerignore create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 docker-compose.debug.yml create mode 100644 docker-compose.yml create mode 100644 pyproject.toml create mode 100644 requirements-dev.txt create mode 100644 requirements.txt create mode 100644 scripts/browser-debug.sh create mode 100644 src/mywhoosh_garmin_sync/__init__.py create mode 100644 src/mywhoosh_garmin_sync/__main__.py create mode 100644 src/mywhoosh_garmin_sync/cli.py create mode 100644 src/mywhoosh_garmin_sync/config.py create mode 100644 src/mywhoosh_garmin_sync/fit_crc.py create mode 100644 src/mywhoosh_garmin_sync/fit_device.py create mode 100644 src/mywhoosh_garmin_sync/garmin.py create mode 100644 src/mywhoosh_garmin_sync/logging_setup.py create mode 100644 src/mywhoosh_garmin_sync/models.py create mode 100644 src/mywhoosh_garmin_sync/mywhoosh.py create mode 100644 src/mywhoosh_garmin_sync/service.py create mode 100644 src/mywhoosh_garmin_sync/state.py create mode 100644 tests/conftest.py create mode 100644 tests/test_config.py create mode 100644 tests/test_fit_device.py create mode 100644 tests/test_garmin.py create mode 100644 tests/test_mywhoosh.py create mode 100644 tests/test_state.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..474ca2a --- /dev/null +++ b/.dockerignore @@ -0,0 +1,13 @@ +.env +.venv +__pycache__ +.pytest_cache +.ruff_cache +.git +data +htmlcov +.coverage +dist +build +*.egg-info + diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6fe5651 --- /dev/null +++ b/.env.example @@ -0,0 +1,35 @@ +MYWHOOSH_EMAIL= +MYWHOOSH_PASSWORD= +GARMIN_EMAIL= +GARMIN_PASSWORD= + +POLL_INTERVAL_SECONDS=3600 +DATA_DIR=/data +LOG_LEVEL=INFO +DRY_RUN=true +DASHBOARD_ENABLED=true +DASHBOARD_BIND=0.0.0.0 +DASHBOARD_PORT=8080 + +MYWHOOSH_LOGIN_URL=https://www.event.mywhoosh.com/login/ +MYWHOOSH_ACTIVITY_URL=https://event.mywhoosh.com/user/activities +MYWHOOSH_HEADLESS=true +MYWHOOSH_BROWSER_STATE_DIR=/data/browser +MYWHOOSH_AUTH_STATE_PATH=/data/mywhoosh_auth_state.json +MYWHOOSH_TIMEOUT_SECONDS=45 +MYWHOOSH_MAX_DOWNLOADS_PER_RUN=10 +MYWHOOSH_DOWNLOAD_TEXT_HINTS=fit,download +MYWHOOSH_ACTIVITIES_BUTTON_TEXT=ACTIVITIES +MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR=.btnDownload +MYWHOOSH_SLOW_MO_MS=0 +MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS=0 +MYWHOOSH_DEBUG_SCREENSHOTS=false +MYWHOOSH_DEBUG_DIR=/data/debug + +GARMIN_TOKENSTORE=/data/garmin_tokens +GARMIN_MFA_CODE= + +TARGET_GARMIN_MANUFACTURER_ID=1 +TARGET_GARMIN_PRODUCT_ID=3578 +TARGET_GARMIN_PRODUCT_NAME=Edge 1030 Plus +TARGET_GARMIN_SERIAL_NUMBER= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a4fd138 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +.env +.venv/ +__pycache__/ +.pytest_cache/ +.ruff_cache/ +*.pyc +htmlcov/ +.coverage +data/ +dist/ +build/ +*.egg-info/ + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6eec3f5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,41 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /app + +ARG BROWSER_DEBUG_TOOLS=false + +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates curl \ + && rm -rf /var/lib/apt/lists/* + +RUN if [ "$BROWSER_DEBUG_TOOLS" = "true" ]; then \ + apt-get update \ + && apt-get install -y --no-install-recommends \ + fluxbox \ + novnc \ + websockify \ + x11vnc \ + xvfb \ + && rm -rf /var/lib/apt/lists/*; \ + fi + +COPY requirements.txt requirements-dev.txt pyproject.toml ./ +COPY src ./src +COPY tests ./tests +COPY scripts ./scripts + +RUN pip install --upgrade pip \ + && pip install -r requirements.txt \ + && pip install . \ + && chmod +x /app/scripts/browser-debug.sh \ + && python -m playwright install --with-deps chromium + +COPY README.md ./ + +VOLUME ["/data"] +ENTRYPOINT ["python", "-m", "mywhoosh_garmin_sync"] +CMD ["serve"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..1d8ec2f --- /dev/null +++ b/README.md @@ -0,0 +1,152 @@ +# MyWhoosh Garmin Sync + +Containerized background sync for personal MyWhoosh activities: + +1. Log in to MyWhoosh with Playwright. +2. Find and download new `.fit` activity files. +3. Rewrite FIT device metadata locally to Garmin Edge 1030 Plus. +4. Upload the converted activity to Garmin Connect. +5. Persist sessions, downloaded files, converted files, and sync state under `/data`. + +## Quick Start + +Create your local env file: + +```sh +cp .env.example .env +``` + +Edit `.env`, then run: + +```sh +docker compose up -d --build +docker compose logs -f sync +``` + +The default polling interval is hourly. + +## Configuration + +Required values: + +```env +MYWHOOSH_EMAIL= +MYWHOOSH_PASSWORD= +GARMIN_EMAIL= +GARMIN_PASSWORD= +``` + +Useful optional values: + +```env +POLL_INTERVAL_SECONDS=3600 +DATA_DIR=/data +LOG_LEVEL=INFO +DRY_RUN=false +MYWHOOSH_LOGIN_URL=https://www.mywhoosh.com/login/ +MYWHOOSH_ACTIVITY_URL=https://www.mywhoosh.com/profile/ +MYWHOOSH_ACTIVITIES_BUTTON_TEXT=ACTIVITIES +MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR=.btnDownload +TARGET_GARMIN_PRODUCT_ID=3578 +TARGET_GARMIN_SERIAL_NUMBER= +``` + +If Garmin MFA is required on first login, set `GARMIN_MFA_CODE` for one run, let the token store persist under `/data/garmin_tokens`, then remove the variable. + +## Commands + +Inside the container: + +```sh +python -m mywhoosh_garmin_sync run-once +python -m mywhoosh_garmin_sync serve +python -m mywhoosh_garmin_sync convert --input /data/raw/example.fit --output /data/converted/example.fit +``` + +`DRY_RUN=true` downloads and converts files but does not upload to Garmin. + +Run tests from the built image: + +```sh +docker build -t mywhoosh-garmin-sync:test . +docker run --rm --entrypoint sh mywhoosh-garmin-sync:test -c "pip install -r requirements-dev.txt && pytest" +``` + +## Browser Debugging + +Use the debug compose file when MyWhoosh blocks automated login with cookies, captcha, or bot checks: + +```sh +docker compose -f docker-compose.debug.yml up --build +``` + +Then open noVNC: + +```text +http://localhost:6080/vnc.html +``` + +Click `Connect`. You should see Chromium inside the container. Cookie consent is accepted automatically where the banner uses a normal consent button. Captcha/bot checks are left for you to solve manually. The debug run uses: + +```env +MYWHOOSH_HEADLESS=false +MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS=900 +MYWHOOSH_SLOW_MO_MS=250 +MYWHOOSH_DEBUG_SCREENSHOTS=true +DRY_RUN=true +``` + +If this runs on a remote Linux server, keep the default localhost-only port binding and tunnel it: + +```sh +ssh -L 6080:localhost:6080 user@your-server +``` + +Then open `http://localhost:6080/vnc.html` on your own machine. During the 900-second manual-login window, accept cookies, solve the challenge, and finish the MyWhoosh login in the visible browser. The browser profile is stored in `./data/browser`, so the normal headless service can reuse the session later. + +Debug screenshots are written to: + +```text +./data/debug +``` + +After a successful manual login, stop the debug container and run the normal service: + +```sh +docker compose up -d --build +``` + +## Session Persistence + +MyWhoosh auth is persisted in two places under the mounted `./data` directory: + +```text +./data/browser +./data/mywhoosh_auth_state.json +``` + +Keep that directory when recreating containers. Use `docker compose down` when switching between debug and normal mode, but do not use `docker compose down -v` and do not delete `./data` unless you want to log in again. + +If Chromium reports that the profile is locked after a crash, stop all containers and remove only these root files: + +```text +./data/browser/SingletonCookie +./data/browser/SingletonLock +./data/browser/SingletonSocket +``` + +## Notes + +MyWhoosh does not appear to publish a stable public activity export API. This project uses a persisted headless browser session and conservative hourly polling. If MyWhoosh changes its page structure, adjust these `.env` values before changing code: + +```env +MYWHOOSH_LOGIN_URL= +MYWHOOSH_ACTIVITY_URL= +MYWHOOSH_DOWNLOAD_TEXT_HINTS=fit,download +MYWHOOSH_ACTIVITIES_BUTTON_TEXT=ACTIVITIES +MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR=.btnDownload +``` + +The FIT conversion patches `file_id` and `device_info` messages where Garmin manufacturer/product fields are present, then recalculates header and file CRCs. The default Garmin product ID is configurable so it can be corrected without a rebuild if Garmin FIT profile values change. + +This automates personal account actions. Keep polling conservative and make sure the way you use it is compatible with the services' terms. diff --git a/docker-compose.debug.yml b/docker-compose.debug.yml new file mode 100644 index 0000000..5b9b65c --- /dev/null +++ b/docker-compose.debug.yml @@ -0,0 +1,25 @@ +services: + sync-debug: + build: + context: . + args: + BROWSER_DEBUG_TOOLS: "true" + env_file: + - .env + environment: + DATA_DIR: /data + DRY_RUN: "true" + LOG_LEVEL: DEBUG + MYWHOOSH_HEADLESS: "false" + MYWHOOSH_SLOW_MO_MS: "250" + MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS: "900" + MYWHOOSH_DEBUG_SCREENSHOTS: "true" + volumes: + - ./data:/data + ports: + - "127.0.0.1:6080:6080" + - "127.0.0.1:5900:5900" + shm_size: "1gb" + restart: "no" + entrypoint: ["/app/scripts/browser-debug.sh"] + command: ["run-once"] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..232b8c0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,11 @@ +services: + sync: + build: . + env_file: + - .env + volumes: + - ./data:/data + ports: + - "127.0.0.1:8080:8080" + restart: unless-stopped + command: ["serve"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..374c9f2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "mywhoosh-garmin-sync" +version = "0.1.0" +description = "Sync MyWhoosh FIT activities to Garmin Connect with local device metadata conversion." +requires-python = ">=3.12" +dependencies = [ + "curl_cffi>=0.11,<1", + "garminconnect>=0.3.3,<0.4", + "playwright>=1.52,<2", + "python-dotenv>=1.0,<2", +] + +[project.scripts] +mywhoosh-garmin-sync = "mywhoosh_garmin_sync.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src"] +addopts = "-q" diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..8039097 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,3 @@ +-r requirements.txt +pytest>=8.0,<9 + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..88cb9bc --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +curl_cffi>=0.11,<1 +garminconnect>=0.3.3,<0.4 +playwright>=1.52,<2 +python-dotenv>=1.0,<2 diff --git a/scripts/browser-debug.sh b/scripts/browser-debug.sh new file mode 100644 index 0000000..f2f0b18 --- /dev/null +++ b/scripts/browser-debug.sh @@ -0,0 +1,16 @@ +#!/bin/sh +set -eu + +export DISPLAY="${DISPLAY:-:99}" +VNC_PORT="${VNC_PORT:-5900}" +NOVNC_PORT="${NOVNC_PORT:-6080}" +VNC_RESOLUTION="${VNC_RESOLUTION:-1440x1000x24}" + +Xvfb "$DISPLAY" -screen 0 "$VNC_RESOLUTION" -ac +extension RANDR >/tmp/xvfb.log 2>&1 & +fluxbox >/tmp/fluxbox.log 2>&1 & +x11vnc -display "$DISPLAY" -forever -shared -nopw -rfbport "$VNC_PORT" >/tmp/x11vnc.log 2>&1 & +websockify --web=/usr/share/novnc/ "$NOVNC_PORT" "localhost:$VNC_PORT" >/tmp/novnc.log 2>&1 & + +echo "noVNC is available on port $NOVNC_PORT. Open /vnc.html and click Connect." + +exec python -m mywhoosh_garmin_sync "$@" diff --git a/src/mywhoosh_garmin_sync/__init__.py b/src/mywhoosh_garmin_sync/__init__.py new file mode 100644 index 0000000..237790b --- /dev/null +++ b/src/mywhoosh_garmin_sync/__init__.py @@ -0,0 +1,4 @@ +"""MyWhoosh to Garmin Connect sync service.""" + +__version__ = "0.1.0" + diff --git a/src/mywhoosh_garmin_sync/__main__.py b/src/mywhoosh_garmin_sync/__main__.py new file mode 100644 index 0000000..1daf881 --- /dev/null +++ b/src/mywhoosh_garmin_sync/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/src/mywhoosh_garmin_sync/cli.py b/src/mywhoosh_garmin_sync/cli.py new file mode 100644 index 0000000..7d1ace4 --- /dev/null +++ b/src/mywhoosh_garmin_sync/cli.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import argparse +import asyncio +from pathlib import Path + +from .config import Settings +from .fit_device import GarminDevice, convert_fit_device +from .logging_setup import setup_logging +from .service import SyncService + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="mywhoosh-garmin-sync", + description="Download MyWhoosh FIT files, rewrite device metadata, and upload to Garmin Connect.", + ) + subparsers = parser.add_subparsers(dest="command") + + subparsers.add_parser("serve", help="Run the periodic background sync loop.") + subparsers.add_parser("run-once", help="Run one sync cycle and exit.") + + convert_parser = subparsers.add_parser("convert", help="Convert one FIT file locally.") + convert_parser.add_argument("--input", required=True, type=Path) + convert_parser.add_argument("--output", required=True, type=Path) + + return parser + + +async def _run_async(args: argparse.Namespace, settings: Settings) -> int: + if args.command == "run-once": + service = SyncService(settings) + await service.run_once() + return 0 + + if args.command == "convert": + device = GarminDevice( + manufacturer_id=settings.target_garmin_manufacturer_id, + product_id=settings.target_garmin_product_id, + product_name=settings.target_garmin_product_name, + serial_number=settings.target_garmin_serial_number, + ) + result = convert_fit_device(args.input, args.output, device) + print( + f"Converted {args.input} -> {args.output}; " + f"patched {result.patched_field_count} fields" + ) + return 0 + + service = SyncService(settings) + await service.serve() + return 0 + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + if args.command is None: + args.command = "serve" + + settings = Settings.from_env() + settings.ensure_directories() + setup_logging(settings.log_level) + return asyncio.run(_run_async(args, settings)) + diff --git a/src/mywhoosh_garmin_sync/config.py b/src/mywhoosh_garmin_sync/config.py new file mode 100644 index 0000000..218ce3b --- /dev/null +++ b/src/mywhoosh_garmin_sync/config.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +from dotenv import load_dotenv + + +def _bool_env(name: str, default: bool) -> bool: + value = os.getenv(name) + if value is None or value == "": + return default + return value.strip().lower() in {"1", "true", "yes", "y", "on"} + + +def _int_env(name: str, default: int) -> int: + value = os.getenv(name) + if value is None or value == "": + return default + return int(value) + + +def _optional_int_env(name: str) -> int | None: + value = os.getenv(name) + if value is None or value.strip() == "": + return None + return int(value) + + +def _csv_env(name: str, default: list[str]) -> list[str]: + value = os.getenv(name) + if value is None or value.strip() == "": + return default + return [item.strip().lower() for item in value.split(",") if item.strip()] + + +def _required_env(name: str) -> str: + value = os.getenv(name) + if value is None or value.strip() == "": + raise ValueError(f"Missing required environment variable: {name}") + return value + + +@dataclass(frozen=True) +class Settings: + mywhoosh_email: str + mywhoosh_password: str + garmin_email: str + garmin_password: str + + poll_interval_seconds: int + data_dir: Path + raw_dir: Path + converted_dir: Path + browser_state_dir: Path + mywhoosh_auth_state_path: Path + garmin_tokenstore: Path + db_path: Path + log_level: str + dry_run: bool + dashboard_enabled: bool + dashboard_bind: str + dashboard_port: int + + mywhoosh_login_url: str + mywhoosh_activity_url: str + mywhoosh_headless: bool + mywhoosh_timeout_seconds: int + mywhoosh_max_downloads_per_run: int + mywhoosh_download_text_hints: list[str] + mywhoosh_activities_button_text: str + mywhoosh_download_button_selector: str + mywhoosh_slow_mo_ms: int + mywhoosh_manual_login_wait_seconds: int + mywhoosh_debug_screenshots: bool + mywhoosh_debug_dir: Path + + garmin_mfa_code: str | None + + target_garmin_manufacturer_id: int + target_garmin_product_id: int + target_garmin_product_name: str + target_garmin_serial_number: int | None + + @classmethod + def from_env(cls) -> "Settings": + load_dotenv() + + data_dir = Path(os.getenv("DATA_DIR", "/data")) + return cls( + mywhoosh_email=_required_env("MYWHOOSH_EMAIL"), + mywhoosh_password=_required_env("MYWHOOSH_PASSWORD"), + garmin_email=_required_env("GARMIN_EMAIL"), + garmin_password=_required_env("GARMIN_PASSWORD"), + poll_interval_seconds=_int_env("POLL_INTERVAL_SECONDS", 3600), + data_dir=data_dir, + raw_dir=Path(os.getenv("RAW_DIR", str(data_dir / "raw"))), + converted_dir=Path( + os.getenv("CONVERTED_DIR", str(data_dir / "converted")) + ), + browser_state_dir=Path( + os.getenv("MYWHOOSH_BROWSER_STATE_DIR", str(data_dir / "browser")) + ), + mywhoosh_auth_state_path=Path( + os.getenv( + "MYWHOOSH_AUTH_STATE_PATH", + str(data_dir / "mywhoosh_auth_state.json"), + ) + ), + garmin_tokenstore=Path( + os.getenv("GARMIN_TOKENSTORE", str(data_dir / "garmin_tokens")) + ), + db_path=Path(os.getenv("STATE_DB", str(data_dir / "state.sqlite3"))), + log_level=os.getenv("LOG_LEVEL", "INFO").upper(), + dry_run=_bool_env("DRY_RUN", False), + dashboard_enabled=_bool_env("DASHBOARD_ENABLED", True), + dashboard_bind=os.getenv("DASHBOARD_BIND", "0.0.0.0"), + dashboard_port=_int_env("DASHBOARD_PORT", 8080), + mywhoosh_login_url=os.getenv( + "MYWHOOSH_LOGIN_URL", "https://www.mywhoosh.com/login/" + ), + mywhoosh_activity_url=os.getenv( + "MYWHOOSH_ACTIVITY_URL", "https://www.mywhoosh.com/profile/" + ), + mywhoosh_headless=_bool_env("MYWHOOSH_HEADLESS", True), + mywhoosh_timeout_seconds=_int_env("MYWHOOSH_TIMEOUT_SECONDS", 45), + mywhoosh_max_downloads_per_run=_int_env( + "MYWHOOSH_MAX_DOWNLOADS_PER_RUN", 10 + ), + mywhoosh_download_text_hints=_csv_env( + "MYWHOOSH_DOWNLOAD_TEXT_HINTS", ["fit", "download"] + ), + mywhoosh_activities_button_text=os.getenv( + "MYWHOOSH_ACTIVITIES_BUTTON_TEXT", "ACTIVITIES" + ), + mywhoosh_download_button_selector=os.getenv( + "MYWHOOSH_DOWNLOAD_BUTTON_SELECTOR", ".btnDownload" + ), + mywhoosh_slow_mo_ms=_int_env("MYWHOOSH_SLOW_MO_MS", 0), + mywhoosh_manual_login_wait_seconds=_int_env( + "MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS", 0 + ), + mywhoosh_debug_screenshots=_bool_env( + "MYWHOOSH_DEBUG_SCREENSHOTS", False + ), + mywhoosh_debug_dir=Path( + os.getenv("MYWHOOSH_DEBUG_DIR", str(data_dir / "debug")) + ), + garmin_mfa_code=os.getenv("GARMIN_MFA_CODE") or None, + target_garmin_manufacturer_id=_int_env( + "TARGET_GARMIN_MANUFACTURER_ID", 1 + ), + target_garmin_product_id=_int_env("TARGET_GARMIN_PRODUCT_ID", 3578), + target_garmin_product_name=os.getenv( + "TARGET_GARMIN_PRODUCT_NAME", "Edge 1030 Plus" + ), + target_garmin_serial_number=_optional_int_env( + "TARGET_GARMIN_SERIAL_NUMBER" + ), + ) + + def ensure_directories(self) -> None: + for path in ( + self.data_dir, + self.raw_dir, + self.converted_dir, + self.browser_state_dir, + self.mywhoosh_auth_state_path.parent, + self.garmin_tokenstore, + self.mywhoosh_debug_dir, + self.db_path.parent, + ): + path.mkdir(parents=True, exist_ok=True) diff --git a/src/mywhoosh_garmin_sync/fit_crc.py b/src/mywhoosh_garmin_sync/fit_crc.py new file mode 100644 index 0000000..633c347 --- /dev/null +++ b/src/mywhoosh_garmin_sync/fit_crc.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +CRC_TABLE = ( + 0x0000, + 0xCC01, + 0xD801, + 0x1400, + 0xF001, + 0x3C00, + 0x2800, + 0xE401, + 0xA001, + 0x6C00, + 0x7800, + 0xB401, + 0x5000, + 0x9C01, + 0x8801, + 0x4400, +) + + +def update_crc(crc: int, byte: int) -> int: + tmp = CRC_TABLE[crc & 0xF] + crc = (crc >> 4) & 0x0FFF + crc = crc ^ tmp ^ CRC_TABLE[byte & 0xF] + + tmp = CRC_TABLE[crc & 0xF] + crc = (crc >> 4) & 0x0FFF + crc = crc ^ tmp ^ CRC_TABLE[(byte >> 4) & 0xF] + return crc & 0xFFFF + + +def fit_crc(data: bytes | bytearray | memoryview) -> int: + crc = 0 + for byte in data: + crc = update_crc(crc, byte) + return crc + diff --git a/src/mywhoosh_garmin_sync/fit_device.py b/src/mywhoosh_garmin_sync/fit_device.py new file mode 100644 index 0000000..f278580 --- /dev/null +++ b/src/mywhoosh_garmin_sync/fit_device.py @@ -0,0 +1,381 @@ +from __future__ import annotations + +import logging +import struct +from dataclasses import dataclass +from pathlib import Path + +from .fit_crc import fit_crc + +logger = logging.getLogger(__name__) + +FILE_ID_MESG_NUM = 0 +DEVICE_INFO_MESG_NUM = 23 +GARMIN_MANUFACTURER_ID = 1 + + +@dataclass(frozen=True) +class GarminDevice: + manufacturer_id: int = GARMIN_MANUFACTURER_ID + product_id: int = 3578 + product_name: str = "Edge 1030 Plus" + serial_number: int | None = None + + +@dataclass(frozen=True) +class FitConversionResult: + source_path: Path + output_path: Path + patched_field_count: int + header_crc: int | None + file_crc: int + + +@dataclass(frozen=True) +class FieldDefinition: + num: int + size: int + base_type: int + + +@dataclass(frozen=True) +class LocalDefinition: + global_message_num: int + endian: str + fields: tuple[FieldDefinition, ...] + record_size: int + developer_field_size: int + + +@dataclass(frozen=True) +class DeviceFieldValue: + global_message_num: int + field_num: int + value: int | str + + +class FitFormatError(ValueError): + """Raised when a file is not a valid enough FIT file for metadata patching.""" + + +def convert_fit_device( + source_path: Path, output_path: Path, device: GarminDevice | None = None +) -> FitConversionResult: + device = device or GarminDevice() + data = bytearray(source_path.read_bytes()) + _validate_fit_container(data) + + patched_count = _patch_device_metadata(data, device) + header_crc = _rewrite_header_crc(data) + file_crc = _rewrite_file_crc(data) + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(data) + logger.info( + "Converted FIT metadata for %s -> %s; patched_fields=%d", + source_path, + output_path, + patched_count, + ) + return FitConversionResult( + source_path=source_path, + output_path=output_path, + patched_field_count=patched_count, + header_crc=header_crc, + file_crc=file_crc, + ) + + +def is_fit_file(path: Path) -> bool: + try: + data = path.read_bytes() + _validate_fit_container(data) + except (OSError, FitFormatError): + return False + return True + + +def read_device_field_values(path: Path) -> list[DeviceFieldValue]: + data = bytearray(path.read_bytes()) + _validate_fit_container(data) + values: list[DeviceFieldValue] = [] + for definition, field_offsets in _iter_data_fields(data): + for field, offset in field_offsets: + if definition.global_message_num == FILE_ID_MESG_NUM and field.num in { + 1, + 2, + 3, + 8, + }: + values.append( + DeviceFieldValue( + definition.global_message_num, + field.num, + _read_field_value(data, offset, field, definition.endian), + ) + ) + if definition.global_message_num == DEVICE_INFO_MESG_NUM and field.num in { + 2, + 3, + 4, + 27, + }: + values.append( + DeviceFieldValue( + definition.global_message_num, + field.num, + _read_field_value(data, offset, field, definition.endian), + ) + ) + return values + + +def _validate_fit_container(data: bytearray) -> None: + if len(data) < 14: + raise FitFormatError("FIT file is too small") + + header_size = data[0] + if header_size not in {12, 14}: + raise FitFormatError(f"Unsupported FIT header size: {header_size}") + + if len(data) < header_size + 2: + raise FitFormatError("FIT file is shorter than its header") + + if bytes(data[8:12]) != b".FIT": + raise FitFormatError("Missing .FIT signature") + + data_size = struct.unpack_from(" int: + patched_count = 0 + eligible_field_count = 0 + for definition, field_offsets in _iter_data_fields(data): + for field, offset in field_offsets: + target_value: int | str | None = None + if definition.global_message_num == FILE_ID_MESG_NUM: + if field.num == 1: + target_value = device.manufacturer_id + elif field.num == 2: + target_value = device.product_id + elif field.num == 3 and device.serial_number is not None: + target_value = device.serial_number + elif field.num == 8: + target_value = device.product_name + elif definition.global_message_num == DEVICE_INFO_MESG_NUM: + if field.num == 2: + target_value = device.manufacturer_id + elif field.num == 3 and device.serial_number is not None: + target_value = device.serial_number + elif field.num == 4: + target_value = device.product_id + elif field.num == 27: + target_value = device.product_name + + if target_value is not None: + eligible_field_count += 1 + if _write_field_value(data, offset, field, definition.endian, target_value): + patched_count += 1 + + if eligible_field_count == 0: + raise FitFormatError("No writable file_id or device_info device fields found") + return patched_count + + +def _iter_data_fields( + data: bytearray, +) -> list[tuple[LocalDefinition, list[tuple[FieldDefinition, int]]]]: + header_size = data[0] + data_size = struct.unpack_from("> 5) & 0x03 + definition = definitions.get(local_message_type) + if definition is None: + raise FitFormatError( + f"Compressed timestamp record used unknown local definition {local_message_type}" + ) + field_offsets, offset = _collect_field_offsets(definition, offset) + data_records.append((definition, field_offsets)) + continue + + local_message_type = record_header & 0x0F + is_definition = bool(record_header & 0x40) + has_developer_fields = bool(record_header & 0x20) + + if is_definition: + definition, offset = _read_definition( + data, offset, local_message_type, has_developer_fields + ) + definitions[local_message_type] = definition + continue + + definition = definitions.get(local_message_type) + if definition is None: + raise FitFormatError( + f"Data record used unknown local definition {local_message_type}" + ) + field_offsets, offset = _collect_field_offsets(definition, offset) + data_records.append((definition, field_offsets)) + + if offset != end_offset: + raise FitFormatError("FIT parser did not end on data boundary") + return data_records + + +def _read_definition( + data: bytearray, + offset: int, + local_message_type: int, + has_developer_fields: bool, +) -> tuple[LocalDefinition, int]: + del local_message_type + if offset + 5 > len(data): + raise FitFormatError("Truncated FIT definition message") + + offset += 1 + architecture = data[offset] + offset += 1 + endian = ">" if architecture == 1 else "<" + global_message_num = struct.unpack_from(f"{endian}H", data, offset)[0] + offset += 2 + field_count = data[offset] + offset += 1 + + fields: list[FieldDefinition] = [] + record_size = 0 + for _ in range(field_count): + if offset + 3 > len(data): + raise FitFormatError("Truncated FIT field definition") + field = FieldDefinition( + num=data[offset], + size=data[offset + 1], + base_type=data[offset + 2], + ) + fields.append(field) + record_size += field.size + offset += 3 + + developer_field_size = 0 + if has_developer_fields: + if offset >= len(data): + raise FitFormatError("Truncated FIT developer field count") + developer_field_count = data[offset] + offset += 1 + for _ in range(developer_field_count): + if offset + 3 > len(data): + raise FitFormatError("Truncated FIT developer fields") + developer_field_size += data[offset + 1] + offset += 3 + record_size += developer_field_size + + return ( + LocalDefinition( + global_message_num=global_message_num, + endian=endian, + fields=tuple(fields), + record_size=record_size, + developer_field_size=developer_field_size, + ), + offset, + ) + + +def _collect_field_offsets( + definition: LocalDefinition, offset: int +) -> tuple[list[tuple[FieldDefinition, int]], int]: + field_offsets: list[tuple[FieldDefinition, int]] = [] + current_offset = offset + for field in definition.fields: + field_offsets.append((field, current_offset)) + current_offset += field.size + current_offset += definition.developer_field_size + return field_offsets, current_offset + + +def _read_field_value( + data: bytearray, offset: int, field: FieldDefinition, endian: str +) -> int | str: + base_type = field.base_type & 0x1F + if base_type in {0x03, 0x04, 0x0B} and field.size >= 2: + return struct.unpack_from(f"{endian}H", data, offset)[0] + if base_type in {0x05, 0x06, 0x0C} and field.size >= 4: + return struct.unpack_from(f"{endian}I", data, offset)[0] + if base_type == 0x07: + raw = bytes(data[offset : offset + field.size]) + if 0 in raw: + raw = raw[: raw.index(0)] + return raw.decode("utf-8", errors="replace") + raw = bytes(data[offset : offset + field.size]) + return int.from_bytes(raw, "little") + + +def _write_field_value( + data: bytearray, + offset: int, + field: FieldDefinition, + endian: str, + value: int | str, +) -> bool: + if isinstance(value, str): + encoded = value.encode("utf-8") + if not encoded or field.size == 0 or len(encoded) + 1 > field.size: + return False + replacement = encoded + b"\x00" + b"\x00" * (field.size - len(encoded) - 1) + if bytes(data[offset : offset + field.size]) == replacement: + return False + data[offset : offset + field.size] = replacement + return True + + if field.size == 1: + replacement = struct.pack("B", value) + elif field.size == 2: + replacement = struct.pack(f"{endian}H", value) + elif field.size == 4: + replacement = struct.pack(f"{endian}I", value) + else: + return False + + if bytes(data[offset : offset + field.size]) == replacement: + return False + data[offset : offset + field.size] = replacement + return True + + +def _rewrite_header_crc(data: bytearray) -> int | None: + header_size = data[0] + if header_size != 14: + return None + header_crc = fit_crc(data[:12]) + struct.pack_into(" int: + file_crc = fit_crc(data[:-2]) + struct.pack_into(" Any: + ... + + def upload_activity(self, activity_path: str) -> Any: + ... + + +@dataclass(frozen=True) +class UploadResult: + status: str + duplicate: bool + garmin_activity_id: str | None + raw_response: Any + + +class GarminUploadBlocked(RuntimeError): + """Raised when Garmin login needs user action such as MFA.""" + + +class GarminUploader: + def __init__( + self, + settings: Settings, + client_factory: Callable[..., GarminClientProtocol] | None = None, + ) -> None: + self.settings = settings + self._client_factory = client_factory + self._client: GarminClientProtocol | None = None + + def upload(self, fit_path: Path) -> UploadResult: + client = self._ensure_client() + try: + response = client.upload_activity(str(fit_path)) + except Exception as exc: + if _looks_duplicate_error(exc): + logger.info("Garmin already has activity for %s", fit_path) + return UploadResult( + status="duplicate", + duplicate=True, + garmin_activity_id=None, + raw_response=str(exc), + ) + raise + + return UploadResult( + status="uploaded", + duplicate=False, + garmin_activity_id=_extract_activity_id(response), + raw_response=response, + ) + + def _ensure_client(self) -> GarminClientProtocol: + if self._client is not None: + return self._client + + factory = self._client_factory or _default_garmin_factory + client = factory( + self.settings.garmin_email, + self.settings.garmin_password, + prompt_mfa=self._prompt_mfa, + ) + try: + client.login(str(self.settings.garmin_tokenstore)) + except RuntimeError: + raise + except Exception as exc: + if "mfa" in str(exc).lower(): + raise GarminUploadBlocked( + "Garmin MFA is required. Set GARMIN_MFA_CODE for one run." + ) from exc + raise + self._client = client + return client + + def _prompt_mfa(self) -> str: + if self.settings.garmin_mfa_code: + return self.settings.garmin_mfa_code + raise GarminUploadBlocked( + "Garmin requested MFA but GARMIN_MFA_CODE is not set." + ) + + +def _default_garmin_factory(*args: Any, **kwargs: Any) -> GarminClientProtocol: + from garminconnect import Garmin + + return Garmin(*args, **kwargs) + + +def _looks_duplicate_error(exc: Exception) -> bool: + text = str(exc).lower() + return any(token in text for token in ("duplicate", "already exists", "409")) + + +def _extract_activity_id(response: Any) -> str | None: + if not isinstance(response, dict): + return None + + candidates = [ + response.get("activityId"), + response.get("activity_id"), + response.get("id"), + ] + detailed_import = response.get("detailedImportResult") + if isinstance(detailed_import, dict): + candidates.extend( + [ + detailed_import.get("uploadId"), + detailed_import.get("activityId"), + ] + ) + + for key in ("successes", "success", "importedActivities"): + items = response.get(key) + if isinstance(items, list) and items: + first = items[0] + if isinstance(first, dict): + candidates.extend([first.get("activityId"), first.get("id")]) + + for candidate in candidates: + if candidate is not None: + return str(candidate) + return None + diff --git a/src/mywhoosh_garmin_sync/logging_setup.py b/src/mywhoosh_garmin_sync/logging_setup.py new file mode 100644 index 0000000..cd64024 --- /dev/null +++ b/src/mywhoosh_garmin_sync/logging_setup.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +import logging + + +def setup_logging(level: str = "INFO") -> None: + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + diff --git a/src/mywhoosh_garmin_sync/models.py b/src/mywhoosh_garmin_sync/models.py new file mode 100644 index 0000000..20dafd1 --- /dev/null +++ b/src/mywhoosh_garmin_sync/models.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class DownloadCandidate: + source_ref: str + title: str + href: str | None + element_index: int + click_selector: str + + +@dataclass(frozen=True) +class DownloadedActivity: + source_ref: str + title: str + url: str | None + raw_path: Path diff --git a/src/mywhoosh_garmin_sync/mywhoosh.py b/src/mywhoosh_garmin_sync/mywhoosh.py new file mode 100644 index 0000000..9841514 --- /dev/null +++ b/src/mywhoosh_garmin_sync/mywhoosh.py @@ -0,0 +1,759 @@ +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import re +from datetime import UTC, datetime +from pathlib import Path +from typing import Callable +from urllib.parse import urljoin, urlparse + +from playwright.async_api import BrowserContext, Page, TimeoutError, async_playwright + +from .config import Settings +from .fit_device import is_fit_file +from .models import DownloadCandidate, DownloadedActivity + +logger = logging.getLogger(__name__) + +COOKIE_ACCEPT_TEXT_PATTERN = ( + r"^(accept( all)?( cookies?)?|allow all( cookies?)?|agree|i agree|got it|ok|okay)$" +) +COOKIE_ACCEPT_TEXT_RE = re.compile(COOKIE_ACCEPT_TEXT_PATTERN, re.I) +COOKIE_ACCEPT_SELECTORS = ( + "#onetrust-accept-btn-handler", + "button#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", + "button.cky-btn-accept", + "button[id*='accept'][id*='cookie' i]", + "button[id*='cookie'][id*='accept' i]", + "button[class*='accept'][class*='cookie' i]", + "button[class*='cookie'][class*='accept' i]", + "button[data-testid*='accept' i]", + "button[data-test*='accept' i]", + "[role='button'][aria-label*='accept' i]", +) +COOKIE_ACCEPT_JS_PATTERN = ( + r"^(accept( all)?( cookies?)?|allow all( cookies?)?|agree|i agree|got it|ok|okay)$" +) +CHALLENGE_TEXT_TOKENS = ( + "captcha", + "recaptcha", + "hcaptcha", + "turnstile", + "not a robot", + "i'm not a robot", + "i am not a robot", + "verify you are human", + "verify that you are human", + "checking your browser", + "security check", + "cloudflare", + "cf-challenge", + "cf-turnstile", +) +CHALLENGE_URL_TOKENS = ( + "captcha", + "recaptcha", + "hcaptcha", + "turnstile", + "challenge", + "cloudflare", + "verify", +) + + +class MyWhooshCrawler: + def __init__(self, settings: Settings) -> None: + self.settings = settings + + async def download_new_activities( + self, should_skip_source: Callable[[str], bool] + ) -> list[DownloadedActivity]: + timeout_ms = self.settings.mywhoosh_timeout_seconds * 1000 + async with async_playwright() as playwright: + context = await playwright.chromium.launch_persistent_context( + user_data_dir=str(self.settings.browser_state_dir), + headless=self.settings.mywhoosh_headless, + accept_downloads=True, + args=["--no-sandbox", "--disable-dev-shm-usage"], + slow_mo=self.settings.mywhoosh_slow_mo_ms, + timeout=timeout_ms, + viewport={"width": 1440, "height": 1000}, + ) + page: Page | None = None + try: + await self._restore_auth_state(context) + page = context.pages[0] if context.pages else await context.new_page() + page.set_default_timeout(timeout_ms) + await self._ensure_logged_in(page) + candidates = await self._find_candidates(page) + logger.info("Found %d possible MyWhoosh download links", len(candidates)) + + downloaded: list[DownloadedActivity] = [] + for candidate in candidates: + if len(downloaded) >= self.settings.mywhoosh_max_downloads_per_run: + break + if should_skip_source(candidate.source_ref): + logger.debug("Skipping already terminal source %s", candidate.source_ref) + continue + try: + activity = await self._download_candidate( + context, page, candidate + ) + except Exception: + logger.exception( + "Failed downloading MyWhoosh candidate %s", + candidate.source_ref, + ) + continue + if activity is not None: + downloaded.append(activity) + return downloaded + finally: + await self._save_auth_state(context) + await context.close() + + async def _ensure_logged_in(self, page: Page) -> None: + await self._goto(page, self.settings.mywhoosh_activity_url) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "activity-before-login") + await self._debug_screenshot(page, "activity-before-login") + if not await self._login_form_visible(page): + return + + logger.info("Logging in to MyWhoosh") + await self._goto(page, self.settings.mywhoosh_login_url) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "login-page") + await self._debug_screenshot(page, "login-page") + + email_selector = ( + 'input[type="email"], input[name*="email" i], input[name*="user" i], ' + 'input[autocomplete="username"]' + ) + password_selector = 'input[type="password"], input[autocomplete="current-password"]' + + try: + await page.locator(email_selector).first.fill(self.settings.mywhoosh_email) + await page.locator(password_selector).first.fill( + self.settings.mywhoosh_password + ) + await self._dismiss_cookie_banner(page) + + submit = page.locator( + 'button[type="submit"], input[type="submit"], button:has-text("Login"), ' + 'button:has-text("Log in"), button:has-text("Sign in")' + ).first + await submit.click() + except TimeoutError: + if self.settings.mywhoosh_manual_login_wait_seconds <= 0: + raise + logger.warning( + "Could not complete automatic MyWhoosh login. Waiting for manual login." + ) + await self._wait_for_manual_login(page) + return + + await self._wait_for_quiet(page) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "after-login-submit") + await self._debug_screenshot(page, "after-login-submit") + if self.settings.mywhoosh_manual_login_wait_seconds > 0: + await self._wait_for_manual_login(page) + await self._goto(page, self.settings.mywhoosh_activity_url) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "activity-after-login") + + if await self._login_form_visible(page): + if self.settings.mywhoosh_manual_login_wait_seconds > 0: + await self._wait_for_manual_login(page) + await self._goto(page, self.settings.mywhoosh_activity_url) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "activity-after-manual-login") + if not await self._login_form_visible(page): + return + raise RuntimeError("MyWhoosh login did not complete; check credentials or MFA") + + async def _find_candidates(self, page: Page) -> list[DownloadCandidate]: + await self._goto(page, self.settings.mywhoosh_activity_url) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "activity-download-scan") + await self._open_activities_view(page) + await self._debug_screenshot(page, "activity-download-scan") + hints = self.settings.mywhoosh_download_text_hints + raw_items = await page.evaluate( + """ + ({ hints, downloadSelector }) => { + const safeQueryAll = (selector) => { + if (!selector) { + return []; + } + try { + return Array.from(document.querySelectorAll(selector)); + } catch { + return []; + } + }; + const safeMatches = (el, selector) => { + if (!selector) { + return false; + } + try { + return el.matches(selector); + } catch { + return false; + } + }; + const priorityElements = safeQueryAll(downloadSelector); + const generalElements = Array.from( + document.querySelectorAll('a, button, [role="button"]') + ); + const elements = []; + const seen = new Set(); + for (const el of [...priorityElements, ...generalElements]) { + if (seen.has(el)) { + continue; + } + seen.add(el); + elements.push(el); + } + + return elements.map((el, index) => { + const marker = `mywhoosh-sync-${index}`; + el.setAttribute('data-mywhoosh-sync-index', marker); + const style = window.getComputedStyle(el); + const rect = el.getBoundingClientRect(); + const visible = style.visibility !== 'hidden' + && style.display !== 'none' + && rect.width > 0 + && rect.height > 0; + const href = el.href || el.getAttribute('href') || null; + const text = (el.innerText || el.textContent || '').trim(); + const aria = el.getAttribute('aria-label') || ''; + const download = el.getAttribute('download') || ''; + const className = typeof el.className === 'string' ? el.className : ''; + const row = el.closest( + 'tr, li, article, [role="row"], .activity, .activity-row, ' + + '.activity-card, .ride, .ride-row, .workout, .card' + ); + const rowText = row ? (row.innerText || row.textContent || '').trim() : ''; + const matchesDownloadSelector = safeMatches(el, downloadSelector); + const haystack = `${href || ''} ${text} ${aria} ${download} ${className} ${rowText}`.toLowerCase(); + return { + index, + href, + text, + aria, + download, + className, + rowText, + matchesDownloadSelector, + visible, + haystack, + clickSelector: `[data-mywhoosh-sync-index="${marker}"]`, + }; + }).filter((item) => { + return item.visible + && ( + item.matchesDownloadSelector + || hints.some((hint) => item.haystack.includes(hint)) + ); + }); + } + """, + { + "hints": hints, + "downloadSelector": self.settings.mywhoosh_download_button_selector, + }, + ) + first_selector_match = next( + (item for item in raw_items if item.get("matchesDownloadSelector")), None + ) + if first_selector_match is not None: + raw_items = [first_selector_match] + + candidates: list[DownloadCandidate] = [] + seen: set[str] = set() + for item in raw_items: + if _looks_like_app_download(item.get("haystack", "")): + continue + href = item.get("href") + text = item.get("text") or item.get("aria") or item.get("download") or "activity" + absolute_href = ( + urljoin(self.settings.mywhoosh_activity_url, href) + if href and not href.startswith("javascript:") + else None + ) + row_text = item.get("rowText") or "" + source_ref = _source_ref(absolute_href, text, item["index"], row_text) + if source_ref in seen: + continue + seen.add(source_ref) + candidates.append( + DownloadCandidate( + source_ref=source_ref, + title=_clean_title(row_text or text), + href=absolute_href, + element_index=item["index"], + click_selector=item["clickSelector"], + ) + ) + return candidates + + async def _download_candidate( + self, context: BrowserContext, page: Page, candidate: DownloadCandidate + ) -> DownloadedActivity | None: + if candidate.href: + downloaded = await self._download_direct(context, candidate) + if downloaded is not None: + return downloaded + + locator = page.locator(candidate.click_selector).first + try: + async with page.expect_download( + timeout=self.settings.mywhoosh_timeout_seconds * 1000 + ) as download_info: + await locator.click() + download = await download_info.value + suggested = download.suggested_filename or f"{candidate.source_ref}.fit" + raw_path = self._raw_output_path(candidate, suggested) + await download.save_as(raw_path) + except TimeoutError: + logger.warning("Clicking candidate did not produce a download: %s", candidate) + return None + + if not is_fit_file(raw_path): + raw_path.unlink(missing_ok=True) + logger.warning("Downloaded file was not a valid FIT file: %s", raw_path) + return None + + return DownloadedActivity( + source_ref=candidate.source_ref, + title=candidate.title, + url=candidate.href, + raw_path=raw_path, + ) + + async def _download_direct( + self, context: BrowserContext, candidate: DownloadCandidate + ) -> DownloadedActivity | None: + if candidate.href is None: + return None + response = await context.request.get(candidate.href) + if not response.ok: + logger.debug("Direct download failed for %s: %s", candidate.href, response.status) + return None + + body = await response.body() + if len(body) < 14 or body[8:12] != b".FIT": + return None + + filename = _filename_from_headers( + response.headers.get("content-disposition", "") + ) or _filename_from_url(candidate.href) or f"{candidate.source_ref}.fit" + raw_path = self._raw_output_path(candidate, filename) + raw_path.write_bytes(body) + + if not is_fit_file(raw_path): + raw_path.unlink(missing_ok=True) + logger.warning("Direct response was FIT-like but invalid: %s", candidate.href) + return None + + return DownloadedActivity( + source_ref=candidate.source_ref, + title=candidate.title, + url=candidate.href, + raw_path=raw_path, + ) + + def _raw_output_path(self, candidate: DownloadCandidate, filename: str) -> Path: + safe_name = _safe_filename(filename) + if not safe_name.lower().endswith(".fit"): + safe_name = f"{safe_name}.fit" + return self.settings.raw_dir / f"{candidate.source_ref}_{safe_name}" + + async def _goto(self, page: Page, url: str) -> None: + await page.goto(url, wait_until="domcontentloaded") + await self._wait_for_quiet(page) + + async def _wait_for_quiet(self, page: Page) -> None: + try: + await page.wait_for_load_state("networkidle", timeout=10_000) + except TimeoutError: + await asyncio.sleep(1) + + async def _restore_auth_state(self, context: BrowserContext) -> None: + path = self.settings.mywhoosh_auth_state_path + if not path.exists(): + return + + try: + state = json.loads(path.read_text(encoding="utf-8")) + cookies = state.get("cookies") or [] + origins = state.get("origins") or [] + if cookies: + await context.add_cookies(cookies) + if origins: + await context.add_init_script(_storage_restore_script(origins)) + logger.info("Restored MyWhoosh auth state from %s", path) + except Exception: + logger.exception("Failed restoring MyWhoosh auth state from %s", path) + + async def _save_auth_state(self, context: BrowserContext) -> None: + path = self.settings.mywhoosh_auth_state_path + try: + state = await context.storage_state() + state.setdefault("origins", []) + for browser_page in context.pages: + if browser_page.is_closed(): + continue + await self._capture_page_storage(browser_page, state) + + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_suffix(f"{path.suffix}.tmp") + tmp_path.write_text(json.dumps(state, indent=2), encoding="utf-8") + tmp_path.replace(path) + logger.info("Saved MyWhoosh auth state to %s", path) + except Exception: + logger.exception("Failed saving MyWhoosh auth state to %s", path) + + async def _capture_page_storage(self, page: Page, state: dict) -> None: + try: + storage = await page.evaluate( + """ + () => { + const entries = (storage) => { + const items = []; + for (let index = 0; index < storage.length; index += 1) { + const name = storage.key(index); + items.push({ name, value: storage.getItem(name) }); + } + return items; + }; + return { + origin: window.location.origin, + localStorage: entries(window.localStorage), + sessionStorage: entries(window.sessionStorage), + }; + } + """ + ) + except Exception: + return + + origin = storage.get("origin") + if not origin or origin == "null": + return + + origins = state.setdefault("origins", []) + existing = next( + (item for item in origins if item.get("origin") == origin), + None, + ) + if existing is None: + origins.append(storage) + return + + existing["localStorage"] = storage.get("localStorage", []) + existing["sessionStorage"] = storage.get("sessionStorage", []) + + async def _login_form_visible(self, page: Page) -> bool: + try: + return await page.locator('input[type="password"]').first.is_visible() + except Exception: + return False + + async def _open_activities_view(self, page: Page) -> None: + label = self.settings.mywhoosh_activities_button_text.strip() + if not label: + return + + label_re = re.compile(rf"^\s*{re.escape(label)}\s*$", re.I) + locators = ( + page.get_by_role("button", name=label_re), + page.get_by_role("link", name=label_re), + page.get_by_role("tab", name=label_re), + page.locator("a, button, [role='button'], [role='tab']").filter( + has_text=label_re + ), + ) + for locator in locators: + if await self._click_first_visible(locator): + logger.info("Opened MyWhoosh %s view", label) + await self._wait_for_quiet(page) + await self._dismiss_cookie_banner(page) + await self._handle_challenge_if_needed(page, "after-activities-click") + await self._debug_screenshot(page, "after-activities-click") + return + + logger.debug("No visible MyWhoosh %s button found", label) + + async def _dismiss_cookie_banner(self, page: Page) -> None: + clicked = False + for _ in range(3): + clicked_this_round = False + for selector in COOKIE_ACCEPT_SELECTORS: + if await self._click_first_visible(page.locator(selector)): + clicked = True + clicked_this_round = True + break + + if not clicked_this_round and await self._click_first_visible( + page.get_by_role("button", name=COOKIE_ACCEPT_TEXT_RE) + ): + clicked = True + clicked_this_round = True + + if not clicked_this_round and await self._click_cookie_banner_by_script(page): + clicked = True + clicked_this_round = True + + if not clicked_this_round: + break + + await asyncio.sleep(0.5) + + if clicked: + logger.info("Accepted MyWhoosh cookie consent") + await self._wait_for_quiet(page) + + async def _click_first_visible(self, locator) -> bool: + try: + count = await locator.count() + except Exception: + count = 1 + + for index in range(min(count, 5)): + try: + item = locator.nth(index) + if await item.is_visible(timeout=500): + await item.click(timeout=1500) + return True + except Exception: + continue + return False + + async def _click_cookie_banner_by_script(self, page: Page) -> bool: + try: + return bool( + await page.evaluate( + """ + (pattern) => { + const re = new RegExp(pattern, 'i'); + const candidates = Array.from(document.querySelectorAll( + 'button, [role="button"], input[type="button"], input[type="submit"]' + )); + for (const element of candidates) { + const style = window.getComputedStyle(element); + const rect = element.getBoundingClientRect(); + const visible = style.visibility !== 'hidden' + && style.display !== 'none' + && rect.width > 0 + && rect.height > 0; + if (!visible) { + continue; + } + const label = ( + element.innerText + || element.value + || element.getAttribute('aria-label') + || '' + ).replace(/\\s+/g, ' ').trim().toLowerCase(); + if (re.test(label)) { + element.click(); + return true; + } + } + return false; + } + """, + COOKIE_ACCEPT_JS_PATTERN, + ) + ) + except Exception: + return False + + async def _handle_challenge_if_needed(self, page: Page, stage: str) -> None: + if not await self._challenge_visible(page): + return + + await self._debug_screenshot(page, f"challenge-{_safe_debug_name(stage)}") + if self.settings.mywhoosh_manual_login_wait_seconds <= 0: + raise RuntimeError( + "MyWhoosh presented a captcha/bot challenge. Run " + "`docker compose -f docker-compose.debug.yml up --build`, open noVNC, " + "solve the challenge manually, then rerun the normal service." + ) + + logger.warning( + "MyWhoosh presented a captcha/bot challenge at %s. Waiting for manual solve.", + stage, + ) + await self._wait_for_manual_login(page) + + async def _challenge_visible(self, page: Page) -> bool: + if _looks_like_challenge_url(page.url): + return True + + try: + challenge_signal = await page.evaluate( + """ + () => { + const frameText = Array.from(document.querySelectorAll('iframe')) + .map((frame) => [ + frame.getAttribute('src') || '', + frame.getAttribute('title') || '', + frame.getAttribute('name') || '', + frame.getAttribute('id') || '' + ].join(' ')) + .join('\\n'); + return [ + document.title || '', + document.body?.innerText || '', + frameText + ].join('\\n'); + } + """ + ) + except Exception: + return False + + return _looks_like_challenge_text(challenge_signal) + + async def _wait_for_manual_login(self, page: Page) -> None: + seconds = self.settings.mywhoosh_manual_login_wait_seconds + logger.warning( + "Manual MyWhoosh login window is open for %d seconds. " + "Use the visible browser to accept cookies, solve challenges, and finish login.", + seconds, + ) + await self._debug_screenshot(page, "manual-login-start") + deadline = asyncio.get_running_loop().time() + seconds + while asyncio.get_running_loop().time() < deadline: + if await self._manual_login_complete(page): + await self._wait_for_quiet(page) + logger.info("Manual MyWhoosh login appears complete") + await self._debug_screenshot(page, "manual-login-complete") + return + await asyncio.sleep(2) + raise RuntimeError("Timed out waiting for manual MyWhoosh login") + + async def _manual_login_complete(self, page: Page) -> bool: + if await self._login_form_visible(page): + return False + if await self._challenge_visible(page): + return False + current_url = page.url.lower() + blocking_url_tokens = ("login", "signin", "sign-in", "auth", "captcha", "challenge") + return not any(token in current_url for token in blocking_url_tokens) + + async def _debug_screenshot(self, page: Page, name: str) -> None: + if not self.settings.mywhoosh_debug_screenshots: + return + timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") + path = self.settings.mywhoosh_debug_dir / f"{timestamp}_{name}.png" + try: + await page.screenshot(path=str(path), full_page=True) + logger.debug("Saved debug screenshot %s", path) + except Exception: + logger.exception("Failed saving debug screenshot %s", path) + + +def _source_ref(href: str | None, text: str, index: int, row_text: str = "") -> str: + stable = href or f"{row_text or text}|{index}" + return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:24] + + +def _storage_restore_script(origins: list[dict]) -> str: + origins_json = json.dumps(origins) + return f""" + (() => {{ + const origins = {origins_json}; + const originState = origins.find((item) => item.origin === window.location.origin); + if (!originState) {{ + return; + }} + const restore = (storage, entries) => {{ + for (const entry of entries || []) {{ + if (entry && entry.name !== null && entry.value !== null) {{ + storage.setItem(entry.name, entry.value); + }} + }} + }}; + try {{ + restore(window.localStorage, originState.localStorage); + restore(window.sessionStorage, originState.sessionStorage); + }} catch {{ + // Ignore blocked storage access. The persisted Chromium profile is still available. + }} + }})(); + """ + + +def _clean_title(value: str) -> str: + value = re.sub(r"\s+", " ", value).strip() + return value[:120] or "MyWhoosh activity" + + +def _looks_like_cookie_accept_label(value: str) -> bool: + normalized = re.sub(r"\s+", " ", value).strip().lower() + return bool(COOKIE_ACCEPT_TEXT_RE.match(normalized)) + + +def _looks_like_challenge_text(value: str) -> bool: + normalized = _normalize_challenge_signal(value) + return any(token in normalized for token in CHALLENGE_TEXT_TOKENS) + + +def _looks_like_challenge_url(url: str) -> bool: + normalized = _normalize_challenge_signal(url) + return any(token in normalized for token in CHALLENGE_URL_TOKENS) + + +def _normalize_challenge_signal(value: str) -> str: + value = value.replace("\u2019", "'") + return re.sub(r"\s+", " ", value).strip().lower() + + +def _looks_like_app_download(haystack: str) -> bool: + return any( + token in haystack + for token in ( + "download app", + "app store", + "google play", + "play.google.com", + "apps.apple.com", + "windows", + "macos", + "android", + "ios", + ) + ) + + +def _safe_filename(value: str) -> str: + value = _clean_title(value) + value = re.sub(r"[^A-Za-z0-9._-]+", "_", value).strip("._") + return value or "activity.fit" + + +def _safe_debug_name(value: str) -> str: + value = re.sub(r"[^A-Za-z0-9._-]+", "-", value).strip("-") + return value or "page" + + +def _filename_from_headers(content_disposition: str) -> str | None: + match = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', content_disposition) + if not match: + return None + return match.group(1) + + +def _filename_from_url(url: str | None) -> str | None: + if not url: + return None + name = Path(urlparse(url).path).name + return name or None diff --git a/src/mywhoosh_garmin_sync/service.py b/src/mywhoosh_garmin_sync/service.py new file mode 100644 index 0000000..24d8d22 --- /dev/null +++ b/src/mywhoosh_garmin_sync/service.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +import asyncio +import logging +import signal +from pathlib import Path + +from .config import Settings +from .fit_device import GarminDevice, convert_fit_device +from .garmin import GarminUploadBlocked, GarminUploader +from .models import DownloadedActivity +from .mywhoosh import MyWhooshCrawler +from .state import ActivityStore, sha256_file + +logger = logging.getLogger(__name__) + + +class SyncService: + def __init__( + self, + settings: Settings, + crawler: MyWhooshCrawler | None = None, + uploader: GarminUploader | None = None, + store: ActivityStore | None = None, + ) -> None: + self.settings = settings + self.crawler = crawler or MyWhooshCrawler(settings) + self.uploader = uploader or GarminUploader(settings) + self.store = store or ActivityStore(settings.db_path) + self.device = GarminDevice( + manufacturer_id=settings.target_garmin_manufacturer_id, + product_id=settings.target_garmin_product_id, + product_name=settings.target_garmin_product_name, + serial_number=settings.target_garmin_serial_number, + ) + + async def serve(self) -> None: + stop_event = asyncio.Event() + + try: + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, stop_event.set) + except (NotImplementedError, RuntimeError): + pass + + logger.info( + "Starting sync loop; interval=%ss dry_run=%s", + self.settings.poll_interval_seconds, + self.settings.dry_run, + ) + while not stop_event.is_set(): + try: + await self.run_once() + except Exception: + logger.exception("Sync cycle failed") + + try: + await asyncio.wait_for( + stop_event.wait(), timeout=self.settings.poll_interval_seconds + ) + except TimeoutError: + continue + + async def run_once(self) -> None: + self.store.initialize() + downloads = await self.crawler.download_new_activities( + self.store.is_terminal_source + ) + logger.info("Downloaded %d candidate activities", len(downloads)) + + for activity in downloads: + await asyncio.to_thread(self._process_activity, activity) + + def _process_activity(self, activity: DownloadedActivity) -> None: + try: + raw_hash = sha256_file(activity.raw_path) + self.store.record_downloaded( + source_ref=activity.source_ref, + title=activity.title, + source_url=activity.url, + raw_path=activity.raw_path, + raw_sha256=raw_hash, + ) + + if self.store.is_uploaded_hash(raw_hash): + self.store.mark_duplicate( + activity.source_ref, + "Raw file hash was already uploaded from another source.", + ) + return + + converted_path = self._converted_path(activity) + result = convert_fit_device(activity.raw_path, converted_path, self.device) + converted_hash = sha256_file(converted_path) + self.store.mark_converted( + activity.source_ref, + converted_path, + converted_hash, + result.patched_field_count, + ) + + if self.store.is_uploaded_hash(converted_hash): + self.store.mark_duplicate( + activity.source_ref, + "Converted file hash was already uploaded from another source.", + ) + return + + if self.settings.dry_run: + logger.info("Dry run enabled; not uploading %s", converted_path) + return + + upload = self.uploader.upload(converted_path) + if upload.duplicate: + self.store.mark_duplicate( + activity.source_ref, "Garmin reported a duplicate activity." + ) + else: + self.store.mark_uploaded( + activity.source_ref, + garmin_activity_id=upload.garmin_activity_id, + ) + except GarminUploadBlocked as exc: + logger.error("Upload blocked: %s", exc) + self.store.mark_failed(activity.source_ref, str(exc)) + except Exception as exc: + logger.exception("Failed processing %s", activity.raw_path) + self.store.mark_failed(activity.source_ref, str(exc)) + + def _converted_path(self, activity: DownloadedActivity) -> Path: + return self.settings.converted_dir / activity.raw_path.name + diff --git a/src/mywhoosh_garmin_sync/state.py b/src/mywhoosh_garmin_sync/state.py new file mode 100644 index 0000000..6dec7ae --- /dev/null +++ b/src/mywhoosh_garmin_sync/state.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +import hashlib +import sqlite3 +from contextlib import contextmanager +from datetime import UTC, datetime +from pathlib import Path +from typing import Iterator + +TERMINAL_STATUSES = {"uploaded", "duplicate"} + + +def utc_now() -> str: + return datetime.now(UTC).isoformat(timespec="seconds") + + +def sha256_file(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +class ActivityStore: + def __init__(self, db_path: Path) -> None: + self.db_path = db_path + + def initialize(self) -> None: + self.db_path.parent.mkdir(parents=True, exist_ok=True) + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS activities ( + source_ref TEXT PRIMARY KEY, + title TEXT, + source_url TEXT, + raw_path TEXT, + converted_path TEXT, + raw_sha256 TEXT, + converted_sha256 TEXT, + status TEXT NOT NULL, + error_message TEXT, + attempts INTEGER NOT NULL DEFAULT 0, + patched_field_count INTEGER, + garmin_activity_id TEXT, + first_seen_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + uploaded_at TEXT + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_activities_raw_sha ON activities(raw_sha256)" + ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS idx_activities_converted_sha + ON activities(converted_sha256) + """ + ) + + def is_terminal_source(self, source_ref: str) -> bool: + row = self._fetch_one( + "SELECT status FROM activities WHERE source_ref = ?", (source_ref,) + ) + return bool(row and row["status"] in TERMINAL_STATUSES) + + def is_uploaded_hash(self, file_hash: str) -> bool: + row = self._fetch_one( + """ + SELECT source_ref FROM activities + WHERE status IN ('uploaded', 'duplicate') + AND (raw_sha256 = ? OR converted_sha256 = ?) + LIMIT 1 + """, + (file_hash, file_hash), + ) + return row is not None + + def record_downloaded( + self, + source_ref: str, + title: str, + source_url: str | None, + raw_path: Path, + raw_sha256: str, + ) -> None: + now = utc_now() + with self._connect() as conn: + conn.execute( + """ + INSERT INTO activities ( + source_ref, title, source_url, raw_path, raw_sha256, status, + first_seen_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, 'downloaded', ?, ?) + ON CONFLICT(source_ref) DO UPDATE SET + title = excluded.title, + source_url = excluded.source_url, + raw_path = excluded.raw_path, + raw_sha256 = excluded.raw_sha256, + status = CASE + WHEN activities.status IN ('uploaded', 'duplicate') + THEN activities.status + ELSE 'downloaded' + END, + error_message = NULL, + updated_at = excluded.updated_at + """, + ( + source_ref, + title, + source_url, + str(raw_path), + raw_sha256, + now, + now, + ), + ) + + def mark_converted( + self, + source_ref: str, + converted_path: Path, + converted_sha256: str, + patched_field_count: int, + ) -> None: + self._execute( + """ + UPDATE activities + SET converted_path = ?, + converted_sha256 = ?, + patched_field_count = ?, + status = CASE + WHEN status IN ('uploaded', 'duplicate') THEN status + ELSE 'converted' + END, + error_message = NULL, + updated_at = ? + WHERE source_ref = ? + """, + ( + str(converted_path), + converted_sha256, + patched_field_count, + utc_now(), + source_ref, + ), + ) + + def mark_uploaded(self, source_ref: str, garmin_activity_id: str | None) -> None: + now = utc_now() + self._execute( + """ + UPDATE activities + SET status = 'uploaded', + garmin_activity_id = ?, + error_message = NULL, + uploaded_at = ?, + updated_at = ? + WHERE source_ref = ? + """, + (garmin_activity_id, now, now, source_ref), + ) + + def mark_duplicate(self, source_ref: str, message: str) -> None: + now = utc_now() + self._execute( + """ + UPDATE activities + SET status = 'duplicate', + error_message = ?, + uploaded_at = COALESCE(uploaded_at, ?), + updated_at = ? + WHERE source_ref = ? + """, + (message, now, now, source_ref), + ) + + def mark_failed(self, source_ref: str, message: str) -> None: + self._execute( + """ + UPDATE activities + SET status = 'failed', + error_message = ?, + attempts = attempts + 1, + updated_at = ? + WHERE source_ref = ? + """, + (message[:1000], utc_now(), source_ref), + ) + + def get_status(self, source_ref: str) -> str | None: + row = self._fetch_one( + "SELECT status FROM activities WHERE source_ref = ?", (source_ref,) + ) + return row["status"] if row else None + + @contextmanager + def _connect(self) -> Iterator[sqlite3.Connection]: + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + try: + yield conn + conn.commit() + finally: + conn.close() + + def _execute(self, sql: str, params: tuple[object, ...]) -> None: + with self._connect() as conn: + conn.execute(sql, params) + + def _fetch_one( + self, sql: str, params: tuple[object, ...] + ) -> sqlite3.Row | None: + with self._connect() as conn: + return conn.execute(sql, params).fetchone() + diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..26f4b42 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,45 @@ +from pathlib import Path + +import pytest + +from mywhoosh_garmin_sync.config import Settings + + +@pytest.fixture +def settings(tmp_path: Path) -> Settings: + return Settings( + mywhoosh_email="whoosh@example.com", + mywhoosh_password="whoosh-pass", + garmin_email="garmin@example.com", + garmin_password="garmin-pass", + poll_interval_seconds=3600, + data_dir=tmp_path, + raw_dir=tmp_path / "raw", + converted_dir=tmp_path / "converted", + browser_state_dir=tmp_path / "browser", + mywhoosh_auth_state_path=tmp_path / "mywhoosh_auth_state.json", + garmin_tokenstore=tmp_path / "garmin_tokens", + db_path=tmp_path / "state.sqlite3", + log_level="INFO", + dry_run=False, + dashboard_enabled=True, + dashboard_bind="127.0.0.1", + dashboard_port=8080, + mywhoosh_login_url="https://www.mywhoosh.com/login/", + mywhoosh_activity_url="https://www.mywhoosh.com/profile/", + mywhoosh_headless=True, + mywhoosh_timeout_seconds=1, + mywhoosh_max_downloads_per_run=10, + mywhoosh_download_text_hints=["fit", "download"], + mywhoosh_activities_button_text="ACTIVITIES", + mywhoosh_download_button_selector=".btnDownload", + mywhoosh_slow_mo_ms=0, + mywhoosh_manual_login_wait_seconds=0, + mywhoosh_debug_screenshots=False, + mywhoosh_debug_dir=tmp_path / "debug", + garmin_mfa_code=None, + target_garmin_manufacturer_id=1, + target_garmin_product_id=3578, + target_garmin_product_name="Edge 1030 Plus", + target_garmin_serial_number=None, + ) diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..c2566a9 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,35 @@ +from pathlib import Path + +from mywhoosh_garmin_sync.config import Settings + + +def test_settings_from_env(monkeypatch, tmp_path): + monkeypatch.setenv("MYWHOOSH_EMAIL", "whoosh@example.com") + monkeypatch.setenv("MYWHOOSH_PASSWORD", "whoosh-pass") + monkeypatch.setenv("GARMIN_EMAIL", "garmin@example.com") + monkeypatch.setenv("GARMIN_PASSWORD", "garmin-pass") + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("DRY_RUN", "true") + monkeypatch.setenv("POLL_INTERVAL_SECONDS", "123") + monkeypatch.setenv("TARGET_GARMIN_SERIAL_NUMBER", "123456") + monkeypatch.setenv("MYWHOOSH_MANUAL_LOGIN_WAIT_SECONDS", "900") + + settings = Settings.from_env() + + assert settings.data_dir == tmp_path + assert settings.raw_dir == tmp_path / "raw" + assert settings.dry_run is True + assert settings.poll_interval_seconds == 123 + assert settings.dashboard_enabled is True + assert settings.dashboard_bind == "0.0.0.0" + assert settings.dashboard_port == 8080 + assert settings.target_garmin_product_id == 3578 + assert settings.target_garmin_serial_number == 123456 + assert settings.mywhoosh_manual_login_wait_seconds == 900 + assert settings.mywhoosh_activities_button_text == "ACTIVITIES" + assert settings.mywhoosh_download_button_selector == ".btnDownload" + assert settings.mywhoosh_auth_state_path == tmp_path / "mywhoosh_auth_state.json" + + settings.ensure_directories() + assert Path(settings.raw_dir).is_dir() + assert Path(settings.mywhoosh_debug_dir).is_dir() diff --git a/tests/test_fit_device.py b/tests/test_fit_device.py new file mode 100644 index 0000000..453713c --- /dev/null +++ b/tests/test_fit_device.py @@ -0,0 +1,119 @@ +from pathlib import Path +import struct + +import pytest + +from mywhoosh_garmin_sync.fit_crc import fit_crc +from mywhoosh_garmin_sync.fit_device import ( + DeviceFieldValue, + FitFormatError, + GarminDevice, + convert_fit_device, + read_device_field_values, +) + + +def test_convert_fit_device_patches_metadata_and_crc(tmp_path: Path): + source = tmp_path / "source.fit" + output = tmp_path / "output.fit" + source.write_bytes(_minimal_activity_fit()) + + result = convert_fit_device( + source, + output, + GarminDevice(product_id=3578, product_name="Edge 1030 Plus", serial_number=42), + ) + + assert result.patched_field_count == 8 + values = read_device_field_values(output) + assert DeviceFieldValue(0, 1, 1) in values + assert DeviceFieldValue(0, 2, 3578) in values + assert DeviceFieldValue(0, 3, 42) in values + assert DeviceFieldValue(0, 8, "Edge 1030 Plus") in values + assert DeviceFieldValue(23, 2, 1) in values + assert DeviceFieldValue(23, 3, 42) in values + assert DeviceFieldValue(23, 4, 3578) in values + assert DeviceFieldValue(23, 27, "Edge 1030 Plus") in values + + data = output.read_bytes() + assert struct.unpack_from(" bytes: + data = bytearray() + + data.extend( + _definition( + local=0, + global_message=0, + fields=[ + (0, 1, 0x00), + (1, 2, 0x84), + (2, 2, 0x84), + (3, 4, 0x8C), + (8, 20, 0x07), + ], + ) + ) + data.extend(b"\x00") + data.extend(struct.pack(" bytes: + result = bytearray() + result.append(0x40 | local) + result.append(0) + result.append(0) + result.extend(struct.pack(" bytes: + encoded = value.encode("utf-8")[: size - 1] + b"\x00" + return encoded + b"\x00" * (size - len(encoded)) diff --git a/tests/test_garmin.py b/tests/test_garmin.py new file mode 100644 index 0000000..19b2294 --- /dev/null +++ b/tests/test_garmin.py @@ -0,0 +1,59 @@ +from dataclasses import replace + +import pytest + +from mywhoosh_garmin_sync.garmin import GarminUploadBlocked, GarminUploader + + +class FakeGarminClient: + def __init__(self, *args, **kwargs): + self.prompt_mfa = kwargs.get("prompt_mfa") + self.logged_in = False + + def login(self, tokenstore=None): + self.logged_in = True + + def upload_activity(self, activity_path): + return {"activityId": 12345, "path": activity_path} + + +class DuplicateGarminClient(FakeGarminClient): + def upload_activity(self, activity_path): + raise RuntimeError("409 duplicate activity already exists") + + +def test_garmin_uploader_success(settings, tmp_path): + fit = tmp_path / "activity.fit" + fit.write_bytes(b"fit") + uploader = GarminUploader(settings, client_factory=FakeGarminClient) + + result = uploader.upload(fit) + + assert result.status == "uploaded" + assert result.garmin_activity_id == "12345" + + +def test_garmin_uploader_duplicate(settings, tmp_path): + fit = tmp_path / "activity.fit" + fit.write_bytes(b"fit") + uploader = GarminUploader(settings, client_factory=DuplicateGarminClient) + + result = uploader.upload(fit) + + assert result.duplicate is True + assert result.status == "duplicate" + + +def test_mfa_prompt_blocks_without_code(settings): + uploader = GarminUploader(settings, client_factory=FakeGarminClient) + + with pytest.raises(GarminUploadBlocked): + uploader._prompt_mfa() + + +def test_mfa_prompt_returns_env_code(settings): + settings = replace(settings, garmin_mfa_code="123456") + uploader = GarminUploader(settings, client_factory=FakeGarminClient) + + assert uploader._prompt_mfa() == "123456" + diff --git a/tests/test_mywhoosh.py b/tests/test_mywhoosh.py new file mode 100644 index 0000000..d163081 --- /dev/null +++ b/tests/test_mywhoosh.py @@ -0,0 +1,58 @@ +from mywhoosh_garmin_sync.mywhoosh import ( + _looks_like_challenge_text, + _looks_like_challenge_url, + _looks_like_cookie_accept_label, + _source_ref, + _storage_restore_script, +) + + +def test_cookie_accept_label_matching(): + assert _looks_like_cookie_accept_label("Accept") + assert _looks_like_cookie_accept_label("Accept all cookies") + assert _looks_like_cookie_accept_label("Allow all") + assert _looks_like_cookie_accept_label("I agree") + assert _looks_like_cookie_accept_label("Got it") + + assert not _looks_like_cookie_accept_label("Reject all") + assert not _looks_like_cookie_accept_label("Download app") + + +def test_challenge_text_matching(): + assert _looks_like_challenge_text("I'm not a robot") + assert _looks_like_challenge_text("Verify you are human") + assert _looks_like_challenge_text("iframe title recaptcha") + assert _looks_like_challenge_text("Cloudflare security check") + + assert not _looks_like_challenge_text("Welcome to your activities") + + +def test_challenge_url_matching_does_not_flag_plain_login(): + assert _looks_like_challenge_url("https://example.test/cdn-cgi/challenge-platform") + assert _looks_like_challenge_url("https://example.test/verify") + + assert not _looks_like_challenge_url("https://event.mywhoosh.com/login/") + assert not _looks_like_challenge_url("https://event.mywhoosh.com/user/activities") + + +def test_source_ref_prefers_href_then_row_text(): + assert _source_ref("https://example.test/a.fit", "Download", 0, "Ride A") == _source_ref( + "https://example.test/a.fit", "Download", 99, "Ride B" + ) + assert _source_ref(None, "", 0, "Ride A") != _source_ref(None, "", 1, "Ride B") + + +def test_storage_restore_script_contains_session_storage(): + script = _storage_restore_script( + [ + { + "origin": "https://event.mywhoosh.com", + "localStorage": [{"name": "token", "value": "local"}], + "sessionStorage": [{"name": "session-token", "value": "session"}], + } + ] + ) + + assert "https://event.mywhoosh.com" in script + assert "localStorage" in script + assert "sessionStorage" in script diff --git a/tests/test_state.py b/tests/test_state.py new file mode 100644 index 0000000..f6eb740 --- /dev/null +++ b/tests/test_state.py @@ -0,0 +1,44 @@ +from pathlib import Path + +from mywhoosh_garmin_sync.state import ActivityStore + + +def test_state_transitions_and_hash_lookup(tmp_path: Path): + store = ActivityStore(tmp_path / "state.sqlite3") + store.initialize() + + store.record_downloaded( + source_ref="source-1", + title="Ride", + source_url="https://example.test/ride.fit", + raw_path=tmp_path / "raw.fit", + raw_sha256="raw-hash", + ) + assert store.get_status("source-1") == "downloaded" + assert store.is_terminal_source("source-1") is False + + store.mark_converted( + source_ref="source-1", + converted_path=tmp_path / "converted.fit", + converted_sha256="converted-hash", + patched_field_count=4, + ) + assert store.get_status("source-1") == "converted" + assert store.is_uploaded_hash("converted-hash") is False + + store.mark_uploaded("source-1", "123") + assert store.get_status("source-1") == "uploaded" + assert store.is_terminal_source("source-1") is True + assert store.is_uploaded_hash("raw-hash") is True + assert store.is_uploaded_hash("converted-hash") is True + + +def test_failed_activity_can_be_retried(tmp_path: Path): + store = ActivityStore(tmp_path / "state.sqlite3") + store.initialize() + store.record_downloaded("source-1", "Ride", None, tmp_path / "raw.fit", "hash") + store.mark_failed("source-1", "network failed") + + assert store.get_status("source-1") == "failed" + assert store.is_terminal_source("source-1") is False +