#!/usr/bin/env python3
"""Download assets to structured folder per AGENTS rules.

Usage:
    python download.py --page-id j4lexk --title "My Page" \
        --urls urls.txt --sources source_urls.txt

- Creates `downloads/<date>-<title>-<page-id>/media/` and stores downloads
  there (date format `YYYYMMDD`, generated by the script).
- Saves source URLs provided by user into
  `downloads/<date>-<title>-<page-id>/urls.txt` (input file given by
  `--sources`).
- Cleans filenames (strip query strings and `@!` size tokens; keep/guess
  extensions).
- Batch downloads, then deletes 0-byte files, compares against planned
  URLs, retries missing up to 2 times, and reports any remaining
  failures. Deletes the `--urls` input file when finished.
"""

import argparse
import mimetypes
import os
import pathlib
import sys
import time
import urllib.parse
from typing import Dict, List, Optional, Tuple

import requests

DATE_FMT = "%Y%m%d"

# Default retry policy
MAX_RETRIES = 2
TIMEOUT = 30


def parse_args() -> argparse.Namespace:
    """Parse the CLI arguments for a batch-download run."""
    parser = argparse.ArgumentParser(
        description="Batch download assets per AGENTS rules"
    )
    parser.add_argument("--page-id", required=True, help="Page ID, e.g. j4lexk")
    parser.add_argument(
        "--title", required=True, help="Page title for folder naming"
    )
    parser.add_argument(
        "--urls",
        required=True,
        help="Path to text file with asset URLs (one per line)",
    )
    parser.add_argument(
        "--sources",
        required=True,
        help="Path to text file containing source page URLs to record (will be saved as urls.txt)",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=MAX_RETRIES,
        help="Retry count for missing files",
    )
    parser.add_argument(
        "--timeout", type=int, default=TIMEOUT, help="Request timeout seconds"
    )
    return parser.parse_args()


def load_lines(path: str) -> List[str]:
    """Read *path* and return its non-empty lines, stripped of whitespace."""
    with open(path, "r", encoding="utf-8") as f:
        return [ln.strip() for ln in f if ln.strip()]


def ensure_dir(path: pathlib.Path) -> None:
    """Create *path* (and parents) if it does not already exist."""
    path.mkdir(parents=True, exist_ok=True)


def sanitize_filename(url: str, content_type: Optional[str]) -> str:
    """Derive a clean local filename from *url*.

    Strips the query string (via URL parsing), removes `@!` size tokens
    (e.g. `photo.jpg@!user_image_800x1`), and guesses an extension from
    *content_type* when the path has none.
    """
    parsed = urllib.parse.urlparse(url)
    basename = os.path.basename(parsed.path)
    # remove query-derived size tokens like @!user_image_800x1
    if "@!" in basename:
        basename = basename.split("@!", 1)[0]
    # fallback name when the URL path has no basename (e.g. trailing slash)
    if not basename:
        basename = "file"
    root, ext = os.path.splitext(basename)
    if not ext:
        ext = guess_extension(content_type)
    return root + ext


def guess_extension(content_type: Optional[str]) -> str:
    """Map a Content-Type header to a file extension; default to `.bin`."""
    if content_type:
        # Drop parameters such as "; charset=utf-8" before the lookup.
        ext = mimetypes.guess_extension(content_type.split(";")[0].strip())
        if ext:
            return ext
    return ".bin"


def unique_name(base: pathlib.Path) -> pathlib.Path:
    """Return *base* if unused, else the first free `stem_N.suffix` variant."""
    if not base.exists():
        return base
    stem = base.stem
    suffix = base.suffix
    parent = base.parent
    idx = 1
    while True:
        candidate = parent / f"{stem}_{idx}{suffix}"
        if not candidate.exists():
            return candidate
        idx += 1


