#!/usr/bin/env python3
"""Download assets to a structured folder per AGENTS rules.

Usage:
    python download.py --page-id j4lexk --title "My Page" --urls urls.txt --sources source_urls.txt

- Creates `downloads/<date>-<title>-<page-id>/media/` and stores downloads there (date format `YYYYMMDD`, generated by the script).
- Saves the source URLs provided by the user into `downloads/<date>-<title>-<page-id>/urls.txt` (input file given by `--sources`).
- Cleans filenames (strips query strings and `@!` size tokens; keeps or guesses extensions).
- Batch downloads, then deletes 0-byte files, compares against the planned URLs, retries missing files up to 2 times by default (see `--retries`), and reports any remaining failures. Deletes the `--urls` input file when finished.
"""
|
|
|
|
import argparse
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
import time
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
import mimetypes
|
|
import urllib.parse
|
|
import requests
|
|
|
|
# strftime pattern used by main() for the date prefix of the download folder.
DATE_FMT = "%Y%m%d"

# Default retry policy
# Default for --retries: number of retry rounds for files still missing.
MAX_RETRIES = 2
# Default for --timeout: per-request timeout in seconds.
TIMEOUT = 30
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the downloader's command-line options.

    Returns:
        Namespace exposing ``page_id``, ``title``, ``urls``, ``sources``
        (all required strings) plus the integer options ``retries`` and
        ``timeout``, which fall back to ``MAX_RETRIES`` and ``TIMEOUT``.
    """
    cli = argparse.ArgumentParser(description="Batch download assets per AGENTS rules")

    # Required string options, declared as (flag, help text) pairs.
    required_options = (
        ("--page-id", "Page ID, e.g. j4lexk"),
        ("--title", "Page title for folder naming"),
        ("--urls", "Path to text file with asset URLs (one per line)"),
        (
            "--sources",
            "Path to text file containing source page URLs to record (will be saved as urls.txt)",
        ),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)

    # Optional integer options, declared as (flag, default, help text).
    integer_options = (
        ("--retries", MAX_RETRIES, "Retry count for missing files"),
        ("--timeout", TIMEOUT, "Request timeout seconds"),
    )
    for flag, fallback, help_text in integer_options:
        cli.add_argument(flag, type=int, default=fallback, help=help_text)

    return cli.parse_args()
|
|
|
|
|
|
def load_lines(path: str) -> List[str]:
    """Return the non-blank lines of a UTF-8 text file, whitespace-stripped.

    Args:
        path: Location of the text file to read.

    Returns:
        The stripped lines in file order; lines that are empty after
        stripping are dropped.
    """
    entries: List[str] = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            text = raw.strip()
            if text:
                entries.append(text)
    return entries
|
|
|
|
|
|
def ensure_dir(path: pathlib.Path) -> None:
    """Create ``path`` and any missing parent directories.

    Does nothing when the directory already exists.
    """
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def sanitize_filename(url: str, content_type: Optional[str]) -> str:
    """Derive a local filename from ``url``.

    Only the last segment of the URL path is used, so the query string is
    ignored. Anything from the first ``@!`` onwards (size tokens such as
    ``@!user_image_800x1``) is dropped, and an empty name becomes ``file``.
    When the name has no extension, one is guessed from ``content_type``.

    Args:
        url: Asset URL.
        content_type: Value of the response ``Content-Type`` header, if any.

    Returns:
        The cleaned filename, always carrying an extension.
    """
    url_path = urllib.parse.urlparse(url).path
    # partition() leaves names without "@!" untouched; "file" is the fallback.
    name = os.path.basename(url_path).partition("@!")[0] or "file"
    stem, suffix = os.path.splitext(name)
    return stem + (suffix or guess_extension(content_type))
|
|
|
|
|
|
def guess_extension(content_type: Optional[str]) -> str:
    """Map a ``Content-Type`` header value to a file extension.

    Parameters after the first ``;`` (for example ``charset=...``) are
    ignored. Returns ``.bin`` when the header is missing or empty, or when
    ``mimetypes`` has no extension for the type.
    """
    if not content_type:
        return ".bin"
    mime_type = content_type.partition(";")[0].strip()
    return mimetypes.guess_extension(mime_type) or ".bin"
|
|
|
|
|
|
def unique_name(base: pathlib.Path) -> pathlib.Path:
    """Return a path that does not exist yet, derived from ``base``.

    ``base`` is returned unchanged when it is free. Otherwise ``_1``,
    ``_2``, ... is inserted between the stem and the suffix until an unused
    name is found.
    """
    candidate = base
    counter = 0
    while candidate.exists():
        counter += 1
        candidate = base.parent / f"{base.stem}_{counter}{base.suffix}"
    return candidate
|
|
|
|
|
|
def download_one(
    url: str, dest_dir: pathlib.Path, timeout: int
) -> Tuple[bool, Optional[str]]:
    """Download ``url`` into ``dest_dir`` under a sanitized, unique filename.

    Args:
        url: Asset URL to fetch.
        dest_dir: Existing directory that receives the file.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, reason)`` where
        ``reason`` is ``"HTTP <status>"`` for a non-200 response or the text
        of the exception that was raised. No exception escapes this function.
    """
    target: Optional[pathlib.Path] = None
    try:
        # The context manager closes the streamed response on every path, so
        # the underlying connection is released instead of leaking.
        with requests.get(url, timeout=timeout, stream=True) as resp:
            if resp.status_code != 200:
                return False, f"HTTP {resp.status_code}"
            fname = sanitize_filename(url, resp.headers.get("Content-Type"))
            target = unique_name(dest_dir / fname)
            with open(target, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:  # skip empty chunks
                        f.write(chunk)
        return True, None
    except Exception as exc:  # pylint: disable=broad-except
        # A failure mid-transfer would leave a truncated file that later looks
        # like a finished download; remove it so the URL is seen as missing.
        if target is not None:
            try:
                target.unlink()
            except OSError:
                # Best-effort cleanup: the file may never have been created.
                pass
        return False, str(exc)
|
|
|
|
|
|
def delete_zero_byte_files(dest_dir: pathlib.Path) -> List[str]:
    """Delete every empty regular file directly inside ``dest_dir``.

    Subdirectories are neither entered nor removed.

    Returns:
        The names (not full paths) of the files that were deleted.
    """
    empties = [
        entry
        for entry in dest_dir.glob("*")
        if entry.is_file() and entry.stat().st_size == 0
    ]
    for entry in empties:
        entry.unlink()
    return [entry.name for entry in empties]
|
|
|
|
|
|
def plan_downloads(urls: List[str]) -> Dict[str, str]:
    """Map each URL to the filename it is expected to be saved under.

    The name is computed without a content type, so URLs lacking an
    extension are planned with the fallback extension. Duplicate URLs
    collapse into a single entry.
    """
    return {link: sanitize_filename(link, None) for link in urls}
|
|
|
|
|
|
def verify_missing(dest_dir: pathlib.Path, plan: Dict[str, str]) -> List[str]:
    """Return the planned URLs that have no matching file in ``dest_dir``.

    A planned filename counts as present when a file in ``dest_dir`` has the
    same stem (any extension, because the extension may have been guessed
    from the response) or the stem followed by ``_<digits>`` (the suffix
    added to avoid name collisions). A bare prefix match is deliberately not
    used: it would let an unrelated file such as ``apple.png`` satisfy a
    planned ``a.jpg`` and hide a missing download.

    Args:
        dest_dir: Directory holding the downloaded files.
        plan: Mapping of URL to planned filename.

    Returns:
        URLs, in ``plan`` order, for which no matching file was found.
    """
    present_stems = {
        os.path.splitext(p.name)[0] for p in dest_dir.glob("*") if p.is_file()
    }

    def is_present(planned_stem: str) -> bool:
        if planned_stem in present_stems:
            return True
        # Accept only "<stem>_<n>" variants, not arbitrary longer names.
        marker = planned_stem + "_"
        return any(
            stem.startswith(marker) and stem[len(marker):].isdecimal()
            for stem in present_stems
        )

    return [
        url
        for url, fname in plan.items()
        if not is_present(os.path.splitext(fname)[0])
    ]
|
|
|
|
|
|
def save_source_urls(dest_root: pathlib.Path, sources_path: pathlib.Path) -> None:
    """Copy the non-blank lines of ``sources_path`` to ``dest_root/urls.txt``.

    ``dest_root`` is created if needed. Each line is written stripped and
    newline-terminated, and an existing ``urls.txt`` is overwritten.
    """
    ensure_dir(dest_root)
    entries = load_lines(str(sources_path))
    record_path = dest_root / "urls.txt"
    with open(record_path, "w", encoding="utf-8") as out:
        out.writelines(entry + "\n" for entry in entries)
|
|
|
|
|
|
def _unresolved_urls(
    media_dir: pathlib.Path, candidates: List[str], plan: Dict[str, str], failed: set
) -> List[str]:
    """Return the candidates (order kept) that failed or have no file on disk."""
    absent = set(verify_missing(media_dir, {u: plan[u] for u in candidates}))
    return [u for u in candidates if u in failed or u in absent]


def main() -> None:
    """Run the batch download workflow.

    Steps: create ``downloads/<date>-<title>-<page-id>/media``, download
    every URL from the ``--urls`` file, delete zero-byte files, retry
    unresolved URLs for up to ``--retries`` rounds, report what is still
    missing on stderr, record the ``--sources`` file as ``urls.txt`` and
    finally delete the ``--urls`` input file.

    A URL is unresolved when its download reported a failure or when no
    matching file is on disk after zero-byte cleanup. Every retried URL is
    rechecked after each round, so a retry that "succeeded" with an empty
    file (deleted during cleanup) is still treated as missing.
    """
    args = parse_args()
    today = time.strftime(DATE_FMT)
    dest_root = pathlib.Path("downloads") / f"{today}-{args.title}-{args.page_id}"
    media_dir = dest_root / "media"
    ensure_dir(media_dir)

    urls_path = pathlib.Path(args.urls)
    urls = load_lines(args.urls)
    plan = plan_downloads(urls)

    # initial batch download
    failed = set()
    for url in urls:
        ok, err = download_one(url, media_dir, args.timeout)
        if ok:
            # A later duplicate of the same URL may succeed after a failure.
            failed.discard(url)
        else:
            failed.add(url)
            print(f"WARN: download failed for {url}: {err}", file=sys.stderr)
        time.sleep(0.05)  # mild pacing

    removed = delete_zero_byte_files(media_dir)
    if removed:
        print(f"Removed zero-byte files: {removed}")

    missing = _unresolved_urls(media_dir, list(plan), plan, failed)
    attempts = 0
    while missing and attempts < args.retries:
        attempts += 1
        print(f"Retry round {attempts}: missing {len(missing)} files")
        failed = set()
        for url in missing:
            ok, err = download_one(url, media_dir, args.timeout)
            if not ok:
                print(f"WARN: retry failed for {url}: {err}", file=sys.stderr)
                failed.add(url)
        removed_retry = delete_zero_byte_files(media_dir)
        if removed_retry:
            print(f"Removed zero-byte files after retry: {removed_retry}")
        # Recheck every retried URL, not only the ones whose request failed.
        missing = _unresolved_urls(media_dir, missing, plan, failed)

    if missing:
        print("FAILED to fetch these URLs after retries:", file=sys.stderr)
        for url in missing:
            print(url, file=sys.stderr)
    else:
        print("All planned files downloaded.")

    save_source_urls(dest_root, pathlib.Path(args.sources))
    print(f"Saved source URLs to {dest_root / 'urls.txt'}")

    # Removing the input file is best-effort; a failure only produces a warning.
    try:
        urls_path.unlink()
        print(f"Removed input file: {urls_path}")
    except OSError as exc:
        print(f"WARN: failed to remove input file {urls_path}: {exc}", file=sys.stderr)
|
|
|
|
|
|
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()
|