12 Commits

SHA1        Message                                            Date
47c58ecb69  docs: add registry k8s fallback design and plan    2026-03-23 17:44:18 +08:00
            (CI: docker-release / build-and-push failing after 9m51s)
271c8fe538  test: stabilize npm upstream validators            2026-03-23 16:59:36 +08:00
442c27a26d  feat: add registry k8s manifest fallback           2026-03-23 16:55:20 +08:00
9c85b9b097  feat: persist cache effective upstream path        2026-03-23 16:24:39 +08:00
7c95ee0210  test: cover registry k8s fallback path derivation  2026-03-23 16:22:28 +08:00
05d5037e97  chore: ignore local worktrees                      2026-03-23 16:20:15 +08:00
bbc7204901  fix: limit docker library rewrite to Docker Hub    2026-03-23 15:42:45 +08:00
            (CI: docker-release / build-and-push failing after 12m34s)
40c6f2fcce  fix: align cache controls with config              2026-01-26 15:55:03 +08:00
            (Remove unused head-check config, make TTL overrides explicit, and tighten revalidation to avoid stale cache behavior.)
9a57949147  Fix upstream timeout for large downloads           2025-12-12 17:36:37 +08:00
            (CI: docker-release / build-and-push failing after 25m53s)
3685b2129a  fix: update content type handling for release.gpg  2025-11-18 16:13:05 +08:00
fc2c46a9df  Refactor module binding to rely on Type            2025-11-18 16:11:13 +08:00
347eb3adc5  feat: remove module/rollout config key             2025-11-18 15:37:21 +08:00
55 changed files with 1513 additions and 690 deletions

.gitignore vendored

@@ -20,6 +20,7 @@ Thumbs.db
.vscode/
.idea/
.cache/
+.worktrees/
storage/
logs/
tmp/


@@ -38,7 +38,7 @@ Password = "s3cr3t"
 1. Copy `configs/config.example.toml` to `config.toml` in the working directory and adjust the `[[Hub]]` entries:
    - Add/modify `ListenPort` in the global section and remove `Port` from each Hub
-   - Set `Type` for the Hub and add `Module` as needed (defaults to `legacy`; custom modules must be registered under `internal/hubmodule/<module-key>/`)
+   - Set `Type` for the Hub; any-hub picks the matching module hooks by type. To extend with a new module, register it under `internal/hubmodule/<type>/` and add the new type to config validation
    - Set `Domain`, `Upstream`, `StoragePath`, etc. following the quickstart examples, and add `Username`/`Password` as needed
 2. See [`specs/003-hub-auth-fields/quickstart.md`](specs/003-hub-auth-fields/quickstart.md) for config validation, credential checks, and log inspection.
 3. Common commands:
@@ -48,20 +48,20 @@ Password = "s3cr3t"
 ## Modular Proxy and Examples
-- `configs/config.example.toml` shows several Hub combinations (a Docker Hub that omits `Module` and automatically uses the hook named after its `Type`, a Composer Hub that explicitly sets `Module = "composer"`, and a legacy fallback Hub) that can be copied and adapted.
-- Run `./scripts/demo-proxy.sh docker` (or `npm`) to load the example config and start the proxy; log lines carry a `module_key` field so you can confirm whether `legacy` or a custom module handled the request.
+- `configs/config.example.toml` shows several Hub combinations; Docker/NPM/Composer and other types bind automatically to the module of the same name, so only `Type` and the upstream settings are needed to start.
+- Run `./scripts/demo-proxy.sh docker` (or `npm`) to load the example config and start the proxy; log lines carry a `module_key` field so you can confirm whether the `docker` or `npm` module handled the request.
 - Hook development workflow:
   1. Copy `internal/hubmodule/template/` to `internal/hubmodule/<module-key>/` and fill in `module.go` / `module_test.go`.
   2. Call `hubmodule.MustRegister` in the module's `init()` to register metadata, and use `hooks.MustRegister` to register hooks (NormalizePath/ResolveUpstream/RewriteResponse, etc.).
   3. Add unit tests and `tests/integration/` coverage for the miss→hit flow; run `make modules-test` / `go test ./...`.
-  4. Update the config: leave `[[Hub]].Module` empty to select hooks by `Type`, or set `Module = "<module-key>"` explicitly and control legacy/dual/modular via `Rollout`.
+  4. Update the config: pick a unique `Type` for the new module (and add it to the config validation list); a Hub only needs that `Type` to route to the new module.
   5. Before starting the service, run `curl -s /-/modules | jq '.hook_registry'` to confirm hook registration; missing hooks fail startup outright instead of silently falling back to legacy at runtime.
-### Module Selection and legacy
-- When `[[Hub]].Module` is empty, the Hub falls back to the module named after its `Type` (if registered), otherwise to the `legacy` fallback.
-- The `/-/modules` diagnostics show `hook_status`; modules still on legacy are flagged `legacy-only` for easier troubleshooting.
-- The legacy module only provides minimal fallback capability; once migration is done, set `Module` explicitly to the matching repository hook.
-- For example runbooks and FAQs, see [`specs/003-hub-auth-fields/quickstart.md`](specs/003-hub-auth-fields/quickstart.md) and this feature's [`quickstart.md`](specs/004-modular-proxy-cache/quickstart.md).
+### Module Selection
+- A Hub's `Type` maps directly to the module of the same name; when adding a module, extend the supported-type list in `internal/config/validation.go`.
+- The `/-/modules` diagnostics still show each module's hook registration status and all Hub bindings, making config errors easy to spot.
+- The legacy module exists only for historical compatibility and is no longer triggered by a config field.
 ## CLI Flags
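As a minimal sketch of the type-driven binding described above (hub name, domain, and TTL here are placeholder values; no `Module` or `Rollout` keys are needed):

```toml
# Hypothetical hub: Type alone selects the npm module.
[[Hub]]
Name = "npm-cache"
Domain = "npm.hub.local"
Upstream = "https://registry.npmjs.org"
Type = "npm"
CacheTTL = 21600
```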


@@ -19,7 +19,6 @@ Name = "docker"
Upstream = "https://registry-1.docker.io"
Proxy = ""
Type = "docker"
-Module = "docker"
Username = ""
Password = ""
@@ -29,7 +28,6 @@ Name = "ghcr"
Upstream = "https://ghcr.io"
Proxy = ""
Type = "docker"
-Module = "docker"
Username = ""
Password = ""
@@ -39,7 +37,6 @@ Name = "quay"
Upstream = "https://quay.io"
Proxy = ""
Type = "docker"
-Module = "docker"
Username = ""
Password = ""
@@ -50,7 +47,6 @@ Name = "go"
Upstream = "https://proxy.golang.org"
Proxy = ""
Type = "go"
-Module = "go"
Username = ""
Password = ""
@@ -61,7 +57,6 @@ Name = "npm"
Upstream = "https://registry.npmjs.org"
Proxy = ""
Type = "npm"
-Module = "npm"
Username = ""
Password = ""
@@ -74,7 +69,6 @@ Proxy = ""
Username = ""
Password = ""
Type = "pypi"
-Module = "pypi"
# Composer Repository
[[Hub]]
@@ -85,7 +79,6 @@ Proxy = ""
Username = ""
Password = ""
Type = "composer"
-Module = "composer"
# Debian/Ubuntu APT example
[[Hub]]
@@ -93,8 +86,6 @@ Name = "apt-cache"
Domain = "apt.hub.local"
Upstream = "https://mirrors.edge.kernel.org/ubuntu"
Type = "debian"
-Module = "debian"
-Rollout = "modular"
# Alpine APK example
[[Hub]]
@@ -102,5 +93,3 @@ Name = "apk-cache"
Domain = "apk.hub.local"
Upstream = "https://dl-cdn.alpinelinux.org/alpine"
Type = "apk"
-Module = "apk"
-Rollout = "modular"


@@ -17,21 +17,16 @@ Name = "docker-cache"
Domain = "docker.hub.local"
Upstream = "https://registry-1.docker.io"
Proxy = ""
-Type = "docker" # when Module is omitted, the hook named after Type is selected (docker here)
-# Module = "docker" # uncomment to pin the module explicitly
-Rollout = "modular"
+Type = "docker"
Username = ""
Password = ""
CacheTTL = 43200
-EnableHeadCheck = true
[[Hub]]
Name = "composer-cache"
Domain = "composer.hub.local"
Upstream = "https://repo.packagist.org"
Type = "composer"
-Module = "composer" # explicitly bind the composer hook; registration is verified at startup
-Rollout = "dual" # options: legacy-only/dual/modular
CacheTTL = 21600
[[Hub]]
@@ -39,8 +34,6 @@ Name = "legacy-fallback"
Domain = "legacy.hub.local"
Upstream = "https://registry.npmjs.org"
Type = "npm"
-Module = "legacy" # Hubs not yet migrated can pin legacy explicitly; diagnostics flag them legacy-only
-Rollout = "legacy-only"
# Debian/Ubuntu APT example
[[Hub]]
@@ -48,8 +41,6 @@ Name = "apt-cache"
Domain = "apt.hub.local"
Upstream = "https://mirrors.edge.kernel.org/ubuntu"
Type = "debian"
-Module = "debian"
-Rollout = "modular"
# Alpine APK example
[[Hub]]
@@ -57,5 +48,3 @@ Name = "apk-cache"
Domain = "apk.hub.local"
Upstream = "https://dl-cdn.alpinelinux.org/alpine"
Type = "apk"
-Module = "apk"
-Rollout = "modular"


@@ -19,4 +19,3 @@ Module = "legacy"
Username = ""
Password = ""
CacheTTL = 43200
-EnableHeadCheck = true


@@ -19,4 +19,3 @@ Module = "legacy"
Username = ""
Password = ""
CacheTTL = 43200
-EnableHeadCheck = false


@@ -2,10 +2,9 @@
 ## Common Fields
 - `hub`/`domain`/`hub_type`: identifiers for the current Hub and its protocol type, e.g. `debian`/`apk`.
-- `module_key`: the module key that handled the request (anything other than `legacy` indicates a new module is active).
+- `module_key`: the module key that handled the request (same name as `Type`).
 - `cache_hit`: `true` means the cache was reused directly; `false` means the content was fetched from upstream or refreshed.
 - `upstream`/`upstream_status`: the upstream address actually contacted and its status code.
-- `rollout_flag`: `legacy-only`/`dual`/`modular`, useful for tracing routing and rollout state.
 - `action`: `proxy`, marking proxy-path log lines.
 ## APT (debian module)
@@ -19,4 +18,4 @@
 ## Quick Checks
 - Watch `cache_hit` and `upstream_status`: `cache_hit=true` with `upstream_status=200/304` means the cache was reused; `cache_hit=false` means a refetch or refresh.
-- If you expect module log fields but see `module_key":"legacy"`, check whether the `Module` and `Rollout` config points at the new module.
+- If `module_key` does not match the configured `Type`, check whether the hook for that type is registered, or whether an old binary is in use.


@@ -1,58 +1,33 @@
-# Modular Hub Migration Playbook
+# Module Binding Notes
-This playbook describes how to cut a hub over from the shared legacy adapter to a dedicated module using the new rollout flags, diagnostics endpoint, and structured logs delivered in feature `004-modular-proxy-cache`.
+Legacy rollout flags (`Module`/`Rollout`) have been removed. Hubs now bind to modules solely through their `Type` values, which map 1:1 to registered modules (docker, npm, go, pypi, composer, debian, apk, ...).
-## Prerequisites
+## Migrating to a New Module
-- Target module must be registered via `hubmodule.MustRegister` and expose a proxy handler through `proxy.RegisterModuleHandler`.
-- `config.toml` must already map the hub to its target module through `[[Hub]].Module`.
-- Operators must have access to the running binary port (default `:5000`) to query `/-/modules`.
+1. **Register the module**
+   Implement the new module under `internal/hubmodule/<type>/`, call `hubmodule.MustRegister` in `init()`, and register hooks via `hooks.MustRegister`.
-## Rollout Workflow
+2. **Expose a handler**
+   New modules continue to reuse the shared proxy handler registered via `proxy.RegisterModule`. No per-module handler wiring is required unless the module supplies a bespoke handler.
-1. **Snapshot current state**
-   Run `curl -s http://localhost:5000/-/modules | jq '.hubs[] | select(.hub_name=="<hub>")'` to capture the current `module_key` and `rollout_flag`. Legacy hubs report `module_key=legacy` and `rollout_flag=legacy-only`.
-2. **Prepare config for dual traffic**
-   Edit the hub block to target the new module while keeping rollback safety:
-   ```toml
-   [[Hub]]
-   Name = "npm-prod"
-   Domain = "npm.example.com"
-   Upstream = "https://registry.npmjs.org"
-   Module = "npm"
-   Rollout = "dual"
-   ```
-   Dual mode keeps routing on the new module but keeps observability tagged as a partial rollout.
-3. **Deploy and monitor**
-   Restart the service and tail logs filtered by `module_key`:
-   ```sh
-   jq 'select(.module_key=="npm" and .rollout_flag=="dual")' /var/log/any-hub.json
-   ```
-   Every request now carries `module_key`/`rollout_flag`, allowing dashboards or `grep`-based analyses without extra parsing.
+3. **Update the config schema**
+   Add the new type to `internal/config/validation.go`'s `supportedHubTypes`, then redeploy. Every hub that should use the new module only needs `Type = "<type>"` plus the usual `Domain`/`Upstream` fields.
 4. **Verify diagnostics**
-   Query `/-/modules/npm` to inspect the registered metadata and confirm cache strategy, or `/-/modules` to ensure the hub binding reflects `rollout_flag=dual`.
+   `curl http://127.0.0.1:<port>/-/modules` to ensure the new type appears under `modules[]` and that the desired hubs show `module_key="<type>"`.
-5. **Promote to modular**
-   Once metrics are healthy, change `Rollout = "modular"` in config and redeploy. Continue monitoring logs to make sure both `module_key` and `rollout_flag` show the fully promoted state.
+5. **Monitor logs**
+   Structured logs still carry `module_key`, making it easy to confirm that traffic is flowing through the expected module. Example:
-6. **Rollback procedure**
-   To roll back, set `Rollout = "legacy-only"` (without touching `Module`). The runtime forces traffic through the legacy module while keeping the desired module declaration for later reattempts. Confirm via diagnostics (`module_key` reverts to `legacy`) before announcing rollback complete.
+   ```json
+   {"action":"proxy","hub":"npm","module_key":"npm","cache_hit":false,"upstream_status":200}
+   ```
-## Observability Checklist
-- **Logs**: Every proxy log line now contains `hub`, `module_key`, `rollout_flag`, upstream status, and `request_id`. Capture at least five minutes of traffic per flag change.
-- **Diagnostics**: Store JSON snapshots from `/-/modules` before and after each rollout stage for incident timelines.
-- **Config History**: Keep the `config.toml` diff (especially `Rollout` changes) attached to change records for auditability.
+6. **Rollback**
+   Since modules are now type-driven, rollback is as simple as reverting the `Type` value (or config deployment) back to the previous module's type.
 ## Troubleshooting
-- **Error: `module_not_found` during diagnostics** → module key not registered; ensure the module package's `init()` calls `hubmodule.MustRegister`.
-- **Requests still tagged with `legacy-only` after promotion** → double-check the running process uses the updated config path (`ANY_HUB_CONFIG` vs `--config`) and restart the service.
-- **Diagnostics 404** → confirm you are hitting the correct port and that the CLI user/network path allows HTTP access; the endpoint ignores Host headers, so `curl http://127.0.0.1:<port>/-/modules` should succeed locally.
+- **`module_not_found` in diagnostics** → ensure the module registered via `hubmodule.MustRegister` before the hub references its type.
+- **Hooks missing** → `/-/modules` exposes `hook_registry`; confirm the new type reports `registered`.
+- **Unexpected module key in logs** → confirm the running binary includes your module (imported in `internal/config/modules.go`) and that the `--config` path matches the deployed file.
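The diagnostics check from step 4 can be scripted; the payload below is a stand-in for what `curl -s http://127.0.0.1:<port>/-/modules` would return (field names taken from the playbook's own `jq` examples):

```shell
# Filter a (hypothetical) /-/modules payload down to one hub's module_key.
echo '{"hubs":[{"hub_name":"npm-prod","module_key":"npm"}]}' |
  jq -r '.hubs[] | select(.hub_name=="npm-prod") | .module_key'
```

A mismatch between the printed key and the configured `Type` means the running binary or config does not include the expected module.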


@@ -0,0 +1,414 @@
# registry.k8s.io Compatibility Fallback Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add a `registry.k8s.io`-only manifest fallback so single-segment repos like `coredns` retry once as `coredns/coredns` after a final first-attempt `404`.
**Architecture:** Keep the first upstream request unchanged, then add one targeted retry in the proxy after the normal auth flow completes. Persist the effective upstream path for fallback-filled cache entries so later cache revalidation uses the path that actually produced the content instead of rechecking the original `404` path.
**Tech Stack:** Go 1.25, Fiber v3, internal docker hooks, internal proxy handler, filesystem cache store, Go tests
---
## File map
- Modify: `internal/hubmodule/docker/hooks.go`
- add `registry.k8s.io` host detection and manifest fallback path derivation helpers
- Modify: `internal/hubmodule/docker/hooks_test.go`
- add unit tests for host gating and manifest fallback derivation rules
- Modify: `internal/cache/store.go`
- extend cache entry/options to carry optional effective upstream path metadata
- Modify: `internal/cache/fs_store.go`
- persist and load cache metadata for effective upstream path
- Modify: `internal/cache/store_test.go`
- verify metadata round-trip behavior and remove behavior
- Modify: `internal/proxy/handler.go`
- Modify: `internal/proxy/handler_test.go`
- add one-shot `registry.k8s.io` fallback retry after auth retry flow
- write/read effective upstream path metadata for fallback-backed cache entries
- use effective upstream path during revalidation
- Modify: `tests/integration/cache_flow_test.go`
- add integration coverage for fallback success, no-fallback success, and cache/revalidation behavior
### Task 1: Add docker fallback derivation helpers
**Files:**
- Modify: `internal/hubmodule/docker/hooks.go`
- Test: `internal/hubmodule/docker/hooks_test.go`
- [ ] **Step 1: Write the failing unit tests**
```go
func TestIsRegistryK8sHost(t *testing.T) {
	if !isRegistryK8sHost("registry.k8s.io") {
		t.Fatalf("expected registry.k8s.io to match")
	}
	if isRegistryK8sHost("example.com") {
		t.Fatalf("expected non-registry.k8s.io host to be ignored")
	}
}

func TestRegistryK8sManifestFallbackPath(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
	path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
	if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
		t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
	}
}

func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
	if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
		t.Fatalf("expected multi-segment repo to be ignored")
	}
}

func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
	if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
		t.Fatalf("expected non-manifest path to be ignored")
	}
}

func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
	if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
		t.Fatalf("expected non-registry.k8s.io host to be ignored")
	}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/hubmodule/docker -run 'TestIsRegistryK8sHost|TestRegistryK8sManifestFallbackPath' -count=1`
Expected: FAIL because `isRegistryK8sHost` and `manifestFallbackPath` do not exist yet.
- [ ] **Step 3: Write minimal implementation**
```go
func isRegistryK8sHost(host string) bool {
	if parsedHost, _, err := net.SplitHostPort(host); err == nil {
		host = parsedHost
	}
	return strings.EqualFold(host, "registry.k8s.io")
}

func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
	if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
		return "", false
	}
	repo, rest, ok := splitDockerRepoPath(clean)
	if !ok || strings.Count(repo, "/") != 0 || !strings.HasPrefix(rest, "/manifests/") {
		return "", false
	}
	return "/v2/" + repo + "/" + repo + rest, true
}
```
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/hubmodule/docker -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go
git commit -m "test: cover registry k8s fallback path derivation"
```
### Task 2: Persist effective upstream path in cache metadata
**Files:**
- Modify: `internal/cache/store.go`
- Modify: `internal/cache/fs_store.go`
- Test: `internal/cache/store_test.go`
- [ ] **Step 1: Write the failing cache metadata test**
```go
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
	store, err := NewStore(t.TempDir())
	if err != nil {
		t.Fatalf("NewStore: %v", err)
	}
	loc := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
	_, err = store.Put(context.Background(), loc, strings.NewReader("body"), PutOptions{
		EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
	})
	if err != nil {
		t.Fatalf("Put: %v", err)
	}
	got, err := store.Get(context.Background(), loc)
	if err != nil {
		t.Fatalf("Get: %v", err)
	}
	if got.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
		t.Fatalf("unexpected effective path: %q", got.Entry.EffectiveUpstreamPath)
	}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/cache -run TestStorePersistsEffectiveUpstreamPath -count=1`
Expected: FAIL because cache entry metadata does not yet include effective upstream path.
- [ ] **Step 3: Write minimal implementation**
```go
type PutOptions struct {
	ModTime               time.Time
	EffectiveUpstreamPath string
}

type Entry struct {
	Locator               Locator   `json:"locator"`
	FilePath              string    `json:"file_path"`
	SizeBytes             int64     `json:"size_bytes"`
	ModTime               time.Time `json:"mod_time"`
	EffectiveUpstreamPath string    `json:"effective_upstream_path,omitempty"`
}
```
Implementation notes:
- store metadata next to the cached body as a small JSON file such as `<entry>.meta`
- on `Get`, load metadata if present and populate `Entry.EffectiveUpstreamPath`
- on `Put`, write metadata atomically only when `EffectiveUpstreamPath` is non-empty
- on `Remove`, delete both body and metadata files
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/cache -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go
git commit -m "feat: persist cache effective upstream path"
```
### Task 3: Add one-shot fallback retry in proxy fetch flow
**Files:**
- Modify: `internal/proxy/handler.go`
- Modify: `internal/hubmodule/docker/hooks.go`
- Test: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Write the failing integration test for fallback success**
```go
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
	// stub returns 404 for /v2/coredns/manifests/v1.13.1
	// stub returns 200 for /v2/coredns/coredns/manifests/v1.13.1
	// request /v2/coredns/manifests/v1.13.1 through proxy
	// expect 200 and exactly two upstream hits
}

func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
	// stub returns 200 for original path
	// request original path
	// expect 200 and only one upstream hit
}

func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
	// non-registry.k8s.io upstream returns 404 for original path
	// request original path
	// expect final 404 and no derived path hit
}

func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
	// first GET falls back and succeeds
	// second identical GET should be cache hit
	// expect no new hit to original 404 path on second request
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
Expected: FAIL because the fallback tests were written before the retry logic exists.
- [ ] **Step 3: Write minimal implementation**
```go
resp, upstreamURL, err := h.executeRequest(c, route, hook)
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, hook)
if err == nil && resp.StatusCode == http.StatusNotFound {
	if fallbackHook, fallbackURL, ok := h.registryK8sFallbackAttempt(c, route, hook); ok {
		resp.Body.Close()
		resp, upstreamURL, err = h.executeRequest(c, route, fallbackHook)
		if err == nil {
			resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, fallbackHook)
		}
		fallbackEffectivePath = fallbackURL.Path
	}
}
```
Implementation notes:
- in `internal/hubmodule/docker/hooks.go`, keep only pure path helpers such as `isRegistryK8sHost` and `manifestFallbackPath`
- in `internal/proxy/handler.go`, add the retry orchestration helper that clones the current hook state with the derived fallback path when `manifestFallbackPath` returns true
- only evaluate fallback after `retryOnAuthFailure` completes and only for final `404`
- close the first response body before the retry
- if fallback succeeds with `200`, pass `EffectiveUpstreamPath` into cache `PutOptions`
- if fallback returns non-`200`, pass that second upstream response through unchanged
- emit a structured fallback log event from `internal/proxy/handler.go` with hub name, domain, upstream host, original path, fallback path, original status, and request method
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go tests/integration/cache_flow_test.go internal/hubmodule/docker/hooks.go
git commit -m "feat: retry registry k8s manifest fallback"
```
### Task 4: Revalidate cached fallback entries against the effective upstream path
**Files:**
- Modify: `internal/proxy/handler.go`
- Modify: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Write the failing revalidation regression test**
```go
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
	// first GET: original path 404, fallback path 200, response cached
	// second GET: cache hit path should revalidate against fallback path only
	// assert original 404 path is not re-requested during revalidation
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
Expected: FAIL because revalidation still targets the original client-facing path.
- [ ] **Step 3: Write minimal implementation**
```go
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
	if entry.EffectiveUpstreamPath == "" {
		return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
	}
	clone := *route.UpstreamURL
	clone.Path = entry.EffectiveUpstreamPath
	clone.RawPath = entry.EffectiveUpstreamPath
	return &clone
}
```
Implementation notes:
- use `Entry.EffectiveUpstreamPath` in both `isCacheFresh` and cached `HEAD` handling inside `serveCache`
- do not re-derive fallback during revalidation when metadata already specifies the effective path
- keep client-visible cache key unchanged
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go tests/integration/cache_flow_test.go
git commit -m "fix: revalidate registry k8s fallback cache entries"
```
### Task 5: Emit and verify fallback structured logging
**Files:**
- Modify: `internal/proxy/handler.go`
- Test: `internal/proxy/handler_test.go`
- [ ] **Step 1: Write the failing logging test**
```go
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
	// configure handler with a bytes.Buffer-backed logrus logger
	// trigger fallback success for /v2/coredns/manifests/v1.13.1
	// assert log output contains event name plus fields:
	// hub, domain, upstream host, original path, fallback path, original status, method
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: FAIL because the fallback log event does not exist yet.
- [ ] **Step 3: Write minimal implementation**
```go
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
	fields := logging.RequestFields(
		route.Config.Name,
		route.Config.Domain,
		route.Config.Type,
		route.Config.AuthMode(),
		route.Module.Key,
		false,
	)
	fields["action"] = "proxy_fallback"
	fields["request_id"] = requestID
	fields["upstream_host"] = route.UpstreamURL.Host
	fields["original_path"] = originalPath
	fields["fallback_path"] = fallbackPath
	fields["original_status"] = originalStatus
	fields["method"] = method
	h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
}
```
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go internal/proxy/handler_test.go
git commit -m "test: cover registry k8s fallback logging"
```
### Task 6: Final verification
**Files:**
- Verify: `internal/hubmodule/docker/hooks.go`
- Verify: `internal/cache/store.go`
- Verify: `internal/cache/fs_store.go`
- Verify: `internal/proxy/handler.go`
- Verify: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Run focused package tests**
Run: `go test ./internal/hubmodule/docker ./internal/cache ./tests/integration -count=1`
Expected: PASS
- [ ] **Step 2: Run full test suite**
Run: `go test ./... -count=1`
Expected: PASS
- [ ] **Step 3: Inspect git diff**
Run: `git diff --stat`
Expected: only the planned files changed for this feature.
- [ ] **Step 4: Verify fallback logging is present**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: PASS, and the test explicitly checks the structured fallback log event fields.
- [ ] **Step 5: Commit final polish if needed**
```bash
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go internal/proxy/handler.go internal/proxy/handler_test.go tests/integration/cache_flow_test.go
git commit -m "feat: add registry k8s manifest fallback compatibility"
```


@@ -0,0 +1,181 @@
# registry.k8s.io coredns fallback design
## Goal
Add a narrow compatibility fallback for `registry.k8s.io` so requests like
`k8s.hub.ipao.vip/coredns:v1.13.1` can retry as
`k8s.hub.ipao.vip/coredns/coredns:v1.13.1` when the original manifest lookup is
not found upstream.
## Scope
This compatibility applies only when all of the following are true:
- hub module is `docker`
- upstream host is exactly `registry.k8s.io`
- requested repository name is a single segment such as `coredns`
- request targets `/manifests/<ref>`
- the first upstream request clearly fails as not found
Out of scope:
- changing behavior for Docker Hub
- changing behavior for other registries
- eagerly rewriting every single-segment repo under `registry.k8s.io`
- retrying more than one alternate repository form
- applying the initial compatibility behavior to blobs, tags, or referrers
## Recommended approach
Keep the current request path unchanged on the first attempt. After the normal
auth flow completes for that path, if the final first-attempt response from
`registry.k8s.io` is `404` for a single-segment manifest repository, perform
one internal retry against the derived path `/v2/<repo>/<repo>/...` and return
the retry result when it succeeds.
This keeps the existing behavior stable for paths that already work and limits
the compatibility behavior to the known `coredns -> coredns/coredns` style case.
## Alternatives considered
### 1. Always rewrite single-segment repos to `<repo>/<repo>`
Rejected because it would change successful existing paths and could break valid
single-segment repositories on `registry.k8s.io`.
### 2. Probe both paths before deciding
Rejected because it doubles upstream traffic and adds more complexity than the
known compatibility problem needs.
## Request flow
1. Receive docker request and run the existing normalization logic.
2. Send the first upstream request using the original normalized path.
3. Complete the existing auth retry behavior for that path first.
4. If the final response is successful, continue as today.
5. If the final response is `404`, check whether the request is eligible for
the `registry.k8s.io` fallback.
6. Close the first response body before retrying.
7. If eligible, derive a second path by transforming `/v2/<repo>/...` into
`/v2/<repo>/<repo>/...`.
8. Retry exactly once against the derived path, reusing the same method and the
same auth behavior used by the normal upstream flow.
9. If the retry succeeds, use that response and mark the cache entry with the
effective upstream path that produced the content.
10. If the retry returns a non-success upstream response, pass that second
response through unchanged.
11. If the retry fails with a transport error, return the existing proxy error
response path and do not continue retrying.
## Eligibility rules
The fallback should only trigger when all checks pass:
- upstream host matches `registry.k8s.io`
- request path parses as `/v2/<repo>/manifests/<ref>`
- repository name contains exactly one segment before `manifests`
- repository is not already `<repo>/<repo>`
- final response after normal auth handling is HTTP `404`
For the initial implementation, the trigger is status-based and limited to HTTP
`404`.
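One way to express these eligibility checks as a standalone predicate; names and the exact path-splitting are illustrative, since the real helpers live alongside the internal docker hooks:

```go
package main

import (
	"fmt"
	"strings"
)

// fallbackPath returns the /v2/<repo>/<repo>/... form when the request is
// eligible: registry.k8s.io host, a /v2/<repo>/manifests/<ref> shape, and a
// single-segment repo (which also rules out repos already of the form
// <repo>/<repo>). The 404 status check happens separately in the proxy.
func fallbackPath(host, path string) (string, bool) {
	if !strings.EqualFold(host, "registry.k8s.io") {
		return "", false
	}
	rest, ok := strings.CutPrefix(path, "/v2/")
	if !ok {
		return "", false
	}
	// Exactly one repo segment followed by manifests/<ref>.
	parts := strings.SplitN(rest, "/", 3)
	if len(parts) != 3 || parts[1] != "manifests" || parts[2] == "" {
		return "", false
	}
	repo := parts[0]
	return "/v2/" + repo + "/" + repo + "/manifests/" + parts[2], true
}

func main() {
	p, ok := fallbackPath("registry.k8s.io", "/v2/coredns/manifests/v1.13.1")
	fmt.Println(p, ok) // → /v2/coredns/coredns/manifests/v1.13.1 true
}
```

Blob, tag, and multi-segment paths fail the `parts[1] != "manifests"` or segment-count checks, matching the out-of-scope list above.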
## Component changes
### `internal/hubmodule/docker/hooks.go`
- add helper logic to identify `registry.k8s.io`
- add helper logic to derive the duplicate-segment manifest path for eligible
requests
### `internal/proxy/handler.go`
- add a targeted retry path after the first upstream response is received and
before the response is finalized
- keep the retry count fixed at one alternate attempt
- complete the normal auth retry flow before evaluating fallback eligibility
- persist the effective upstream path when fallback succeeds so future cache
revalidation can target the path that actually produced the cached content
The retry should reuse the same auth, request method, response handling, and
streaming behavior as the normal upstream path.
### Cache metadata
- extend cached metadata for fallback-derived entries with an optional effective
upstream path field
- when a cached entry was populated through fallback, revalidate it against that
effective upstream path rather than the original client-facing path
- do not apply the fallback derivation again during revalidation if an effective
upstream path is already recorded
## Caching behavior
- first successful response remains cacheable under the locator path that served
the client request
- if fallback succeeds, cache the successful fallback response under the client
request path so the next identical client request can hit cache directly
- also persist the effective upstream path used to fill that cache entry
- do not cache the failed first attempt separately
This preserves the client-visible repository path while allowing the proxy to
internally source content from the alternate upstream location.
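Concretely, for a cache file at `<file>`, the store in this change writes a `<file>.meta` JSON sidecar whose only field is the effective path, e.g.:

```json
{
  "effective_upstream_path": "/v2/coredns/coredns/manifests/v1.13.1"
}
```

Entries filled without fallback carry no sidecar, so the common case pays no extra metadata cost.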
## Error handling
- if fallback path derivation is not possible, return the original response
- if the fallback request errors at the transport layer, log the retry attempt
and follow the existing proxy error response path
- never loop between variants
## Logging
Add one targeted structured log event for visibility when the fallback is used,
including:
- hub name
- domain
- upstream host
- original path
- fallback path
- original status
- request method
This makes it easy to confirm that `registry.k8s.io` compatibility is active
without changing normal logging volume for unrelated registries.
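With those fields, a single emitted event (values illustrative, assuming JSON log output) looks roughly like:

```json
{
  "level": "info",
  "msg": "proxy_registry_k8s_fallback",
  "hub": "k8s",
  "domain": "k8s.hub.ipao.vip",
  "upstream_host": "registry.k8s.io",
  "original_path": "/v2/coredns/manifests/v1.13.1",
  "fallback_path": "/v2/coredns/coredns/manifests/v1.13.1",
  "original_status": 404,
  "method": "GET"
}
```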
## Tests
### Unit tests
Add docker hook tests for:
- recognizing `registry.k8s.io`
- deriving `/v2/coredns/manifests/v1.13.1` to
`/v2/coredns/coredns/manifests/v1.13.1`
- refusing fallback for already multi-segment repositories
- refusing fallback for non-`registry.k8s.io` hosts
- refusing fallback for non-manifest paths
### Integration tests
Add proxy tests for:
- original path returns `404`, fallback path returns `200` -> final client
response is `200`
- original path returns `200` -> fallback is not attempted
- non-`registry.k8s.io` upstream returns `404` -> fallback is not attempted
- fallback succeeds, second identical request is served from cache without
re-fetching the original `404` path
- fallback-derived cached entry revalidates against the recorded effective
upstream path instead of the original client-facing path
## Success criteria
- `k8s.hub.ipao.vip/coredns:v1.13.1` can succeed through fallback when the
upstream only exposes `coredns/coredns`
- existing Docker Hub behavior remains unchanged
- existing `registry.k8s.io` paths that already work continue to use the
original path first
- fallback is isolated to `registry.k8s.io`

View File

@@ -2,6 +2,7 @@ package cache
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
@@ -49,6 +50,10 @@ type entryLock struct {
refs int
}
type entryMetadata struct {
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, error) {
select {
case <-ctx.Done():
@@ -86,6 +91,12 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro
SizeBytes: info.Size(),
ModTime: info.ModTime(),
}
if metadata, err := s.readMetadata(filePath); err == nil {
entry.EffectiveUpstreamPath = metadata.EffectiveUpstreamPath
} else if !errors.Is(err, fs.ErrNotExist) {
file.Close()
return nil, err
}
return &ReadResult{Entry: entry, Reader: file}, nil
}
@@ -133,12 +144,16 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op
if err := os.Chtimes(filePath, modTime, modTime); err != nil {
return nil, err
}
if err := s.writeMetadata(filePath, opts.EffectiveUpstreamPath); err != nil {
return nil, err
}
entry := Entry{
Locator: locator,
FilePath: filePath,
SizeBytes: written,
ModTime: modTime,
Locator: locator,
FilePath: filePath,
SizeBytes: written,
ModTime: modTime,
EffectiveUpstreamPath: opts.EffectiveUpstreamPath,
}
return &entry, nil
}
@@ -157,6 +172,54 @@ func (s *fileStore) Remove(ctx context.Context, locator Locator) error {
if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
if err := os.Remove(metadataPath(filePath)); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
return nil
}
func (s *fileStore) readMetadata(filePath string) (entryMetadata, error) {
raw, err := os.ReadFile(metadataPath(filePath))
if err != nil {
return entryMetadata{}, err
}
var metadata entryMetadata
if err := json.Unmarshal(raw, &metadata); err != nil {
return entryMetadata{}, err
}
return metadata, nil
}
func (s *fileStore) writeMetadata(filePath string, effectiveUpstreamPath string) error {
metaFilePath := metadataPath(filePath)
if effectiveUpstreamPath == "" {
if err := os.Remove(metaFilePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
return nil
}
data, err := json.Marshal(entryMetadata{EffectiveUpstreamPath: effectiveUpstreamPath})
if err != nil {
return err
}
tempFile, err := os.CreateTemp(filepath.Dir(metaFilePath), ".cache-meta-*")
if err != nil {
return err
}
tempName := tempFile.Name()
if _, err := tempFile.Write(data); err != nil {
tempFile.Close()
_ = os.Remove(tempName)
return err
}
if err := tempFile.Close(); err != nil {
_ = os.Remove(tempName)
return err
}
if err := os.Rename(tempName, metaFilePath); err != nil {
_ = os.Remove(tempName)
return err
}
return nil
}
@@ -248,3 +311,7 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
func locatorKey(locator Locator) string {
return locator.HubName + "::" + locator.Path
}
func metadataPath(filePath string) string {
return filePath + ".meta"
}

View File

@@ -26,7 +26,8 @@ type Store interface {
// PutOptions 控制写入过程中的可选属性。
type PutOptions struct {
ModTime time.Time
ModTime time.Time
EffectiveUpstreamPath string
}
// Locator 唯一定位一个缓存条目Hub + 相对路径),所有路径均为 URL 路径风格。
@@ -37,10 +38,11 @@ type Locator struct {
// Entry 表示一次缓存命中结果,包含绝对文件路径及文件信息。
type Entry struct {
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time `json:"mod_time"`
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
// ReadResult 组合 Entry 与正文 Reader便于代理层直接将 Body 流式返回。

View File

@@ -5,6 +5,7 @@ import (
"context"
"io"
"os"
"strings"
"testing"
"time"
)
@@ -62,6 +63,28 @@ func TestStoreRemove(t *testing.T) {
}
}
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
store := newTestStore(t)
locator := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
_, err := store.Put(context.Background(), locator, strings.NewReader("body"), PutOptions{
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
})
if err != nil {
t.Fatalf("put error: %v", err)
}
result, err := store.Get(context.Background(), locator)
if err != nil {
t.Fatalf("get error: %v", err)
}
defer result.Reader.Close()
if result.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("unexpected effective upstream path: %q", result.Entry.EffectiveUpstreamPath)
}
}
func TestStoreIgnoresDirectories(t *testing.T) {
store := newTestStore(t)
locator := Locator{HubName: "ghcr", Path: "/v2"}

View File

@@ -5,7 +5,6 @@ import (
"path/filepath"
"reflect"
"strconv"
"strings"
"time"
"github.com/mitchellh/mapstructure"
@@ -89,25 +88,12 @@ func applyHubDefaults(h *HubConfig) {
if h.CacheTTL.DurationValue() < 0 {
h.CacheTTL = Duration(0)
}
if trimmed := strings.TrimSpace(h.Module); trimmed == "" {
typeKey := strings.ToLower(strings.TrimSpace(h.Type))
if meta, ok := hubmodule.Resolve(typeKey); ok {
h.Module = meta.Key
} else {
h.Module = hubmodule.DefaultModuleKey()
}
} else {
h.Module = strings.ToLower(trimmed)
}
if rollout := strings.TrimSpace(h.Rollout); rollout != "" {
h.Rollout = strings.ToLower(rollout)
}
if h.ValidationMode == "" {
h.ValidationMode = string(hubmodule.ValidationModeETag)
}
}
// NormalizeHubConfig 公开给无需依赖 loader 的调用方(例如测试)以填充模块/rollout 默认值。
// NormalizeHubConfig 公开给无需依赖 loader 的调用方(例如测试)以应用 TTL/校验默认值。
func NormalizeHubConfig(h HubConfig) HubConfig {
applyHubDefaults(&h)
return h

View File

@@ -4,7 +4,6 @@ import (
"time"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
)
// HubRuntime 将 Hub 配置与模块元数据合并,方便运行时快速取用策略。
@@ -12,16 +11,14 @@ type HubRuntime struct {
Config HubConfig
Module hubmodule.ModuleMetadata
CacheStrategy hubmodule.CacheStrategyProfile
Rollout legacy.RolloutFlag
}
// BuildHubRuntime 根据 Hub 配置和模块元数据创建运行时描述,应用最终 TTL 覆盖。
func BuildHubRuntime(cfg HubConfig, meta hubmodule.ModuleMetadata, ttl time.Duration, flag legacy.RolloutFlag) HubRuntime {
func BuildHubRuntime(cfg HubConfig, meta hubmodule.ModuleMetadata, ttl time.Duration) HubRuntime {
strategy := hubmodule.ResolveStrategy(meta, cfg.StrategyOverrides(ttl))
return HubRuntime{
Config: cfg,
Module: meta,
CacheStrategy: strategy,
Rollout: flag,
}
}

View File

@@ -1,69 +0,0 @@
package config
import (
"fmt"
"strings"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
)
// Rollout 字段说明legacy → modular 平滑迁移控制):
// - legacy-only强制使用 legacy 模块EffectiveModuleKey → legacy用于未迁移或需要快速回滚时。
// - dual新模块为默认保留 legacy 以便诊断/灰度;仅当 Module 非空时生效,否则回退 legacy-only。
// - modular仅使用新模块Module 为空或 legacy 模块时自动回退 legacy-only。
// 默认行为:未填写 Rollout 时,空 Module/legacy 模块默认 legacy-only其它模块默认 modular。
// 影响范围动态选择执行的模块键EffectiveModuleKey、路由日志中的 rollout_flag方便区分迁移阶段。
// parseRolloutFlag 将配置中的 rollout 字段标准化,并结合模块类型输出最终状态。
func parseRolloutFlag(raw string, moduleKey string) (legacy.RolloutFlag, error) {
normalized := strings.ToLower(strings.TrimSpace(raw))
if normalized == "" {
return defaultRolloutFlag(moduleKey), nil
}
switch normalized {
case string(legacy.RolloutLegacyOnly):
return legacy.RolloutLegacyOnly, nil
case string(legacy.RolloutDual):
if moduleKey == hubmodule.DefaultModuleKey() {
return legacy.RolloutLegacyOnly, nil
}
return legacy.RolloutDual, nil
case string(legacy.RolloutModular):
if moduleKey == hubmodule.DefaultModuleKey() {
return legacy.RolloutLegacyOnly, nil
}
return legacy.RolloutModular, nil
default:
return "", fmt.Errorf("不支持的 rollout 值: %s", raw)
}
}
func defaultRolloutFlag(moduleKey string) legacy.RolloutFlag {
if strings.TrimSpace(moduleKey) == "" || moduleKey == hubmodule.DefaultModuleKey() {
return legacy.RolloutLegacyOnly
}
return legacy.RolloutModular
}
// EffectiveModuleKey 根据 rollout 状态计算真实运行的模块。
func EffectiveModuleKey(moduleKey string, flag legacy.RolloutFlag) string {
if flag == legacy.RolloutLegacyOnly {
return hubmodule.DefaultModuleKey()
}
normalized := strings.ToLower(strings.TrimSpace(moduleKey))
if normalized == "" {
return hubmodule.DefaultModuleKey()
}
return normalized
}
// RolloutFlagValue 返回当前 Hub 的 rollout flag假定 Validate 已经通过)。
func (h HubConfig) RolloutFlagValue() legacy.RolloutFlag {
flag := legacy.RolloutFlag(strings.ToLower(strings.TrimSpace(h.Rollout)))
if flag == "" {
return defaultRolloutFlag(h.Module)
}
return flag
}

View File

@@ -64,18 +64,15 @@ type GlobalConfig struct {
// HubConfig 决定单个代理实例如何与下游/上游交互。
type HubConfig struct {
Name string `mapstructure:"Name"`
Domain string `mapstructure:"Domain"`
Upstream string `mapstructure:"Upstream"`
Proxy string `mapstructure:"Proxy"`
Type string `mapstructure:"Type"`
Module string `mapstructure:"Module"`
Rollout string `mapstructure:"Rollout"`
Username string `mapstructure:"Username"`
Password string `mapstructure:"Password"`
CacheTTL Duration `mapstructure:"CacheTTL"`
ValidationMode string `mapstructure:"ValidationMode"`
EnableHeadCheck bool `mapstructure:"EnableHeadCheck"`
Name string `mapstructure:"Name"`
Domain string `mapstructure:"Domain"`
Upstream string `mapstructure:"Upstream"`
Proxy string `mapstructure:"Proxy"`
Type string `mapstructure:"Type"`
Username string `mapstructure:"Username"`
Password string `mapstructure:"Password"`
CacheTTL Duration `mapstructure:"CacheTTL"`
ValidationMode string `mapstructure:"ValidationMode"`
}
// Config 是 TOML 文件映射的整体结构。
@@ -111,11 +108,12 @@ func CredentialModes(hubs []HubConfig) []string {
// StrategyOverrides 将 hub 层的 TTL/Validation 配置映射为模块策略覆盖项。
func (h HubConfig) StrategyOverrides(ttl time.Duration) hubmodule.StrategyOptions {
opts := hubmodule.StrategyOptions{
TTLOverride: ttl,
}
opts := hubmodule.StrategyOptions{}
if mode := strings.TrimSpace(h.ValidationMode); mode != "" {
opts.ValidationOverride = hubmodule.ValidationMode(mode)
}
if h.CacheTTL.DurationValue() > 0 {
opts.TTLOverride = ttl
}
return opts
}

View File

@@ -79,23 +79,9 @@ func (c *Config) Validate() error {
}
hub.Type = normalizedType
moduleKey := strings.ToLower(strings.TrimSpace(hub.Module))
if moduleKey == "" {
if _, ok := hubmodule.Resolve(normalizedType); ok && normalizedType != "" {
moduleKey = normalizedType
} else {
moduleKey = hubmodule.DefaultModuleKey()
}
if _, ok := hubmodule.Resolve(normalizedType); !ok {
return newFieldError(hubField(hub.Name, "Type"), fmt.Sprintf("未注册模块: %s", normalizedType))
}
if _, ok := hubmodule.Resolve(moduleKey); !ok {
return newFieldError(hubField(hub.Name, "Module"), fmt.Sprintf("未注册模块: %s", moduleKey))
}
hub.Module = moduleKey
flag, err := parseRolloutFlag(hub.Rollout, hub.Module)
if err != nil {
return newFieldError(hubField(hub.Name, "Rollout"), err.Error())
}
hub.Rollout = string(flag)
if hub.ValidationMode != "" {
mode := strings.ToLower(strings.TrimSpace(hub.ValidationMode))
switch mode {

View File

@@ -24,7 +24,7 @@ internal/hubmodule/
2. 填写模块特有逻辑与缓存策略,并确保包含中文注释解释设计。
3. 在模块目录添加 `module_test.go`,使用 `httptest.Server``t.TempDir()` 复现真实流量。
4. 运行 `make modules-test` 验证模块单元测试。
5. `[[Hub]].Module` 留空时会优先选择与 `Type` 同名的模块,实际迁移时仍建议显式填写,便于 diagnostics 标记 rollout
5. `[[Hub]].Type` 现已直接映射到同名模块;新增模块时记得将类型加入配置校验与示例配置
## 术语
- **Module Key**:模块唯一标识(如 `legacy``npm-tarball`)。

View File

@@ -25,76 +25,56 @@ func normalizePath(_ *hooks.RequestContext, p string, rawQuery []byte) (string,
func cachePolicy(_ *hooks.RequestContext, locatorPath string, current hooks.CachePolicy) hooks.CachePolicy {
clean := canonicalPath(locatorPath)
switch {
case isAptIndexPath(clean):
// 索引类Release/Packages需要 If-None-Match/If-Modified-Since 再验证。
current.AllowCache = true
current.AllowStore = true
current.RequireRevalidate = true
case isAptImmutablePath(clean):
if strings.Contains(clean, "/by-hash/") || strings.Contains(clean, "/pool/") {
// pool/*.deb 与 by-hash 路径视为不可变,直接缓存后续不再 HEAD。
current.AllowCache = true
current.AllowStore = true
current.RequireRevalidate = false
default:
current.AllowCache = false
current.AllowStore = false
current.RequireRevalidate = false
return current
}
if strings.Contains(clean, "/dists/") {
// 索引类Release/Packages/Contents需要 If-None-Match/If-Modified-Since 再验证。
if strings.HasSuffix(clean, "/release") ||
strings.HasSuffix(clean, "/inrelease") ||
strings.HasSuffix(clean, "/release.gpg") ||
strings.HasSuffix(clean, "/packages") ||
strings.HasSuffix(clean, "/packages.gz") ||
strings.HasSuffix(clean, "/packages.xz") ||
strings.HasSuffix(clean, "/sources") ||
strings.HasSuffix(clean, "/sources.gz") ||
strings.HasSuffix(clean, "/sources.xz") ||
strings.Contains(clean, "/contents-") {
current.AllowCache = true
current.AllowStore = true
current.RequireRevalidate = true
return current
}
}
current.AllowCache = false
current.AllowStore = false
current.RequireRevalidate = false
return current
}
func contentType(_ *hooks.RequestContext, locatorPath string) string {
clean := canonicalPath(locatorPath)
switch {
case strings.HasSuffix(locatorPath, ".gz"):
case strings.HasSuffix(clean, ".gz"):
return "application/gzip"
case strings.HasSuffix(locatorPath, ".xz"):
case strings.HasSuffix(clean, ".xz"):
return "application/x-xz"
case strings.HasSuffix(locatorPath, "Release.gpg"):
case strings.HasSuffix(clean, "release.gpg"):
return "application/pgp-signature"
case isAptIndexPath(locatorPath):
case strings.Contains(clean, "/dists/") &&
(strings.HasSuffix(clean, "/release") || strings.HasSuffix(clean, "/inrelease") || strings.HasSuffix(clean, "/release.gpg")):
return "text/plain"
default:
return ""
}
}
func isAptIndexPath(p string) bool {
clean := canonicalPath(p)
if isByHashPath(clean) {
return false
}
if strings.Contains(clean, "/dists/") {
if strings.HasSuffix(clean, "/release") ||
strings.HasSuffix(clean, "/inrelease") ||
strings.HasSuffix(clean, "/release.gpg") {
return true
}
}
return false
}
func isAptImmutablePath(p string) bool {
clean := canonicalPath(p)
if isByHashPath(clean) {
return true
}
if strings.Contains(clean, "/pool/") {
return true
}
return false
}
func isByHashPath(p string) bool {
clean := canonicalPath(p)
if strings.Contains(clean, "/dists/") {
return false
}
return strings.Contains(clean, "/by-hash/")
}
func canonicalPath(p string) string {
if p == "" {
return "/"

View File

@@ -1,6 +1,7 @@
package docker
import (
"net"
"strings"
"github.com/any-hub/any-hub/internal/proxy/hooks"
@@ -15,6 +16,9 @@ func init() {
}
func normalizePath(ctx *hooks.RequestContext, clean string, rawQuery []byte) (string, []byte) {
if ctx == nil || !isDockerHubHost(ctx.UpstreamHost) {
return clean, rawQuery
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || repo == "" || strings.Contains(repo, "/") || repo == "library" {
return clean, rawQuery
@@ -54,6 +58,9 @@ func contentType(_ *hooks.RequestContext, locatorPath string) string {
}
func isDockerHubHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
switch strings.ToLower(host) {
case "registry-1.docker.io", "docker.io", "index.docker.io":
return true
@@ -62,6 +69,28 @@ func isDockerHubHost(host string) bool {
}
}
func isRegistryK8sHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
return strings.EqualFold(host, "registry.k8s.io")
}
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
return "", false
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || strings.Contains(repo, "/") || !strings.HasPrefix(rest, "/manifests/") {
return "", false
}
return "/v2/" + repo + "/" + repo + rest, true
}
func RegistryK8sManifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
return manifestFallbackPath(ctx, clean)
}
func splitDockerRepoPath(path string) (string, string, bool) {
if !strings.HasPrefix(path, "/v2/") {
return "", "", false

View File

@@ -19,6 +19,52 @@ func TestNormalizePathAddsLibraryForDockerHub(t *testing.T) {
}
}
func TestNormalizePathSkipsLibraryForNonDockerHub(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
path, _ := normalizePath(ctx, "/v2/kube-apiserver/manifests/v1.35.3", nil)
if path != "/v2/kube-apiserver/manifests/v1.35.3" {
t.Fatalf("expected non-docker hub path to remain unchanged, got %s", path)
}
}
func TestIsRegistryK8sHost(t *testing.T) {
if !isRegistryK8sHost("registry.k8s.io") {
t.Fatalf("expected registry.k8s.io to match")
}
if isRegistryK8sHost("example.com") {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
}
}
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected multi-segment repo to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
t.Fatalf("expected non-manifest path to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
func TestSplitDockerRepoPath(t *testing.T) {
repo, rest, ok := splitDockerRepoPath("/v2/library/nginx/manifests/latest")
if !ok || repo != "library/nginx" || rest != "/manifests/latest" {

View File

@@ -1,65 +0,0 @@
package legacy
import (
"sort"
"strings"
"sync"
)
// RolloutFlag 描述 legacy 模块迁移阶段。
type RolloutFlag string
const (
RolloutLegacyOnly RolloutFlag = "legacy-only"
RolloutDual RolloutFlag = "dual"
RolloutModular RolloutFlag = "modular"
)
// AdapterState 记录特定 Hub 在 legacy 适配器中的运行状态。
type AdapterState struct {
HubName string
ModuleKey string
Rollout RolloutFlag
}
var (
stateMu sync.RWMutex
state = make(map[string]AdapterState)
)
// RecordAdapterState 更新指定 Hub 的 rollout 状态,供诊断端和日志使用。
func RecordAdapterState(hubName, moduleKey string, flag RolloutFlag) {
if hubName == "" {
return
}
key := strings.ToLower(hubName)
stateMu.Lock()
state[key] = AdapterState{
HubName: hubName,
ModuleKey: moduleKey,
Rollout: flag,
}
stateMu.Unlock()
}
// SnapshotAdapterStates 返回所有 Hub 的 rollout 状态,按名称排序。
func SnapshotAdapterStates() []AdapterState {
stateMu.RLock()
defer stateMu.RUnlock()
if len(state) == 0 {
return nil
}
keys := make([]string, 0, len(state))
for k := range state {
keys = append(keys, k)
}
sort.Strings(keys)
result := make([]AdapterState, 0, len(keys))
for _, key := range keys {
result = append(result, state[key])
}
return result
}

View File

@@ -11,13 +11,14 @@ type StrategyOptions struct {
// ResolveStrategy 将模块的默认策略与 hub 级覆盖合并。
func ResolveStrategy(meta ModuleMetadata, opts StrategyOptions) CacheStrategyProfile {
strategy := meta.CacheStrategy
if strategy.TTLHint > 0 && opts.TTLOverride > 0 {
strategy.TTLHint = opts.TTLOverride
}
if opts.ValidationOverride != "" {
strategy.ValidationMode = opts.ValidationOverride
}
return normalizeStrategy(strategy)
strategy = normalizeStrategy(strategy)
if opts.TTLOverride > 0 {
strategy.TTLHint = opts.TTLOverride
}
return strategy
}
func normalizeStrategy(profile CacheStrategyProfile) CacheStrategyProfile {

View File

@@ -11,15 +11,13 @@ func BaseFields(action, configPath string) logrus.Fields {
}
// RequestFields 提供 hub/domain/命中状态字段,供代理请求日志复用。
func RequestFields(hub, domain, hubType, authMode, moduleKey, rolloutFlag string, cacheHit bool, legacyOnly bool) logrus.Fields {
func RequestFields(hub, domain, hubType, authMode, moduleKey string, cacheHit bool) logrus.Fields {
return logrus.Fields{
"hub": hub,
"domain": domain,
"hub_type": hubType,
"auth_mode": authMode,
"cache_hit": cacheHit,
"legacy_only": legacyOnly,
"module_key": moduleKey,
"rollout_flag": rolloutFlag,
"hub": hub,
"domain": domain,
"hub_type": hubType,
"auth_mode": authMode,
"cache_hit": cacheHit,
"module_key": moduleKey,
}
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/gofiber/fiber/v3"
"github.com/sirupsen/logrus"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/logging"
"github.com/any-hub/any-hub/internal/server"
)
@@ -36,7 +35,7 @@ func RegisterModuleHandler(key string, handler server.ProxyHandler) {
MustRegisterModule(ModuleRegistration{Key: key, Handler: handler})
}
// Handle 实现 server.ProxyHandler根据 route.ModuleKey 选择 handler。
// Handle 实现 server.ProxyHandler根据 route.Module.Key 选择 handler。
func (f *Forwarder) Handle(c fiber.Ctx, route *server.HubRoute) error {
requestID := server.RequestID(c)
handler := f.lookup(route)
@@ -91,7 +90,7 @@ func (f *Forwarder) logModuleError(route *server.HubRoute, code string, err erro
func (f *Forwarder) lookup(route *server.HubRoute) server.ProxyHandler {
if route != nil {
if handler := lookupModuleHandler(route.ModuleKey); handler != nil {
if handler := lookupModuleHandler(route.Module.Key); handler != nil {
return handler
}
}
@@ -132,10 +131,8 @@ func (f *Forwarder) routeFields(route *server.HubRoute, requestID string) logrus
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
route.Module.Key,
false,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
if requestID != "" {
fields["request_id"] = requestID

View File

@@ -10,7 +10,7 @@ import (
"github.com/valyala/fasthttp"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/server"
)
@@ -103,7 +103,8 @@ func testRouteWithModule(moduleKey string) *server.HubRoute {
Domain: "test.local",
Type: "custom",
},
ModuleKey: moduleKey,
RolloutFlag: legacy.RolloutModular,
Module: hubmodule.ModuleMetadata{
Key: moduleKey,
},
}
}

View File

@@ -3,7 +3,7 @@ package proxy
import (
"bytes"
"context"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
@@ -22,6 +22,7 @@ import (
"github.com/any-hub/any-hub/internal/cache"
"github.com/any-hub/any-hub/internal/hubmodule"
dockermodule "github.com/any-hub/any-hub/internal/hubmodule/docker"
"github.com/any-hub/any-hub/internal/logging"
"github.com/any-hub/any-hub/internal/proxy/hooks"
"github.com/any-hub/any-hub/internal/server"
@@ -65,8 +66,7 @@ func buildHookContext(route *server.HubRoute, c fiber.Ctx) *hooks.RequestContext
HubName: route.Config.Name,
Domain: route.Config.Domain,
HubType: route.Config.Type,
ModuleKey: route.ModuleKey,
RolloutFlag: string(route.RolloutFlag),
ModuleKey: route.Module.Key,
UpstreamHost: baseHost,
Method: c.Method(),
}
@@ -84,7 +84,7 @@ func hasHook(def hooks.Hooks) bool {
func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
started := time.Now()
requestID := server.RequestID(c)
hooksDef, ok := hooks.Fetch(route.ModuleKey)
hooksDef, ok := hooks.Fetch(route.Module.Key)
hookCtx := buildHookContext(route, c)
rawQuery := append([]byte(nil), c.Request().URI().QueryString()...)
cleanPath := normalizeRequestPath(route, string(c.Request().URI().Path()))
@@ -121,7 +121,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
// miss, continue
default:
h.logger.WithError(err).
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.Module.Key}).
Warn("cache_get_failed")
}
}
@@ -135,7 +135,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
fresh, err := h.isCacheFresh(c, route, locator, cached.Entry, &hookState)
if err != nil {
h.logger.WithError(err).
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.Module.Key}).
Warn("cache_revalidate_failed")
serve = false
} else if !fresh {
@@ -203,10 +203,25 @@ func (h *Handler) serveCache(
c.Status(status)
if method == http.MethodHead {
// 对 HEAD 请求仍向上游发起一次 HEAD以满足“显式请求 + 再验证”的期望。
if route != nil && route.UpstreamURL != nil {
if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil {
resp.Body.Close()
shouldRevalidate := true
if hook != nil && hook.hasHooks && hook.def.CachePolicy != nil {
policy := hook.def.CachePolicy(hook.ctx, result.Entry.Locator.Path, hooks.CachePolicy{
AllowCache: true,
AllowStore: true,
RequireRevalidate: true,
})
if !policy.AllowCache || !policy.AllowStore {
shouldRevalidate = false
} else {
shouldRevalidate = policy.RequireRevalidate
}
}
if shouldRevalidate {
if resp, err := h.revalidateRequest(c, route, effectiveRevalidateURL(route, c, result.Entry, hook), result.Entry.Locator, ""); err == nil {
resp.Body.Close()
}
}
}
result.Reader.Close()
@@ -245,6 +260,17 @@ func (h *Handler) fetchAndStream(
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
effectiveUpstreamPath := ""
originalStatus := resp.StatusCode
originalPath := ""
if hook != nil {
originalPath = hook.clean
}
resp, upstreamURL, effectiveUpstreamPath, err = h.retryRegistryK8sManifestFallback(c, route, requestID, resp, upstreamURL, hook, originalStatus, originalPath)
if err != nil {
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
resp = rewritten
@@ -259,7 +285,7 @@ func (h *Handler) fetchAndStream(
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
c.Method() == http.MethodGet
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx, effectiveUpstreamPath)
}
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
@@ -311,13 +337,14 @@ func (h *Handler) consumeUpstream(
requestID string,
started time.Time,
ctx context.Context,
effectiveUpstreamPath string,
) error {
upstreamURL := resp.Request.URL.String()
method := c.Method()
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
if shouldStore {
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL)
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL, effectiveUpstreamPath)
}
copyResponseHeaders(c, resp.Header)
@@ -355,6 +382,7 @@ func (h *Handler) cacheAndStream(
started time.Time,
ctx context.Context,
upstreamURL string,
effectiveUpstreamPath string,
) error {
copyResponseHeaders(c, resp.Header)
c.Set("X-Any-Hub-Upstream", upstreamURL)
@@ -367,7 +395,7 @@ func (h *Handler) cacheAndStream(
// 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
opts := cache.PutOptions{ModTime: extractModTime(resp.Header)}
opts := cache.PutOptions{ModTime: extractModTime(resp.Header), EffectiveUpstreamPath: effectiveUpstreamPath}
entry, err := writer.Put(ctx, locator, reader, opts)
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
if err != nil {
@@ -518,10 +546,8 @@ func (h *Handler) logResult(
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
route.Module.Key,
cacheHit,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
fields["action"] = "proxy"
fields["upstream"] = upstream
@@ -574,7 +600,7 @@ func resolveContentType(route *server.HubRoute, locator cache.Locator, hook *hoo
func buildLocator(route *server.HubRoute, c fiber.Ctx, clean string, rawQuery []byte) cache.Locator {
query := rawQuery
if len(query) > 0 {
sum := sha1.Sum(query)
sum := sha256.Sum256(query)
clean = fmt.Sprintf("%s/__qs/%s", clean, hex.EncodeToString(sum[:]))
}
loc := cache.Locator{
@@ -762,7 +788,7 @@ func (h *Handler) isCacheFresh(
ctx = context.Background()
}
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
upstreamURL := effectiveRevalidateURL(route, c, entry, hook)
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
if err != nil {
return false, err
@@ -794,7 +820,7 @@ func (h *Handler) isCacheFresh(
return true, nil
case http.StatusOK:
if resp.Header.Get("Etag") == "" && resp.Header.Get("Docker-Content-Digest") == "" && resp.Header.Get("Last-Modified") == "" {
return true, nil
return false, nil
}
h.rememberETag(route, locator, resp)
remote := extractModTime(resp.Header)
@@ -813,6 +839,77 @@ func (h *Handler) isCacheFresh(
}
}
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
if route == nil || route.UpstreamURL == nil || entry.EffectiveUpstreamPath == "" {
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
}
clone := *route.UpstreamURL
clone.Path = entry.EffectiveUpstreamPath
clone.RawPath = entry.EffectiveUpstreamPath
return &clone
}
func (h *Handler) retryRegistryK8sManifestFallback(
c fiber.Ctx,
route *server.HubRoute,
requestID string,
resp *http.Response,
upstreamURL *url.URL,
hook *hookState,
originalStatus int,
originalPath string,
) (*http.Response, *url.URL, string, error) {
if resp == nil || resp.StatusCode != http.StatusNotFound || hook == nil || hook.ctx == nil {
return resp, upstreamURL, "", nil
}
fallbackPath, ok := dockermodule.RegistryK8sManifestFallbackPath(hook.ctx, hook.clean)
if !ok {
return resp, upstreamURL, "", nil
}
resp.Body.Close()
fallbackHook := *hook
fallbackHook.clean = fallbackPath
fallbackResp, fallbackURL, err := h.executeRequest(c, route, &fallbackHook)
if err != nil {
return nil, upstreamURL, "", err
}
fallbackResp, fallbackURL, err = h.retryOnAuthFailure(c, route, requestID, time.Now(), fallbackResp, fallbackURL, &fallbackHook)
if err != nil {
return nil, upstreamURL, "", err
}
h.logRegistryK8sFallback(route, requestID, originalPath, fallbackPath, originalStatus, c.Method())
if fallbackResp.StatusCode == http.StatusOK {
return fallbackResp, fallbackURL, fallbackPath, nil
}
return fallbackResp, fallbackURL, "", nil
}
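
The fallback above hinges on a path rewrite whose implementation (`dockermodule.RegistryK8sManifestFallbackPath`) is not shown in this diff. Based on the test fixtures in this change set (`/v2/coredns/manifests/v1.13.1` → `/v2/coredns/coredns/manifests/v1.13.1`), a hypothetical sketch of that derivation — the segment checks below are assumptions, not the real function:

```go
package main

import (
	"fmt"
	"strings"
)

// fallbackPath doubles a single-segment image name, since registry.k8s.io
// serves images like "coredns" under "coredns/coredns". Only paths shaped
// exactly /v2/<name>/manifests/<ref> qualify; anything else returns false.
func fallbackPath(clean string) (string, bool) {
	parts := strings.Split(strings.TrimPrefix(clean, "/"), "/")
	if len(parts) != 4 || parts[0] != "v2" || parts[2] != "manifests" {
		return "", false
	}
	return fmt.Sprintf("/v2/%s/%s/manifests/%s", parts[1], parts[1], parts[3]), true
}

func main() {
	p, ok := fallbackPath("/v2/coredns/manifests/v1.13.1")
	fmt.Println(p, ok)
}
```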
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
if route == nil || h == nil || h.logger == nil {
return
}
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.Module.Key,
false,
)
fields["action"] = "proxy_fallback"
if route.UpstreamURL != nil {
fields["upstream_host"] = route.UpstreamURL.Hostname()
}
fields["original_path"] = originalPath
fields["fallback_path"] = fallbackPath
fields["original_status"] = originalStatus
fields["method"] = method
if requestID != "" {
fields["request_id"] = requestID
}
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
}
func (h *Handler) revalidateRequest(
c fiber.Ctx,
route *server.HubRoute,
@@ -971,10 +1068,8 @@ func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestI
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
route.Module.Key,
false,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
fields["action"] = "proxy_retry"
fields["upstream"] = upstream
@@ -992,10 +1087,8 @@ func (h *Handler) logAuthFailure(route *server.HubRoute, upstream string, reques
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
route.Module.Key,
false,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
fields["action"] = "proxy"
fields["upstream"] = upstream

View File

@@ -0,0 +1,55 @@
package proxy
import (
"bytes"
"net/url"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/server"
)
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
buf := new(bytes.Buffer)
logger := logrus.New()
logger.SetOutput(buf)
logger.SetFormatter(&logrus.JSONFormatter{})
upstreamURL, err := url.Parse("http://registry.k8s.io")
if err != nil {
t.Fatalf("parse upstream url: %v", err)
}
h := NewHandler(nil, logger, nil)
route := &server.HubRoute{
Config: config.HubConfig{
Name: "docker",
Domain: "k8s.hub.local",
Type: "docker",
},
Module: hubmodule.ModuleMetadata{Key: "docker"},
UpstreamURL: upstreamURL,
}
h.logRegistryK8sFallback(route, "req-1", "/v2/coredns/manifests/v1.13.1", "/v2/coredns/coredns/manifests/v1.13.1", 404, "GET")
output := buf.String()
for _, want := range []string{
"proxy_registry_k8s_fallback",
`"hub":"docker"`,
`"domain":"k8s.hub.local"`,
`"upstream_host":"registry.k8s.io"`,
`"original_path":"/v2/coredns/manifests/v1.13.1"`,
`"fallback_path":"/v2/coredns/coredns/manifests/v1.13.1"`,
`"original_status":404`,
`"method":"GET"`,
} {
if !strings.Contains(output, want) {
t.Fatalf("expected log output to contain %s, got %s", want, output)
}
}
}

View File

@@ -13,7 +13,6 @@ type RequestContext struct {
Domain string
HubType string
ModuleKey string
RolloutFlag string
UpstreamHost string
Method string
}

View File

@@ -31,9 +31,12 @@ func NewUpstreamClient(cfg *config.Config) *http.Client {
timeout = cfg.Global.UpstreamTimeout.DurationValue()
}
transport := defaultTransport.Clone()
// Use UpstreamTimeout as ResponseHeaderTimeout to avoid killing long streaming downloads.
transport.ResponseHeaderTimeout = timeout
return &http.Client{
Timeout: timeout,
Transport: defaultTransport.Clone(),
Transport: transport,
}
}

View File

@@ -16,8 +16,12 @@ func TestNewUpstreamClientUsesConfigTimeout(t *testing.T) {
}
client := NewUpstreamClient(cfg)
if client.Timeout != 45*time.Second {
t.Fatalf("expected timeout 45s, got %s", client.Timeout)
transport, ok := client.Transport.(*http.Transport)
if !ok {
t.Fatalf("expected *http.Transport, got %T", client.Transport)
}
if transport.ResponseHeaderTimeout != 45*time.Second {
t.Fatalf("expected response header timeout 45s, got %s", transport.ResponseHeaderTimeout)
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
)
// HubRoute associates a Hub's config with derived attributes (such as the cache TTL and resolved Upstream/Proxy URLs
@@ -26,13 +25,10 @@ type HubRoute struct {
// UpstreamURL/ProxyURL are resolved up front when the Registry is built, so later requests can reuse them quickly.
UpstreamURL *url.URL
ProxyURL *url.URL
// ModuleKey/Module record the module chosen by this hub and its metadata, for logging and observability.
ModuleKey string
Module hubmodule.ModuleMetadata
// Module records the metadata of the module chosen by this hub, for logging and observability.
Module hubmodule.ModuleMetadata
// CacheStrategy represents the module's default strategy merged with any hub-level overrides.
CacheStrategy hubmodule.CacheStrategyProfile
// RolloutFlag reflects this hub's legacy → modular migration state, for logs/diagnostics.
RolloutFlag legacy.RolloutFlag
}
// HubRegistry resolves Host/Host:port to a HubRoute; all hubs share a single listen port.
@@ -106,9 +102,11 @@ func (r *HubRegistry) List() []HubRoute {
}
func buildHubRoute(cfg *config.Config, hub config.HubConfig) (*HubRoute, error) {
flag := hub.RolloutFlagValue()
effectiveKey := config.EffectiveModuleKey(hub.Module, flag)
meta, err := moduleMetadataForKey(effectiveKey)
moduleKey := strings.ToLower(strings.TrimSpace(hub.Type))
if moduleKey == "" {
return nil, fmt.Errorf("hub %s: 缺少 Type", hub.Name)
}
meta, err := moduleMetadataForKey(moduleKey)
if err != nil {
return nil, fmt.Errorf("hub %s: %w", hub.Name, err)
}
@@ -127,8 +125,7 @@ func buildHubRoute(cfg *config.Config, hub config.HubConfig) (*HubRoute, error)
}
effectiveTTL := cfg.EffectiveCacheTTL(hub)
runtime := config.BuildHubRuntime(hub, meta, effectiveTTL, flag)
legacy.RecordAdapterState(hub.Name, runtime.Module.Key, flag)
runtime := config.BuildHubRuntime(hub, meta, effectiveTTL)
return &HubRoute{
Config: hub,
@@ -136,10 +133,8 @@ func buildHubRoute(cfg *config.Config, hub config.HubConfig) (*HubRoute, error)
CacheTTL: effectiveTTL,
UpstreamURL: upstreamURL,
ProxyURL: proxyURL,
ModuleKey: runtime.Module.Key,
Module: runtime.Module,
CacheStrategy: runtime.CacheStrategy,
RolloutFlag: runtime.Rollout,
}, nil
}

View File

@@ -5,7 +5,6 @@ import (
"time"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
)
func TestHubRegistryLookupByHost(t *testing.T) {
@@ -16,11 +15,10 @@ func TestHubRegistryLookupByHost(t *testing.T) {
},
Hubs: []config.HubConfig{
{
Name: "docker",
Domain: "docker.hub.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
EnableHeadCheck: true,
Name: "docker",
Domain: "docker.hub.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
},
{
Name: "npm",
@@ -55,8 +53,8 @@ func TestHubRegistryLookupByHost(t *testing.T) {
if route.CacheStrategy.ValidationMode == "" {
t.Fatalf("cache strategy validation mode should not be empty")
}
if route.RolloutFlag != legacy.RolloutModular {
t.Fatalf("default rollout flag should be modular")
if route.Module.Key != "docker" {
t.Fatalf("expected docker module, got %s", route.Module.Key)
}
if route.UpstreamURL.String() != "https://registry-1.docker.io" {

View File

@@ -65,8 +65,6 @@ type hubBindingPayload struct {
ModuleKey string `json:"module_key"`
Domain string `json:"domain"`
Port int `json:"port"`
Rollout string `json:"rollout_flag"`
Legacy bool `json:"legacy_only"`
}
func encodeModules(mods []hubmodule.ModuleMetadata, status map[string]string) []modulePayload {
@@ -115,11 +113,9 @@ func encodeHubBindings(routes []server.HubRoute) []hubBindingPayload {
for _, route := range routes {
result = append(result, hubBindingPayload{
HubName: route.Config.Name,
ModuleKey: route.ModuleKey,
ModuleKey: route.Module.Key,
Domain: route.Config.Domain,
Port: route.ListenPort,
Rollout: string(route.RolloutFlag),
Legacy: route.ModuleKey == hubmodule.DefaultModuleKey(),
})
}
return result

View File

@@ -15,7 +15,7 @@
- `MaxMemoryCacheSize` (bytes, optional, default 268435456)
- `MaxRetries` (int >=0, default 3)
- `InitialBackoff` (duration, default 1s)
- `UpstreamTimeout` (duration, default 30s)
- `UpstreamTimeout` (duration, default 30s, used as upstream response header timeout; body can stream longer)
- **Validation Rules**: paths must exist or be creatable; numeric values must be > 0; LogLevel must match the allowed enum.
- **Relationships**: aggregated by `Config`; supplies defaults to `HubConfig`.
@@ -28,7 +28,6 @@
- `Upstream` (string, required, http/https URL)
- `Proxy` (string, optional, URL)
- `CacheTTL` (duration, optional, overrides global)
- `EnableHeadCheck` (bool, optional, default true)
- **Validation Rules**: `Name` must be unique; a `Domain` + `Port` combination must not conflict; URLs must be parseable.
- **Relationships**: belongs to `Config`; used at runtime to initialize routing and the cache directory `StoragePath/<Name>`.

View File

@@ -4,7 +4,7 @@
### HubRoute
- **Description**: maps host/port to an upstream registry, used by the Fiber router and the Proxy handler.
- **Fields**: `Name` (string, unique), `Domain` (string, FQDN), `Port` (int, 1-65535), `Upstream` (URL), `Proxy` (URL, optional), `CacheTTL` (duration override), `EnableHeadCheck` (bool).
- **Fields**: `Name` (string, unique), `Domain` (string, FQDN), `Port` (int, 1-65535), `Upstream` (URL), `Proxy` (URL, optional), `CacheTTL` (duration override).
- **Validation**: Name is unique; Domain carries no scheme/path; Upstream must be http/https.
- **Relationships**: loaded from config into `HubRegistry`; linked to CacheEntry and ProxyRequest via `Name`.

View File

@@ -94,6 +94,3 @@ components:
type: string
port:
type: integer
rollout_flag:
type: string
enum: [legacy-only, dual, modular]

View File

@@ -13,13 +13,12 @@ The modular architecture introduces explicit metadata describing which proxy+cac
- `Domain` *(string, required)* hostname clients access; must be unique per process.
- `Port` *(int, required)* listen port; validated to 1-65535.
- `Upstream` *(string, required)* base URL for upstream registry; must be HTTPS or explicitly whitelisted HTTP.
- `Module` *(string, optional, default `"legacy"`)* key resolved through module registry. Validation ensures module exists at load time.
- `Type` *(string, required)* module selector; validated against the registered module set.
- `CacheTTL`, `Proxy`, and other overrides *(optional)* reuse existing schema; modules may read these via dependency injection.
- **Relationships**:
- `HubConfigEntry.Module``ModuleMetadata.Key` (many-to-one).
- `HubConfigEntry.Type``ModuleMetadata.Key` (many-to-one).
- **Validation Rules**:
- Missing `Module` implicitly maps to `legacy` to preserve backward compatibility.
- Changing `Module` requires a migration plan; config loader logs module name for observability.
- Invalid `Type` values reject config load; operators must add the type to the supported list alongside module registration.
### 2. ModuleMetadata
- **Fields**:
@@ -57,20 +56,12 @@ The modular architecture introduces explicit metadata describing which proxy+cac
- TTL must be positive.
- Modules flagged as `SupportsStreamingWrite=false` must document fallback behavior before registration.
### 5. LegacyAdapterState
- **Purpose**: Tracks which hubs still run through the old shared implementation to support progressive migration.
- **Fields**:
- `HubName` *(string)* references `HubConfigEntry.Name`.
- `RolloutFlag` *(enum: `legacy-only`, `dual`, `modular`)* indicates traffic split for that hub.
- `FallbackDeadline` *(timestamp, optional)* when legacy path will be removed.
- **Storage**: In-memory map derived from config + environment flags; optionally surfaced via diagnostics endpoint.
## State Transitions
1. **Module Adoption**
- Start: `HubConfigEntry.Module = "legacy"`.
- Transition: operator edits config to new module key, runs validation.
- Result: registry resolves new module, `LegacyAdapterState` updated to `dual` until rollout flag toggled fully.
- Start: `HubConfigEntry.Type = "legacy"` (or other baseline).
- Transition: operator edits config to new module type, runs validation.
- Result: registry resolves new module and routes all traffic to it immediately.
2. **Cache Strategy Update**
- Start: Module uses default TTL.

View File

@@ -21,7 +21,7 @@ Modularize the proxy and cache layers so every hub type (npm, Docker, PyPI, futu
**Constraints**: no Web UI or account system; all behavior is driven by a single TOML config; each Hub needs its own Domain/Port binding; anonymous access only
**Scale/Scope**: proxies multiple registries (Docker/NPM/Go/PyPI, etc.), targeting weak-network and offline cache-reuse scenarios
**Module Registry Location**: `internal/hubmodule/registry.go` exposes the register/resolve API; module subdirectories live under `internal/hubmodule/<name>/`
**Config Binding for Modules**: the `[[Hub]].Module` field selects the module name, default `legacy`; config loading validates that it matches a registered module
**Config Binding for Modules**: the `[[Hub]].Type` field selects the module name (same-name mapping); config loading validates that a module is registered for the type
## Constitution Check
@@ -29,7 +29,7 @@ Modularize the proxy and cache layers so every hub type (npm, Docker, PyPI, futu
- The feature remains a "lightweight multi-registry CLI proxy"; no Web UI, account system, or proxy-unrelated capability is introduced.
- Only Go plus constitution-approved dependencies are used; any new third-party library has its rationale and review conclusion recorded in this plan.
- Behavior is fully controlled by `config.toml`; the new `[[Hub]].Module` option has planned defaults, validation, and a migration strategy
- Behavior is fully controlled by `config.toml`; `[[Hub]].Type` drives module binding directly, and the validation list is updated as modules are added
- The design keeps the cache-first + streaming origin-fetch path and specifies logging/observability for hits, origin fetches, and failures.
- The plan lists mandatory tests for config parsing, cache read/write, and Host-header routing, plus the Chinese-comment documentation scope.
@@ -103,14 +103,14 @@ tests/ # `go test` 下的单元/集成测试,用临时目
### Post-Design Constitution Check
- New diagnostics endpoint remains internal and optional; no UI/login introduced. ✅ Principle I
- Code still single Go binary with existing dependency set. ✅ Principle II
- `Module` field documented with defaults, validation, and migration path; no extra config sources. ✅ Principle III
- The `Type` field is the module binding point; docs and validation are updated in lockstep; no extra config sources. ✅ Principle III
- Cache strategy enforces the "original path == disk path" layout plus streaming origin fetch; the related observability requirements are written into contracts. ✅ Principle IV
- Logs/quickstart/test guidance ensure observability and Chinese documentation continue. ✅ Principle V
## Phase 2 Implementation Outlook (pre-tasks)
1. **Module Registry & Interfaces**: Create `internal/hubmodule` package, define shared interfaces, implement registry with tests, and expose diagnostics data source reused by HTTP endpoints.
2. **Config Loader & Validation**: Extend `internal/config/types.go` and `validation.go` to include `Module` with default `legacy`, plus wiring to registry resolution during startup.
2. **Config Loader & Validation**: Extend `internal/config/types.go` and `validation.go` to bind modules via `Type`, plus wiring to registry resolution during startup.
3. **Legacy Adapter & Migration Switches**: Provide adapter module that wraps current shared proxy/cache, plus feature flags or config toggles to control rollout states per hub.
4. **Module Implementations**: Carve existing npm/docker/pypi logic into dedicated modules within `internal/hubmodule/`, ensuring the cache writer reuses the original request path and the necessary telemetry tags.
5. **Observability/Diagnostics**: Implement `//modules` endpoint (Fiber route) and log tags showing `module_key` on cache/proxy events.

View File

@@ -12,20 +12,20 @@
4. Add tests under the module directory and run `make modules-test` (delegates to `go test ./internal/hubmodule/...`).
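
The registration step above can be sketched with a stand-in registry shaped like `hubmodule.Register` / `hubmodule.ModuleMetadata` (the call pattern matches what the tests in this diff use; the map-based internals below are an assumption, not the real `internal/hubmodule` implementation):

```go
package main

import "fmt"

// ModuleMetadata mirrors the shape used by hubmodule.Register in this repo's
// tests; only Key is shown here.
type ModuleMetadata struct{ Key string }

var registry = map[string]ModuleMetadata{}

// Register rejects duplicate keys, so each module type claims its key once,
// typically from an init() in internal/hubmodule/<type>/.
func Register(meta ModuleMetadata) error {
	if _, exists := registry[meta.Key]; exists {
		return fmt.Errorf("module %q already registered", meta.Key)
	}
	registry[meta.Key] = meta
	return nil
}

func main() {
	// The config loader later validates that every [[Hub]].Type resolves
	// to a registered key.
	fmt.Println(Register(ModuleMetadata{Key: "mytype"}))
	fmt.Println(Register(ModuleMetadata{Key: "mytype"}))
}
```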
## 3. Bind Module via Config
1. Edit `config.toml` and set `Module = "<module-key>"` inside the target `[[Hub]]` block (omit to use `legacy`).
2. While validating a new module, set `Rollout = "dual"` so you can flip back to legacy without editing other fields.
1. Add your module type to `internal/config/validation.go` and the sample config if it represents a new protocol.
2. Edit `config.toml` and set `Type = "<module-type>"` inside the target `[[Hub]]` block.
3. (Optional) Override cache behavior per hub using existing fields (`CacheTTL`, etc.).
4. Run `ANY_HUB_CONFIG=./config.toml go test ./...` (or `make modules-test`) to ensure loader validation passes and the module registry sees your key.
## 4. Run and Verify
1. Start the binary: `go run ./cmd/any-hub --config ./config.toml`.
2. Use `curl -H "Host: <hub-domain>" http://127.0.0.1:<port>/<path>` to produce traffic, then hit `curl http://127.0.0.1:<port>/-/modules` and confirm the hub binding points to your module with the expected `rollout_flag`.
2. Use `curl -H "Host: <hub-domain>" http://127.0.0.1:<port>/<path>` to produce traffic, then hit `curl http://127.0.0.1:<port>/-/modules` and confirm the hub binding points to your module key.
3. Inspect `./storage/<hub>/` to confirm the cached files mirror the upstream path (no suffix). When a path also has child entries (e.g., `/pkg` metadata plus `/pkg/-/...` tarballs), the metadata payload is stored in a `__content` file under that directory so both artifacts can coexist. PyPI Simple responses rewrite distribution links to `/files/<scheme>/<host>/<path>` so that wheels/tarballs are fetched through the proxy and cached alongside the HTML/JSON index. Verify TTL overrides are propagated.
4. Monitor `logs/any-hub.log` (or the sample `logs/module_migration_sample.log`) to verify each entry exposes `module_key` + `rollout_flag`. Example:
4. Monitor `logs/any-hub.log` (or the sample `logs/module_migration_sample.log`) to verify each entry exposes `module_key`. Example:
```json
{"action":"proxy","hub":"testhub","module_key":"testhub","rollout_flag":"dual","cache_hit":false,"upstream_status":200}
{"action":"proxy","hub":"testhub","module_key":"testhub","cache_hit":false,"upstream_status":200}
```
5. Exercise rollback by switching `Rollout = "legacy-only"` (or `Module = "legacy"` if needed) and re-running the traffic to ensure diagnostics/logs show the transition.
5. Exercise rollback by reverting the config change (or type rename) and re-running the traffic to ensure diagnostics/logs show the transition.
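
The storage layout described in step 3 — cached files mirror the upstream path, with a `__content` file when a path also has child entries — can be sketched as follows. This is an illustration only; the real writer lives in `internal/cache` and this helper is not its actual API.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// diskPath picks where a cached payload lands: directly at the mirrored
// upstream path, or in a "__content" file when child entries (e.g. npm
// tarballs under /pkg/-/...) must coexist under the same directory.
func diskPath(root, clean string, hasChildren bool) string {
	if hasChildren {
		return filepath.Join(root, clean, "__content")
	}
	return filepath.Join(root, clean)
}

func main() {
	fmt.Println(diskPath("/storage/npm", "lodash", true))
	fmt.Println(diskPath("/storage/npm", "lodash/-/lodash-4.17.21.tgz", false))
}
```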
## 5. Ship
1. Commit module code + config docs.

View File

@@ -15,7 +15,7 @@
## Phase 2: Foundational (Blocking Prerequisites)
- [X] T003 Create shared module interfaces + registry in `internal/hubmodule/interfaces.go` and `internal/hubmodule/registry.go`
- [X] T004 Extend config schema with `[[Hub]].Module` defaults/validation plus sample configs in `internal/config/{types.go,validation.go,loader.go}` and `configs/*.toml`
- [X] T004 Extend config schema with module defaults/validation (since 2025-03 this is driven directly by `Type`; `[[Hub]].Module` is retired)
- [X] T005 [P] Wire server bootstrap to resolve modules once and inject into proxy/cache layers (`internal/server/bootstrap.go`, `internal/proxy/handler.go`)
**Checkpoint**: Registry + config plumbing complete; user story work may begin.
@@ -61,6 +61,8 @@
## Phase 5: User Story 3 - Operate Mixed Generations During Migration (Priority: P3)
> 2025-03: Rollout flags were removed; this section remains for historical tracking only.
**Goal**: Support dual-path deployments with diagnostics/logging to track legacy vs. modular hubs.
**Independent Test**: Run mixed legacy/modular hubs, flip rollout flags, and confirm logs + diagnostics show module ownership and allow rollback.
@@ -73,7 +75,7 @@
- [X] T019 [US3] Implement `LegacyAdapterState` tracker + rollout flag parsing (`internal/hubmodule/legacy/state.go`, `internal/config/runtime_flags.go`)
- [X] T020 [US3] Implement Fiber handler + routing for `//modules` diagnostics (`internal/server/routes/modules.go`, `internal/server/router.go`)
- [X] T021 [US3] Add structured log fields (`module_key`, `rollout_flag`) across logging middleware (`internal/server/middleware/logging.go`, `internal/proxy/logging.go`)
- [X] T021 [US3] Add structured log fields (`module_key`) across logging middleware (`internal/server/middleware/logging.go`, `internal/proxy/logging.go`)
- [X] T022 [US3] Document operational playbook for phased migration (`docs/operations/migration.md`)
---

View File

@@ -9,5 +9,5 @@ If future external endpoints are added, document them here with request/response
## Error Behaviors
- **module_handler_missing**: when the Forwarder cannot find a handler for the given module_key, it returns `500 {"error":"module_handler_missing"}` and logs fields such as `hub/domain/module_key/rollout_flag` to help diagnose missing config or registration.
- **module_handler_missing**: when the Forwarder cannot find a handler for the given module_key, it returns `500 {"error":"module_handler_missing"}` and logs fields such as `hub/domain/module_key` to help diagnose missing config or registration.
- **module_handler_panic**: a module handler panic is caught and answered with `500 {"error":"module_handler_panic"}`, emitting the structured log `error=module_handler_panic` so the process neither crashes nor loses observability.
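
The two error behaviors described above can be sketched together. This is a hedged illustration with stand-in types: the real Forwarder and its handler map live elsewhere in the codebase, so the `dispatch` signature here is an assumption; only the documented error bodies are taken from the source.

```go
package main

import "fmt"

type handler func() error

// dispatch sketches the documented behaviors: a missing module_key handler
// yields module_handler_missing, and a handler panic is recovered into
// module_handler_panic instead of crashing the process.
func dispatch(handlers map[string]handler, moduleKey string) (status int, body string) {
	h, ok := handlers[moduleKey]
	if !ok {
		return 500, `{"error":"module_handler_missing"}`
	}
	// Named returns let the deferred recover rewrite the response on panic.
	defer func() {
		if r := recover(); r != nil {
			status, body = 500, `{"error":"module_handler_panic"}`
		}
	}()
	if err := h(); err != nil {
		return 502, err.Error()
	}
	return 200, "ok"
}

func main() {
	handlers := map[string]handler{
		"npm":    func() error { return nil },
		"docker": func() error { panic("boom") },
	}
	for _, key := range []string{"npm", "docker", "unknown"} {
		status, body := dispatch(handlers, key)
		fmt.Println(key, status, body)
	}
}
```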

View File

@@ -14,7 +14,7 @@
- **Proxy Dispatcher**
- Attributes: handler map (module_key → handler), default handler fallback.
- Behavior: Lookup by route.ModuleKey and invoke handler; wraps errors/logging.
- Behavior: Lookup by the hub's module key (derived from Type / route.Module.Key) and invoke handler; wraps errors/logging.
- Constraints: If handler missing, return 5xx with observable logging.
- **Cache Policy**

View File

@@ -27,7 +27,7 @@
## Relationships
- When a hub module registers, it is wired into both the HookRegistry and the Forwarder handler map.
- After a request arrives, the ProxyDispatcher looks up the Hook + handler by route.ModuleKey.
- After a request arrives, the ProxyDispatcher looks up the Hook + handler by route.Module.Key (derived from the Hub Type).
- Diagnostics combine the HookRegistry and HubRegistry to report status.
## Lifecycle

View File

@@ -40,5 +40,5 @@ curl -s http://localhost:8080/-/modules | jq '.modules[].hook_status'
```
- Confirm the new module is marked `registered` and unregistered modules show `missing`; the legacy handler can still act as a fallback.
- To inspect global state, check the `hook_registry` field, which reports the registration status of every module_key.
- `hubs[].legacy_only` = `true` means the hub is still bound to the legacy module; once migration is done, set `[[Hub]].Module` explicitly
- `hubs[].module_key` should match the `Type` in config; the legacy module exists only as a fallback and should be replaced with a protocol-specific module as soon as possible
- Startup verifies that every module has registered a Hook; a missing Hook exits the process immediately, avoiding a silent runtime fallback.

View File

@@ -41,7 +41,6 @@ func TestAPKProxyCachesIndexAndPackages(t *testing.T) {
Name: "apk",
Domain: "apk.hub.local",
Type: "apk",
Module: "apk",
Upstream: stub.URL,
},
},

View File

@@ -41,7 +41,6 @@ func TestAptPackagesCachedWithoutRevalidate(t *testing.T) {
Name: "apt",
Domain: "apt.hub.local",
Type: "debian",
Module: "debian",
Upstream: stub.URL,
},
},

View File

@@ -36,7 +36,6 @@ func TestAptUpdateCachesIndexes(t *testing.T) {
Name: "apt",
Domain: "apt.hub.local",
Type: "debian",
Module: "debian",
Upstream: stub.URL,
},
},

View File

@@ -26,6 +26,8 @@ import (
const (
dockerManifestPath = "/v2/library/cache-flow/manifests/latest"
dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
registryK8sOriginalManifest = "/v2/coredns/manifests/v1.13.1"
registryK8sFallbackManifest = "/v2/coredns/coredns/manifests/v1.13.1"
)
func TestCacheFlowWithConditionalRequest(t *testing.T) {
@@ -44,7 +46,6 @@ func TestCacheFlowWithConditionalRequest(t *testing.T) {
Name: "docker",
Domain: "docker.hub.local",
Type: "docker",
Module: "docker",
Upstream: upstream.URL,
},
},
@@ -146,7 +147,6 @@ func TestDockerManifestHeadDoesNotOverwriteCache(t *testing.T) {
Name: "docker",
Domain: "docker.hub.local",
Type: "docker",
Module: "docker",
Upstream: upstream.URL,
},
},
@@ -224,8 +224,8 @@ func TestDockerManifestHeadDoesNotOverwriteCache(t *testing.T) {
}
}
func TestDockerNamespaceFallbackAddsLibrary(t *testing.T) {
stub := newCacheFlowStub(t, dockerManifestPath)
func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) {
stub := newCacheFlowStub(t, dockerManifestNoNamespacePath)
defer stub.Close()
storageDir := t.TempDir()
@@ -279,7 +279,7 @@ func TestDockerNamespaceFallbackAddsLibrary(t *testing.T) {
}
if resp.StatusCode != fiber.StatusOK {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 200 when fallback applies, got %d (body=%s)", resp.StatusCode, string(body))
t.Fatalf("expected 200 when upstream keeps original path, got %d (body=%s)", resp.StatusCode, string(body))
}
resp.Body.Close()
@@ -288,6 +288,140 @@ func TestDockerNamespaceFallbackAddsLibrary(t *testing.T) {
}
}
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != fiber.StatusOK {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body))
}
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
t.Fatalf("expected single original manifest hit")
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 {
t.Fatalf("expected single fallback manifest hit")
}
}
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
resp.Body.Close()
if resp.StatusCode != fiber.StatusOK {
t.Fatalf("expected 200, got %d", resp.StatusCode)
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
t.Fatalf("expected no fallback hit")
}
}
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "mirror.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != fiber.StatusNotFound {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body))
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
t.Fatalf("expected no fallback hit for non-registry host")
}
}
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
makeRequest := func() *http.Response {
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
return resp
}
resp1 := makeRequest()
resp1.Body.Close()
resp2 := makeRequest()
defer resp2.Body.Close()
if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" {
t.Fatalf("expected second request to hit cache")
}
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet))
}
}
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
makeRequest := func() *http.Response {
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
return resp
}
resp1 := makeRequest()
resp1.Body.Close()
resp2 := makeRequest()
resp2.Body.Close()
if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 {
t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead))
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 {
t.Fatalf("expected HEAD revalidation against fallback path")
}
}
type cacheFlowStub struct {
server *http.Server
listener net.Listener
@@ -302,6 +436,22 @@ type cacheFlowStub struct {
lastMod string
}
type registryK8sStub struct {
server *http.Server
listener net.Listener
URL string
mu sync.Mutex
responses map[string]registryK8sResponse
hits map[string]int
}
type registryK8sResponse struct {
status int
body []byte
etag string
}
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
t.Helper()
stub := &cacheFlowStub{
@@ -337,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
return stub
}
func newRegistryK8sStub(t *testing.T) *registryK8sStub {
t.Helper()
stub := &registryK8sStub{
responses: make(map[string]registryK8sResponse),
hits: make(map[string]int),
}
mux := http.NewServeMux()
mux.HandleFunc("/", stub.handle)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Skipf("unable to start registry.k8s.io stub listener: %v", err)
}
server := &http.Server{Handler: mux}
stub.server = server
stub.listener = listener
stub.URL = "http://" + listener.Addr().String()
go func() {
_ = server.Serve(listener)
}()
return stub
}
func (s *cacheFlowStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -348,6 +524,109 @@ func (s *cacheFlowStub) Close() {
}
}
func (s *registryK8sStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if s.server != nil {
_ = s.server.Shutdown(ctx)
}
if s.listener != nil {
_ = s.listener.Close()
}
}
func (s *registryK8sStub) setResponse(path string, status int, body []byte) {
s.mu.Lock()
defer s.mu.Unlock()
s.responses[path] = registryK8sResponse{
status: status,
body: append([]byte(nil), body...),
etag: fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-")),
}
}
func (s *registryK8sStub) pathHits(path string, method string) int {
s.mu.Lock()
defer s.mu.Unlock()
return s.hits[method+" "+path]
}
func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.hits[r.Method+" "+r.URL.Path]++
resp, ok := s.responses[r.URL.Path]
s.mu.Unlock()
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`))
return
}
w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
w.Header().Set("Etag", resp.etag)
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
if r.Method == http.MethodHead {
for _, candidate := range r.Header.Values("If-None-Match") {
if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.WriteHeader(resp.status)
return
}
w.WriteHeader(resp.status)
_, _ = w.Write(resp.body)
}
func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App {
t.Helper()
proxyURL := ""
if len(proxyURLs) > 0 {
proxyURL = proxyURLs[0]
}
cfg := &config.Config{
Global: config.GlobalConfig{
ListenPort: 5000,
CacheTTL: config.Duration(30 * time.Second),
StoragePath: storageDir,
},
Hubs: []config.HubConfig{
{
Name: "docker",
Domain: domain,
Type: "docker",
Upstream: upstream,
Proxy: proxyURL,
},
},
}
registry, err := server.NewHubRegistry(cfg)
if err != nil {
t.Fatalf("registry error: %v", err)
}
logger := logrus.New()
logger.SetOutput(io.Discard)
store, err := cache.NewStore(storageDir)
if err != nil {
t.Fatalf("store error: %v", err)
}
client := server.NewUpstreamClient(cfg)
handler := proxy.NewHandler(client, logger, store)
app, err := server.NewApp(server.AppOptions{
Logger: logger,
Registry: registry,
Proxy: handler,
ListenPort: 5000,
})
if err != nil {
t.Fatalf("app error: %v", err)
}
return app
}
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
etag := s.etag

View File

@@ -35,7 +35,6 @@ func TestCacheStrategyOverrides(t *testing.T) {
Name: "npm-ttl",
Domain: "ttl.npm.local",
Type: "npm",
Module: "npm",
Upstream: stub.URL,
CacheTTL: config.Duration(ttl),
},
@@ -69,8 +68,8 @@ func TestCacheStrategyOverrides(t *testing.T) {
}
resp2.Body.Close()
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 0 {
t.Fatalf("expected no HEAD before TTL expiry, got %d", headCount)
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 1 {
t.Fatalf("expected single HEAD before TTL expiry, got %d", headCount)
}
if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 1 {
t.Fatalf("upstream should be hit once before TTL expiry, got %d", getCount)
@@ -86,8 +85,8 @@ func TestCacheStrategyOverrides(t *testing.T) {
}
resp3.Body.Close()
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 1 {
t.Fatalf("expected single HEAD after TTL expiry, got %d", headCount)
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 2 {
t.Fatalf("expected two HEAD requests after TTL expiry, got %d", headCount)
}
if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 1 {
t.Fatalf("upstream GET count should remain 1, got %d", getCount)
@@ -111,7 +110,6 @@ func TestCacheStrategyOverrides(t *testing.T) {
Name: "npm-novalidation",
Domain: "novalidation.npm.local",
Type: "npm",
Module: "npm",
Upstream: stub.URL,
CacheTTL: config.Duration(ttl),
ValidationMode: string(hubmodule.ValidationModeNever),


@@ -1,118 +0,0 @@
package integration
import (
"io"
"net/http/httptest"
"testing"
"time"
"github.com/gofiber/fiber/v3"
"github.com/sirupsen/logrus"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
"github.com/any-hub/any-hub/internal/server"
)
func TestLegacyAdapterRolloutToggle(t *testing.T) {
const moduleKey = "rollout-toggle-test"
_ = hubmodule.Register(hubmodule.ModuleMetadata{Key: moduleKey})
logger := logrus.New()
logger.SetOutput(io.Discard)
baseHub := config.HubConfig{
Name: "dual-mode",
Domain: "dual.local",
Type: "docker",
Upstream: "https://registry.npmjs.org",
Module: moduleKey,
}
testCases := []struct {
name string
rolloutFlag string
expectKey string
expectFlag legacy.RolloutFlag
}{
{
name: "force legacy",
rolloutFlag: "legacy-only",
expectKey: hubmodule.DefaultModuleKey(),
expectFlag: legacy.RolloutLegacyOnly,
},
{
name: "dual mode",
rolloutFlag: "dual",
expectKey: moduleKey,
expectFlag: legacy.RolloutDual,
},
{
name: "full modular",
rolloutFlag: "modular",
expectKey: moduleKey,
expectFlag: legacy.RolloutModular,
},
{
name: "rollback to legacy",
rolloutFlag: "legacy-only",
expectKey: hubmodule.DefaultModuleKey(),
expectFlag: legacy.RolloutLegacyOnly,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cfg := &config.Config{
Global: config.GlobalConfig{
ListenPort: 6100,
CacheTTL: config.Duration(time.Minute),
},
Hubs: []config.HubConfig{
func() config.HubConfig {
h := baseHub
h.Rollout = tc.rolloutFlag
return h
}(),
},
}
registry, err := server.NewHubRegistry(cfg)
if err != nil {
t.Fatalf("failed to build registry: %v", err)
}
recorder := &routeRecorder{}
app := mustNewApp(t, cfg.Global.ListenPort, logger, registry, recorder)
req := httptest.NewRequest("GET", "http://dual.local/v2/", nil)
req.Host = "dual.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
if resp.StatusCode != fiber.StatusNoContent {
t.Fatalf("unexpected status: %d", resp.StatusCode)
}
if recorder.moduleKey != tc.expectKey {
t.Fatalf("expected module %s, got %s", tc.expectKey, recorder.moduleKey)
}
if recorder.rolloutFlag != tc.expectFlag {
t.Fatalf("expected rollout flag %s, got %s", tc.expectFlag, recorder.rolloutFlag)
}
})
}
}
type routeRecorder struct {
moduleKey string
rolloutFlag legacy.RolloutFlag
}
func (r *routeRecorder) Handle(c fiber.Ctx, route *server.HubRoute) error {
r.moduleKey = route.ModuleKey
r.rolloutFlag = route.RolloutFlag
return c.SendStatus(fiber.StatusNoContent)
}


@@ -13,7 +13,6 @@ import (
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/hubmodule/legacy"
"github.com/any-hub/any-hub/internal/proxy/hooks"
"github.com/any-hub/any-hub/internal/server"
"github.com/any-hub/any-hub/internal/server/routes"
@@ -37,21 +36,17 @@ func TestModuleDiagnosticsEndpoints(t *testing.T) {
CacheTTL: config.Duration(30 * time.Minute),
},
Hubs: []config.HubConfig{
{
Name: "legacy-hub",
Domain: "legacy.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
Module: hubmodule.DefaultModuleKey(),
Rollout: string(legacy.RolloutLegacyOnly),
},
{
Name: "modern-hub",
Domain: "modern.local",
Type: "npm",
Upstream: "https://registry.npmjs.org",
Module: moduleKey,
Rollout: "dual",
},
{
Name: "docker-hub",
Domain: "docker.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
},
},
}
@@ -77,12 +72,10 @@ func TestModuleDiagnosticsEndpoints(t *testing.T) {
var payload struct {
Modules []map[string]any `json:"modules"`
Hubs []struct {
HubName string `json:"hub_name"`
ModuleKey string `json:"module_key"`
Rollout string `json:"rollout_flag"`
Domain string `json:"domain"`
Port int `json:"port"`
LegacyOnly bool `json:"legacy_only"`
HubName string `json:"hub_name"`
ModuleKey string `json:"module_key"`
Domain string `json:"domain"`
Port int `json:"port"`
} `json:"hubs"`
}
body, _ := io.ReadAll(resp.Body)
@@ -111,22 +104,13 @@ func TestModuleDiagnosticsEndpoints(t *testing.T) {
}
for _, hub := range payload.Hubs {
switch hub.HubName {
case "legacy-hub":
if hub.ModuleKey != hubmodule.DefaultModuleKey() {
t.Fatalf("legacy hub should expose legacy module, got %s", hub.ModuleKey)
}
if !hub.LegacyOnly {
t.Fatalf("legacy hub should be marked legacy_only")
}
case "modern-hub":
if hub.ModuleKey != moduleKey {
t.Fatalf("modern hub should expose %s, got %s", moduleKey, hub.ModuleKey)
if hub.ModuleKey != "npm" {
t.Fatalf("modern hub should expose npm, got %s", hub.ModuleKey)
}
if hub.Rollout != "dual" {
t.Fatalf("modern hub rollout flag should be dual, got %s", hub.Rollout)
}
if hub.LegacyOnly {
t.Fatalf("modern hub should not be marked legacy_only")
case "docker-hub":
if hub.ModuleKey != "docker" {
t.Fatalf("docker hub should expose docker, got %s", hub.ModuleKey)
}
default:
t.Fatalf("unexpected hub %s", hub.HubName)


@@ -1,107 +1,100 @@
package integration
import (
"io"
"net/http/httptest"
"testing"
"io"
"net/http/httptest"
"testing"
"github.com/gofiber/fiber/v3"
"github.com/sirupsen/logrus"
"github.com/gofiber/fiber/v3"
"github.com/sirupsen/logrus"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/server"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/server"
)
func TestModuleRoutingIsolation(t *testing.T) {
_ = hubmodule.Register(hubmodule.ModuleMetadata{Key: "module-routing-test"})
cfg := &config.Config{
Global: config.GlobalConfig{
ListenPort: 6000,
CacheTTL: config.Duration(3600),
},
Hubs: []config.HubConfig{
{
Name: "docker-hub",
Domain: "legacy.hub.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
},
{
Name: "npm-hub",
Domain: "test.hub.local",
Type: "npm",
Upstream: "https://registry.example.com",
},
},
}
cfg := &config.Config{
Global: config.GlobalConfig{
ListenPort: 6000,
CacheTTL: config.Duration(3600),
},
Hubs: []config.HubConfig{
{
Name: "legacy",
Domain: "legacy.hub.local",
Type: "docker",
Module: "legacy",
Upstream: "https://registry-1.docker.io",
},
{
Name: "test",
Domain: "test.hub.local",
Type: "npm",
Module: "module-routing-test",
Upstream: "https://registry.example.com",
},
},
}
registry, err := server.NewHubRegistry(cfg)
if err != nil {
t.Fatalf("failed to create registry: %v", err)
}
registry, err := server.NewHubRegistry(cfg)
if err != nil {
t.Fatalf("failed to create registry: %v", err)
}
logger := logrus.New()
logger.SetOutput(io.Discard)
logger := logrus.New()
logger.SetOutput(io.Discard)
recorder := &moduleRecorder{}
app := mustNewApp(t, cfg.Global.ListenPort, logger, registry, recorder)
recorder := &moduleRecorder{}
app := mustNewApp(t, cfg.Global.ListenPort, logger, registry, recorder)
legacyReq := httptest.NewRequest("GET", "http://legacy.hub.local/v2/", nil)
legacyReq.Host = "legacy.hub.local"
legacyReq.Header.Set("Host", "legacy.hub.local")
resp, err := app.Test(legacyReq)
if err != nil {
t.Fatalf("legacy request failed: %v", err)
}
if resp.StatusCode != fiber.StatusNoContent {
t.Fatalf("legacy hub should return 204, got %d", resp.StatusCode)
}
if recorder.moduleKey != "docker" {
t.Fatalf("expected docker module, got %s", recorder.moduleKey)
}
legacyReq := httptest.NewRequest("GET", "http://legacy.hub.local/v2/", nil)
legacyReq.Host = "legacy.hub.local"
legacyReq.Header.Set("Host", "legacy.hub.local")
resp, err := app.Test(legacyReq)
if err != nil {
t.Fatalf("legacy request failed: %v", err)
}
if resp.StatusCode != fiber.StatusNoContent {
t.Fatalf("legacy hub should return 204, got %d", resp.StatusCode)
}
if recorder.moduleKey != "legacy" {
t.Fatalf("expected legacy module, got %s", recorder.moduleKey)
}
testReq := httptest.NewRequest("GET", "http://test.hub.local/v2/", nil)
testReq.Host = "test.hub.local"
testReq.Header.Set("Host", "test.hub.local")
resp2, err := app.Test(testReq)
if err != nil {
t.Fatalf("test request failed: %v", err)
}
if resp2.StatusCode != fiber.StatusNoContent {
t.Fatalf("test hub should return 204, got %d", resp2.StatusCode)
}
if recorder.moduleKey != "module-routing-test" {
t.Fatalf("expected module-routing-test module, got %s", recorder.moduleKey)
}
testReq := httptest.NewRequest("GET", "http://test.hub.local/v2/", nil)
testReq.Host = "test.hub.local"
testReq.Header.Set("Host", "test.hub.local")
resp2, err := app.Test(testReq)
if err != nil {
t.Fatalf("test request failed: %v", err)
}
if resp2.StatusCode != fiber.StatusNoContent {
t.Fatalf("test hub should return 204, got %d", resp2.StatusCode)
}
if recorder.moduleKey != "npm" {
t.Fatalf("expected npm module, got %s", recorder.moduleKey)
}
}
func mustNewApp(t *testing.T, port int, logger *logrus.Logger, registry *server.HubRegistry, handler server.ProxyHandler) *fiber.App {
t.Helper()
app, err := server.NewApp(server.AppOptions{
Logger: logger,
Registry: registry,
Proxy: handler,
ListenPort: port,
})
if err != nil {
t.Fatalf("failed to create app: %v", err)
}
return app
t.Helper()
app, err := server.NewApp(server.AppOptions{
Logger: logger,
Registry: registry,
Proxy: handler,
ListenPort: port,
})
if err != nil {
t.Fatalf("failed to create app: %v", err)
}
return app
}
type moduleRecorder struct {
routeName string
moduleKey string
rollout string
routeName string
moduleKey string
}
func (p *moduleRecorder) Handle(c fiber.Ctx, route *server.HubRoute) error {
p.routeName = route.Config.Name
p.moduleKey = route.ModuleKey
p.rollout = string(route.RolloutFlag)
return c.SendStatus(fiber.StatusNoContent)
p.routeName = route.Config.Name
p.moduleKey = route.Module.Key
return c.SendStatus(fiber.StatusNoContent)
}


@@ -30,6 +30,7 @@ type upstreamStub struct {
requests []RecordedRequest
mode upstreamMode
blobBytes []byte
lastMod string
}
// RecordedRequest 捕获每次请求的方法/路径/Host/Headers便于断言代理行为。
@@ -48,13 +49,14 @@ func newUpstreamStub(t *testing.T, mode upstreamMode) *upstreamStub {
stub := &upstreamStub{
mode: mode,
blobBytes: []byte("stub-layer-payload"),
lastMod: time.Now().UTC().Format(http.TimeFormat),
}
switch mode {
case upstreamDocker:
registerDockerHandlers(mux, stub.blobBytes)
case upstreamNPM:
registerNPMHandlers(mux)
registerNPMHandlers(mux, stub)
default:
t.Fatalf("unsupported stub mode: %s", mode)
}
@@ -153,9 +155,10 @@ func registerDockerHandlers(mux *http.ServeMux, blob []byte) {
})
}
func registerNPMHandlers(mux *http.ServeMux) {
func registerNPMHandlers(mux *http.ServeMux, stub *upstreamStub) {
mux.HandleFunc("/lodash", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Last-Modified", stub.lastMod)
resp := map[string]any{
"name": "lodash",
"dist-tags": map[string]string{
@@ -174,6 +177,7 @@ func registerNPMHandlers(mux *http.ServeMux) {
mux.HandleFunc("/lodash/-/lodash-4.17.21.tgz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Last-Modified", stub.lastMod)
_, _ = w.Write([]byte("tarball-bytes"))
})
}
@@ -230,6 +234,37 @@ func TestDockerStubServesManifestAndBlob(t *testing.T) {
}
}
func TestNPMStubKeepsStableValidatorsAcrossHeadRequests(t *testing.T) {
stub := newUpstreamStub(t, upstreamNPM)
defer stub.Close()
req1, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
if err != nil {
t.Fatalf("build first HEAD request: %v", err)
}
resp1, err := http.DefaultClient.Do(req1)
if err != nil {
t.Fatalf("first HEAD failed: %v", err)
}
resp1.Body.Close()
time.Sleep(1100 * time.Millisecond)
req2, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
if err != nil {
t.Fatalf("build second HEAD request: %v", err)
}
resp2, err := http.DefaultClient.Do(req2)
if err != nil {
t.Fatalf("second HEAD failed: %v", err)
}
resp2.Body.Close()
if resp1.Header.Get("Last-Modified") != resp2.Header.Get("Last-Modified") {
t.Fatalf("expected stable Last-Modified, got %q then %q", resp1.Header.Get("Last-Modified"), resp2.Header.Get("Last-Modified"))
}
}
func TestNPMStubServesMetadataAndTarball(t *testing.T) {
stub := newUpstreamStub(t, upstreamNPM)
defer stub.Close()