def download_one(
    url: str, dest_dir: pathlib.Path, timeout: int
) -> Tuple[bool, Optional[str]]:
    """Download *url* into *dest_dir*.

    Returns (True, None) on success or (False, error message) on any
    HTTP error or exception. Never raises.
    """
    try:
        # Use the response as a context manager: with stream=True the
        # underlying connection stays open until the body is fully read
        # or the response is closed, so an early return (non-200) would
        # otherwise leak the socket.
        with requests.get(url, timeout=timeout, stream=True) as resp:
            if resp.status_code != 200:
                return False, f"HTTP {resp.status_code}"
            fname = sanitize_filename(url, resp.headers.get("Content-Type"))
            target = unique_name(dest_dir / fname)
            with open(target, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        return True, None
    except Exception as exc:  # pylint: disable=broad-except
        return False, str(exc)


def delete_zero_byte_files(dest_dir: pathlib.Path) -> List[str]:
    """Remove empty files from *dest_dir*; return the removed names."""
    removed = []
    for p in dest_dir.glob("*"):
        if p.is_file() and p.stat().st_size == 0:
            removed.append(p.name)
            p.unlink()
    return removed


def plan_downloads(urls: List[str]) -> Dict[str, str]:
    """Return mapping of url -> planned base filename (sanitized) for verification."""
    # Content-Type is unknown at planning time, so extensionless URLs get
    # the `.bin` fallback here; verify_missing matches on stems for this
    # reason.
    return {u: sanitize_filename(u, None) for u in urls}


def verify_missing(dest_dir: pathlib.Path, plan: Dict[str, str]) -> List[str]:
    """Return the URLs from *plan* with no corresponding file in *dest_dir*.

    A planned file may have been saved with a different extension (the
    plan guesses without a Content-Type) or with a `_N` uniqueness
    suffix, so a file counts as present when its stem equals the planned
    stem exactly or is the planned stem plus a `_` suffix. A plain
    prefix test would false-match unrelated files (planned `photo`
    matching existing `photo2.jpg`) and hide genuinely missing URLs.
    """
    existing_stems = {
        os.path.splitext(p.name)[0] for p in dest_dir.glob("*") if p.is_file()
    }
    missing = []
    for url, fname in plan.items():
        stem = os.path.splitext(fname)[0]
        present = any(
            s == stem or s.startswith(stem + "_") for s in existing_stems
        )
        if not present:
            missing.append(url)
    return missing


def save_source_urls(dest_root: pathlib.Path, sources_path: pathlib.Path) -> None:
    """Copy the source page URLs into `<dest_root>/urls.txt`, one per line."""
    ensure_dir(dest_root)
    sources = load_lines(str(sources_path))
    with open(dest_root / "urls.txt", "w", encoding="utf-8") as f:
        for line in sources:
            f.write(line + "\n")


def main() -> None:
    """Run the full batch: download, clean, verify, retry, report."""
    args = parse_args()
    today = time.strftime(DATE_FMT)
    dest_root = pathlib.Path("downloads") / f"{today}-{args.title}-{args.page_id}"
    media_dir = dest_root / "media"
    ensure_dir(media_dir)

    urls_path = pathlib.Path(args.urls)
    urls = load_lines(args.urls)
    plan = plan_downloads(urls)

    # initial batch download
    for url in urls:
        ok, err = download_one(url, media_dir, args.timeout)
        if not ok:
            print(f"WARN: download failed for {url}: {err}", file=sys.stderr)
        time.sleep(0.05)  # mild pacing

    removed = delete_zero_byte_files(media_dir)
    if removed:
        print(f"Removed zero-byte files: {removed}")

    missing = verify_missing(media_dir, plan)
    attempts = 0
    while missing and attempts < args.retries:
        attempts += 1
        print(f"Retry round {attempts}: missing {len(missing)} files")
        still_missing = []
        for url in missing:
            ok, err = download_one(url, media_dir, args.timeout)
            if not ok:
                print(f"WARN: retry failed for {url}: {err}", file=sys.stderr)
                still_missing.append(url)
        removed_retry = delete_zero_byte_files(media_dir)
        if removed_retry:
            print(f"Removed zero-byte files after retry: {removed_retry}")
        # Re-verify only the URLs that failed this round.
        missing = (
            verify_missing(media_dir, {u: plan[u] for u in still_missing})
            if still_missing
            else []
        )

    if missing:
        print("FAILED to fetch these URLs after retries:", file=sys.stderr)
        for url in missing:
            print(url, file=sys.stderr)
    else:
        print("All planned files downloaded.")

    save_source_urls(dest_root, pathlib.Path(args.sources))
    print(f"Saved source URLs to {dest_root / 'urls.txt'}")

    # Per AGENTS rules the --urls input file is consumed; best-effort removal.
    try:
        urls_path.unlink()
        print(f"Removed input file: {urls_path}")
    except Exception as exc:  # pylint: disable=broad-except
        print(
            f"WARN: failed to remove input file {urls_path}: {exc}",
            file=sys.stderr,
        )


if __name__ == "__main__":
    main()