Compare commits
6 Commits
bbc7204901
...
47c58ecb69
| Author | SHA1 | Date | |
|---|---|---|---|
| 47c58ecb69 | |||
| 271c8fe538 | |||
| 442c27a26d | |||
| 9c85b9b097 | |||
| 7c95ee0210 | |||
| 05d5037e97 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -20,6 +20,7 @@ Thumbs.db
|
||||
.vscode/
|
||||
.idea/
|
||||
.cache/
|
||||
.worktrees/
|
||||
storage/
|
||||
logs/
|
||||
tmp/
|
||||
|
||||
414
docs/superpowers/plans/2026-03-23-registry-k8s-compat.md
Normal file
414
docs/superpowers/plans/2026-03-23-registry-k8s-compat.md
Normal file
@@ -0,0 +1,414 @@
|
||||
# registry.k8s.io Compatibility Fallback Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Add a `registry.k8s.io`-only manifest fallback so single-segment repos like `coredns` retry once as `coredns/coredns` after a final first-attempt `404`.
|
||||
|
||||
**Architecture:** Keep the first upstream request unchanged, then add one targeted retry in the proxy after the normal auth flow completes. Persist the effective upstream path for fallback-filled cache entries so later cache revalidation uses the path that actually produced the content instead of rechecking the original `404` path.
|
||||
|
||||
**Tech Stack:** Go 1.25, Fiber v3, internal docker hooks, internal proxy handler, filesystem cache store, Go tests
|
||||
|
||||
---
|
||||
|
||||
## File map
|
||||
|
||||
- Modify: `internal/hubmodule/docker/hooks.go`
|
||||
- add `registry.k8s.io` host detection and manifest fallback path derivation helpers
|
||||
- Modify: `internal/hubmodule/docker/hooks_test.go`
|
||||
- add unit tests for host gating and manifest fallback derivation rules
|
||||
- Modify: `internal/cache/store.go`
|
||||
- extend cache entry/options to carry optional effective upstream path metadata
|
||||
- Modify: `internal/cache/fs_store.go`
|
||||
- persist and load cache metadata for effective upstream path
|
||||
- Modify: `internal/cache/store_test.go`
|
||||
- verify metadata round-trip behavior and remove behavior
|
||||
- Modify: `internal/proxy/handler.go`
|
||||
- Modify: `internal/proxy/handler_test.go`
|
||||
- add one-shot `registry.k8s.io` fallback retry after auth retry flow
|
||||
- write/read effective upstream path metadata for fallback-backed cache entries
|
||||
- use effective upstream path during revalidation
|
||||
- Modify: `tests/integration/cache_flow_test.go`
|
||||
- add integration coverage for fallback success, no-fallback success, and cache/revalidation behavior
|
||||
|
||||
### Task 1: Add docker fallback derivation helpers
|
||||
|
||||
**Files:**
|
||||
- Modify: `internal/hubmodule/docker/hooks.go`
|
||||
- Test: `internal/hubmodule/docker/hooks_test.go`
|
||||
|
||||
- [ ] **Step 1: Write the failing unit tests**
|
||||
|
||||
```go
|
||||
func TestIsRegistryK8sHost(t *testing.T) {
|
||||
if !isRegistryK8sHost("registry.k8s.io") {
|
||||
t.Fatalf("expected registry.k8s.io to match")
|
||||
}
|
||||
if isRegistryK8sHost("example.com") {
|
||||
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
|
||||
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
|
||||
t.Fatalf("expected multi-segment repo to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
|
||||
t.Fatalf("expected non-manifest path to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
|
||||
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
|
||||
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `go test ./internal/hubmodule/docker -run 'TestIsRegistryK8sHost|TestRegistryK8sManifestFallbackPath' -count=1`
|
||||
Expected: FAIL because `isRegistryK8sHost` and `manifestFallbackPath` do not exist yet.
|
||||
|
||||
- [ ] **Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
func isRegistryK8sHost(host string) bool {
|
||||
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
|
||||
host = parsedHost
|
||||
}
|
||||
return strings.EqualFold(host, "registry.k8s.io")
|
||||
}
|
||||
|
||||
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
|
||||
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
|
||||
return "", false
|
||||
}
|
||||
repo, rest, ok := splitDockerRepoPath(clean)
|
||||
if !ok || strings.Count(repo, "/") != 0 || !strings.HasPrefix(rest, "/manifests/") {
|
||||
return "", false
|
||||
}
|
||||
return "/v2/" + repo + "/" + repo + rest, true
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `go test ./internal/hubmodule/docker -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go
|
||||
git commit -m "test: cover registry k8s fallback path derivation"
|
||||
```
|
||||
|
||||
### Task 2: Persist effective upstream path in cache metadata
|
||||
|
||||
**Files:**
|
||||
- Modify: `internal/cache/store.go`
|
||||
- Modify: `internal/cache/fs_store.go`
|
||||
- Test: `internal/cache/store_test.go`
|
||||
|
||||
- [ ] **Step 1: Write the failing cache metadata test**
|
||||
|
||||
```go
|
||||
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
|
||||
store, err := NewStore(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatalf("NewStore: %v", err)
|
||||
}
|
||||
loc := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
|
||||
_, err = store.Put(context.Background(), loc, strings.NewReader("body"), PutOptions{
|
||||
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Put: %v", err)
|
||||
}
|
||||
got, err := store.Get(context.Background(), loc)
|
||||
if err != nil {
|
||||
t.Fatalf("Get: %v", err)
|
||||
}
|
||||
if got.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||
t.Fatalf("unexpected effective path: %q", got.Entry.EffectiveUpstreamPath)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `go test ./internal/cache -run TestStorePersistsEffectiveUpstreamPath -count=1`
|
||||
Expected: FAIL because cache entry metadata does not yet include effective upstream path.
|
||||
|
||||
- [ ] **Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
type PutOptions struct {
|
||||
ModTime time.Time
|
||||
EffectiveUpstreamPath string
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
Locator Locator `json:"locator"`
|
||||
FilePath string `json:"file_path"`
|
||||
SizeBytes int64 `json:"size_bytes"`
|
||||
ModTime time.Time `json:"mod_time"`
|
||||
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
|
||||
}
|
||||
```
|
||||
|
||||
Implementation notes:
|
||||
- store metadata next to the cached body as a small JSON file such as `<entry>.meta`
|
||||
- on `Get`, load metadata if present and populate `Entry.EffectiveUpstreamPath`
|
||||
- on `Put`, write metadata atomically only when `EffectiveUpstreamPath` is non-empty
|
||||
- on `Remove`, delete both body and metadata files
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `go test ./internal/cache -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go
|
||||
git commit -m "feat: persist cache effective upstream path"
|
||||
```
|
||||
|
||||
### Task 3: Add one-shot fallback retry in proxy fetch flow
|
||||
|
||||
**Files:**
|
||||
- Modify: `internal/proxy/handler.go`
|
||||
- Modify: `internal/hubmodule/docker/hooks.go`
|
||||
- Test: `tests/integration/cache_flow_test.go`
|
||||
|
||||
- [ ] **Step 1: Write the failing integration test for fallback success**
|
||||
|
||||
```go
|
||||
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
|
||||
// stub returns 404 for /v2/coredns/manifests/v1.13.1
|
||||
// stub returns 200 for /v2/coredns/coredns/manifests/v1.13.1
|
||||
// request /v2/coredns/manifests/v1.13.1 through proxy
|
||||
// expect 200 and exactly two upstream hits
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
|
||||
// stub returns 200 for original path
|
||||
// request original path
|
||||
// expect 200 and only one upstream hit
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
|
||||
// non-registry.k8s.io upstream returns 404 for original path
|
||||
// request original path
|
||||
// expect final 404 and no derived path hit
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
|
||||
// first GET falls back and succeeds
|
||||
// second identical GET should be cache hit
|
||||
// expect no new hit to original 404 path on second request
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
|
||||
Expected: FAIL because the fallback tests were written before the retry logic exists.
|
||||
|
||||
- [ ] **Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
resp, upstreamURL, err := h.executeRequest(c, route, hook)
|
||||
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, hook)
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
if fallbackHook, fallbackURL, ok := h.registryK8sFallbackAttempt(c, route, hook); ok {
|
||||
resp.Body.Close()
|
||||
resp, upstreamURL, err = h.executeRequest(c, route, fallbackHook)
|
||||
if err == nil {
|
||||
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, fallbackHook)
|
||||
}
|
||||
fallbackEffectivePath = fallbackURL.Path
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Implementation notes:
|
||||
- in `internal/hubmodule/docker/hooks.go`, keep only pure path helpers such as `isRegistryK8sHost` and `manifestFallbackPath`
|
||||
- in `internal/proxy/handler.go`, add the retry orchestration helper that clones the current hook state with the derived fallback path when `manifestFallbackPath` returns true
|
||||
- only evaluate fallback after `retryOnAuthFailure` completes and only for final `404`
|
||||
- close the first response body before the retry
|
||||
- if fallback succeeds with `200`, pass `EffectiveUpstreamPath` into cache `PutOptions`
|
||||
- if fallback returns non-`200`, pass that second upstream response through unchanged
|
||||
- emit a structured fallback log event from `internal/proxy/handler.go` with hub name, domain, upstream host, original path, fallback path, original status, and request method
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add internal/proxy/handler.go tests/integration/cache_flow_test.go internal/hubmodule/docker/hooks.go
|
||||
git commit -m "feat: retry registry k8s manifest fallback"
|
||||
```
|
||||
|
||||
### Task 4: Revalidate cached fallback entries against the effective upstream path
|
||||
|
||||
**Files:**
|
||||
- Modify: `internal/proxy/handler.go`
|
||||
- Modify: `tests/integration/cache_flow_test.go`
|
||||
|
||||
- [ ] **Step 1: Write the failing revalidation regression test**
|
||||
|
||||
```go
|
||||
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
|
||||
// first GET: original path 404, fallback path 200, response cached
|
||||
// second GET: cache hit path should revalidate against fallback path only
|
||||
// assert original 404 path is not re-requested during revalidation
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
|
||||
Expected: FAIL because revalidation still targets the original client-facing path.
|
||||
|
||||
- [ ] **Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
|
||||
if entry.EffectiveUpstreamPath == "" {
|
||||
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
|
||||
}
|
||||
clone := *route.UpstreamURL
|
||||
clone.Path = entry.EffectiveUpstreamPath
|
||||
clone.RawPath = entry.EffectiveUpstreamPath
|
||||
return &clone
|
||||
}
|
||||
```
|
||||
|
||||
Implementation notes:
|
||||
- use `Entry.EffectiveUpstreamPath` in both `isCacheFresh` and cached `HEAD` handling inside `serveCache`
|
||||
- do not re-derive fallback during revalidation when metadata already specifies the effective path
|
||||
- keep client-visible cache key unchanged
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add internal/proxy/handler.go tests/integration/cache_flow_test.go
|
||||
git commit -m "fix: revalidate registry k8s fallback cache entries"
|
||||
```
|
||||
|
||||
### Task 5: Emit and verify fallback structured logging
|
||||
|
||||
**Files:**
|
||||
- Modify: `internal/proxy/handler.go`
|
||||
- Test: `internal/proxy/handler_test.go`
|
||||
|
||||
- [ ] **Step 1: Write the failing logging test**
|
||||
|
||||
```go
|
||||
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
|
||||
// configure handler with a bytes.Buffer-backed logrus logger
|
||||
// trigger fallback success for /v2/coredns/manifests/v1.13.1
|
||||
// assert log output contains event name plus fields:
|
||||
// hub, domain, upstream host, original path, fallback path, original status, method
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
|
||||
Expected: FAIL because the fallback log event does not exist yet.
|
||||
|
||||
- [ ] **Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
|
||||
fields := logging.RequestFields(
|
||||
route.Config.Name,
|
||||
route.Config.Domain,
|
||||
route.Config.Type,
|
||||
route.Config.AuthMode(),
|
||||
route.Module.Key,
|
||||
false,
|
||||
)
|
||||
fields["action"] = "proxy_fallback"
|
||||
fields["upstream_host"] = route.UpstreamURL.Host
|
||||
fields["original_path"] = originalPath
|
||||
fields["fallback_path"] = fallbackPath
|
||||
fields["original_status"] = originalStatus
|
||||
fields["method"] = method
|
||||
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add internal/proxy/handler.go internal/proxy/handler_test.go
|
||||
git commit -m "test: cover registry k8s fallback logging"
|
||||
```
|
||||
|
||||
### Task 6: Final verification
|
||||
|
||||
**Files:**
|
||||
- Verify: `internal/hubmodule/docker/hooks.go`
|
||||
- Verify: `internal/cache/store.go`
|
||||
- Verify: `internal/cache/fs_store.go`
|
||||
- Verify: `internal/proxy/handler.go`
|
||||
- Verify: `tests/integration/cache_flow_test.go`
|
||||
|
||||
- [ ] **Step 1: Run focused package tests**
|
||||
|
||||
Run: `go test ./internal/hubmodule/docker ./internal/cache ./tests/integration -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 2: Run full test suite**
|
||||
|
||||
Run: `go test ./... -count=1`
|
||||
Expected: PASS
|
||||
|
||||
- [ ] **Step 3: Inspect git diff**
|
||||
|
||||
Run: `git diff --stat`
|
||||
Expected: only the planned files changed for this feature.
|
||||
|
||||
- [ ] **Step 4: Verify fallback logging is present**
|
||||
|
||||
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
|
||||
Expected: PASS, and the test explicitly checks the structured fallback log event fields.
|
||||
|
||||
- [ ] **Step 5: Commit final polish if needed**
|
||||
|
||||
```bash
|
||||
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go internal/proxy/handler.go internal/proxy/handler_test.go tests/integration/cache_flow_test.go
|
||||
git commit -m "feat: add registry k8s manifest fallback compatibility"
|
||||
```
|
||||
181
docs/superpowers/specs/2026-03-23-registry-k8s-compat-design.md
Normal file
181
docs/superpowers/specs/2026-03-23-registry-k8s-compat-design.md
Normal file
@@ -0,0 +1,181 @@
|
||||
# registry.k8s.io coredns fallback design
|
||||
|
||||
## Goal
|
||||
|
||||
Add a narrow compatibility fallback for `registry.k8s.io` so requests like
|
||||
`k8s.hub.ipao.vip/coredns:v1.13.1` can retry as
|
||||
`k8s.hub.ipao.vip/coredns/coredns:v1.13.1` when the original manifest lookup is
|
||||
not found upstream.
|
||||
|
||||
## Scope
|
||||
|
||||
This compatibility applies only when all of the following are true:
|
||||
|
||||
- hub module is `docker`
|
||||
- upstream host is exactly `registry.k8s.io`
|
||||
- requested repository name is a single segment such as `coredns`
|
||||
- request targets `/manifests/<ref>`
|
||||
- the first upstream request clearly fails as not found
|
||||
|
||||
Out of scope:
|
||||
|
||||
- changing behavior for Docker Hub
|
||||
- changing behavior for other registries
|
||||
- eagerly rewriting every single-segment repo under `registry.k8s.io`
|
||||
- retrying more than one alternate repository form
|
||||
- applying the initial compatibility behavior to blobs, tags, or referrers
|
||||
|
||||
## Recommended approach
|
||||
|
||||
Keep the current request path unchanged on the first attempt. After the normal
|
||||
auth flow completes for that path, if the final first-attempt response from
|
||||
`registry.k8s.io` is `404` for a single-segment manifest repository, perform
|
||||
one internal retry against the derived path `/v2/<repo>/<repo>/...` and return
|
||||
the retry result when it succeeds.
|
||||
|
||||
This keeps the existing behavior stable for paths that already work and limits
|
||||
the compatibility behavior to the known `coredns -> coredns/coredns` style case.
|
||||
|
||||
## Alternatives considered
|
||||
|
||||
### 1. Always rewrite single-segment repos to `<repo>/<repo>`
|
||||
|
||||
Rejected because it would change successful existing paths and could break valid
|
||||
single-segment repositories on `registry.k8s.io`.
|
||||
|
||||
### 2. Probe both paths before deciding
|
||||
|
||||
Rejected because it doubles upstream traffic and adds more complexity than the
|
||||
known compatibility problem needs.
|
||||
|
||||
## Request flow
|
||||
|
||||
1. Receive docker request and run the existing normalization logic.
|
||||
2. Send the first upstream request using the original normalized path.
|
||||
3. Complete the existing auth retry behavior for that path first.
|
||||
4. If the final response is successful, continue as today.
|
||||
5. If the final response is `404`, check whether the request is eligible for
|
||||
the `registry.k8s.io` fallback.
|
||||
6. Close the first response body before retrying.
|
||||
7. If eligible, derive a second path by transforming `/v2/<repo>/...` into
|
||||
`/v2/<repo>/<repo>/...`.
|
||||
8. Retry exactly once against the derived path, reusing the same method and the
|
||||
same auth behavior used by the normal upstream flow.
|
||||
9. If the retry succeeds, use that response and mark the cache entry with the
|
||||
effective upstream path that produced the content.
|
||||
10. If the retry also returns an upstream response, pass that second response
|
||||
through unchanged.
|
||||
11. If the retry fails with a transport error, return the existing proxy error
|
||||
response path and do not continue retrying.
|
||||
|
||||
## Eligibility rules
|
||||
|
||||
The fallback should only trigger when all checks pass:
|
||||
|
||||
- upstream host matches `registry.k8s.io`
|
||||
- request path parses as `/v2/<repo>/manifests/<ref>`
|
||||
- repository name contains exactly one segment before `manifests`
|
||||
- repository is not already `<repo>/<repo>`
|
||||
- final response after normal auth handling is HTTP `404`
|
||||
|
||||
For the initial implementation, the trigger is status-based and limited to HTTP
|
||||
`404`.
|
||||
|
||||
## Component changes
|
||||
|
||||
### `internal/hubmodule/docker/hooks.go`
|
||||
|
||||
- add helper logic to identify `registry.k8s.io`
|
||||
- add helper logic to derive the duplicate-segment manifest path for eligible
|
||||
requests
|
||||
|
||||
### `internal/proxy/handler.go`
|
||||
|
||||
- add a targeted retry path after the first upstream response is received and
|
||||
before the response is finalized
|
||||
- keep the retry count fixed at one alternate attempt
|
||||
- complete the normal auth retry flow before evaluating fallback eligibility
|
||||
- persist the effective upstream path when fallback succeeds so future cache
|
||||
revalidation can target the path that actually produced the cached content
|
||||
|
||||
The retry should reuse the same auth, request method, response handling, and
|
||||
streaming behavior as the normal upstream path.
|
||||
|
||||
### Cache metadata
|
||||
|
||||
- extend cached metadata for fallback-derived entries with an optional effective
|
||||
upstream path field
|
||||
- when a cached entry was populated through fallback, revalidate it against that
|
||||
effective upstream path rather than the original client-facing path
|
||||
- do not apply the fallback derivation again during revalidation if an effective
|
||||
upstream path is already recorded
|
||||
|
||||
## Caching behavior
|
||||
|
||||
- first successful response remains cacheable under the locator path that served
|
||||
the client request
|
||||
- if fallback succeeds, cache the successful fallback response under the client
|
||||
request path so the next identical client request can hit cache directly
|
||||
- also persist the effective upstream path used to fill that cache entry
|
||||
- do not cache the failed first attempt separately
|
||||
|
||||
This preserves the client-visible repository path while allowing the proxy to
|
||||
internally source content from the alternate upstream location.
|
||||
|
||||
## Error handling
|
||||
|
||||
- if fallback path derivation is not possible, return the original response
|
||||
- if the fallback request errors at the transport layer, log the retry attempt
|
||||
and return the existing proxy error response path
|
||||
- never loop between variants
|
||||
|
||||
## Logging
|
||||
|
||||
Add one targeted structured log event for visibility when the fallback is used,
|
||||
including:
|
||||
|
||||
- hub name
|
||||
- domain
|
||||
- upstream host
|
||||
- original path
|
||||
- fallback path
|
||||
- original status
|
||||
- request method
|
||||
|
||||
This makes it easy to confirm that `registry.k8s.io` compatibility is active
|
||||
without changing normal logging volume for unrelated registries.
|
||||
|
||||
## Tests
|
||||
|
||||
### Unit tests
|
||||
|
||||
Add docker hook tests for:
|
||||
|
||||
- recognizing `registry.k8s.io`
|
||||
- deriving `/v2/coredns/manifests/v1.13.1` to
|
||||
`/v2/coredns/coredns/manifests/v1.13.1`
|
||||
- refusing fallback for already multi-segment repositories
|
||||
- refusing fallback for non-`registry.k8s.io` hosts
|
||||
- refusing fallback for non-manifest paths
|
||||
|
||||
### Integration tests
|
||||
|
||||
Add proxy tests for:
|
||||
|
||||
- original path returns `404`, fallback path returns `200` -> final client
|
||||
response is `200`
|
||||
- original path returns `200` -> fallback is not attempted
|
||||
- non-`registry.k8s.io` upstream returns `404` -> fallback is not attempted
|
||||
- fallback succeeds, second identical request is served from cache without
|
||||
re-fetching the original `404` path
|
||||
- fallback-derived cached entry revalidates against the recorded effective
|
||||
upstream path instead of the original client-facing path
|
||||
|
||||
## Success criteria
|
||||
|
||||
- `k8s.hub.ipao.vip/coredns:v1.13.1` can succeed through fallback when the
|
||||
upstream only exposes `coredns/coredns`
|
||||
- existing Docker Hub behavior remains unchanged
|
||||
- existing `registry.k8s.io` paths that already work continue to use the
|
||||
original path first
|
||||
- fallback is isolated to `registry.k8s.io`
|
||||
75
internal/cache/fs_store.go
vendored
75
internal/cache/fs_store.go
vendored
@@ -2,6 +2,7 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -49,6 +50,10 @@ type entryLock struct {
|
||||
refs int
|
||||
}
|
||||
|
||||
type entryMetadata struct {
|
||||
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
|
||||
}
|
||||
|
||||
func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -86,6 +91,12 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro
|
||||
SizeBytes: info.Size(),
|
||||
ModTime: info.ModTime(),
|
||||
}
|
||||
if metadata, err := s.readMetadata(filePath); err == nil {
|
||||
entry.EffectiveUpstreamPath = metadata.EffectiveUpstreamPath
|
||||
} else if !errors.Is(err, fs.ErrNotExist) {
|
||||
file.Close()
|
||||
return nil, err
|
||||
}
|
||||
return &ReadResult{Entry: entry, Reader: file}, nil
|
||||
}
|
||||
|
||||
@@ -133,12 +144,16 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op
|
||||
if err := os.Chtimes(filePath, modTime, modTime); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.writeMetadata(filePath, opts.EffectiveUpstreamPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entry := Entry{
|
||||
Locator: locator,
|
||||
FilePath: filePath,
|
||||
SizeBytes: written,
|
||||
ModTime: modTime,
|
||||
Locator: locator,
|
||||
FilePath: filePath,
|
||||
SizeBytes: written,
|
||||
ModTime: modTime,
|
||||
EffectiveUpstreamPath: opts.EffectiveUpstreamPath,
|
||||
}
|
||||
return &entry, nil
|
||||
}
|
||||
@@ -157,6 +172,54 @@ func (s *fileStore) Remove(ctx context.Context, locator Locator) error {
|
||||
if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(metadataPath(filePath)); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *fileStore) readMetadata(filePath string) (entryMetadata, error) {
|
||||
raw, err := os.ReadFile(metadataPath(filePath))
|
||||
if err != nil {
|
||||
return entryMetadata{}, err
|
||||
}
|
||||
var metadata entryMetadata
|
||||
if err := json.Unmarshal(raw, &metadata); err != nil {
|
||||
return entryMetadata{}, err
|
||||
}
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
func (s *fileStore) writeMetadata(filePath string, effectiveUpstreamPath string) error {
|
||||
metaFilePath := metadataPath(filePath)
|
||||
if effectiveUpstreamPath == "" {
|
||||
if err := os.Remove(metaFilePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
data, err := json.Marshal(entryMetadata{EffectiveUpstreamPath: effectiveUpstreamPath})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempFile, err := os.CreateTemp(filepath.Dir(metaFilePath), ".cache-meta-*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempName := tempFile.Name()
|
||||
if _, err := tempFile.Write(data); err != nil {
|
||||
tempFile.Close()
|
||||
_ = os.Remove(tempName)
|
||||
return err
|
||||
}
|
||||
if err := tempFile.Close(); err != nil {
|
||||
_ = os.Remove(tempName)
|
||||
return err
|
||||
}
|
||||
if err := os.Rename(tempName, metaFilePath); err != nil {
|
||||
_ = os.Remove(tempName)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -248,3 +311,7 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
|
||||
func locatorKey(locator Locator) string {
|
||||
return locator.HubName + "::" + locator.Path
|
||||
}
|
||||
|
||||
func metadataPath(filePath string) string {
|
||||
return filePath + ".meta"
|
||||
}
|
||||
|
||||
12
internal/cache/store.go
vendored
12
internal/cache/store.go
vendored
@@ -26,7 +26,8 @@ type Store interface {
|
||||
|
||||
// PutOptions 控制写入过程中的可选属性。
|
||||
type PutOptions struct {
|
||||
ModTime time.Time
|
||||
ModTime time.Time
|
||||
EffectiveUpstreamPath string
|
||||
}
|
||||
|
||||
// Locator 唯一定位一个缓存条目(Hub + 相对路径),所有路径均为 URL 路径风格。
|
||||
@@ -37,10 +38,11 @@ type Locator struct {
|
||||
|
||||
// Entry 表示一次缓存命中结果,包含绝对文件路径及文件信息。
|
||||
type Entry struct {
|
||||
Locator Locator `json:"locator"`
|
||||
FilePath string `json:"file_path"`
|
||||
SizeBytes int64 `json:"size_bytes"`
|
||||
ModTime time.Time
|
||||
Locator Locator `json:"locator"`
|
||||
FilePath string `json:"file_path"`
|
||||
SizeBytes int64 `json:"size_bytes"`
|
||||
ModTime time.Time `json:"mod_time"`
|
||||
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
|
||||
}
|
||||
|
||||
// ReadResult 组合 Entry 与正文 Reader,便于代理层直接将 Body 流式返回。
|
||||
|
||||
23
internal/cache/store_test.go
vendored
23
internal/cache/store_test.go
vendored
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -62,6 +63,28 @@ func TestStoreRemove(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
locator := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
|
||||
|
||||
_, err := store.Put(context.Background(), locator, strings.NewReader("body"), PutOptions{
|
||||
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("put error: %v", err)
|
||||
}
|
||||
|
||||
result, err := store.Get(context.Background(), locator)
|
||||
if err != nil {
|
||||
t.Fatalf("get error: %v", err)
|
||||
}
|
||||
defer result.Reader.Close()
|
||||
|
||||
if result.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||
t.Fatalf("unexpected effective upstream path: %q", result.Entry.EffectiveUpstreamPath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreIgnoresDirectories(t *testing.T) {
|
||||
store := newTestStore(t)
|
||||
locator := Locator{HubName: "ghcr", Path: "/v2"}
|
||||
|
||||
@@ -69,6 +69,28 @@ func isDockerHubHost(host string) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func isRegistryK8sHost(host string) bool {
|
||||
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
|
||||
host = parsedHost
|
||||
}
|
||||
return strings.EqualFold(host, "registry.k8s.io")
|
||||
}
|
||||
|
||||
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
|
||||
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
|
||||
return "", false
|
||||
}
|
||||
repo, rest, ok := splitDockerRepoPath(clean)
|
||||
if !ok || strings.Contains(repo, "/") || !strings.HasPrefix(rest, "/manifests/") {
|
||||
return "", false
|
||||
}
|
||||
return "/v2/" + repo + "/" + repo + rest, true
|
||||
}
|
||||
|
||||
func RegistryK8sManifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
|
||||
return manifestFallbackPath(ctx, clean)
|
||||
}
|
||||
|
||||
func splitDockerRepoPath(path string) (string, string, bool) {
|
||||
if !strings.HasPrefix(path, "/v2/") {
|
||||
return "", "", false
|
||||
|
||||
@@ -27,6 +27,44 @@ func TestNormalizePathSkipsLibraryForNonDockerHub(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsRegistryK8sHost(t *testing.T) {
|
||||
if !isRegistryK8sHost("registry.k8s.io") {
|
||||
t.Fatalf("expected registry.k8s.io to match")
|
||||
}
|
||||
if isRegistryK8sHost("example.com") {
|
||||
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
|
||||
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
|
||||
t.Fatalf("expected multi-segment repo to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
|
||||
t.Fatalf("expected non-manifest path to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
|
||||
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
|
||||
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
|
||||
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitDockerRepoPath(t *testing.T) {
|
||||
repo, rest, ok := splitDockerRepoPath("/v2/library/nginx/manifests/latest")
|
||||
if !ok || repo != "library/nginx" || rest != "/manifests/latest" {
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
|
||||
"github.com/any-hub/any-hub/internal/cache"
|
||||
"github.com/any-hub/any-hub/internal/hubmodule"
|
||||
dockermodule "github.com/any-hub/any-hub/internal/hubmodule/docker"
|
||||
"github.com/any-hub/any-hub/internal/logging"
|
||||
"github.com/any-hub/any-hub/internal/proxy/hooks"
|
||||
"github.com/any-hub/any-hub/internal/server"
|
||||
@@ -218,7 +219,7 @@ func (h *Handler) serveCache(
|
||||
}
|
||||
|
||||
if shouldRevalidate {
|
||||
if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil {
|
||||
if resp, err := h.revalidateRequest(c, route, effectiveRevalidateURL(route, c, result.Entry, hook), result.Entry.Locator, ""); err == nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
}
|
||||
@@ -259,6 +260,17 @@ func (h *Handler) fetchAndStream(
|
||||
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
|
||||
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
|
||||
}
|
||||
effectiveUpstreamPath := ""
|
||||
originalStatus := resp.StatusCode
|
||||
originalPath := ""
|
||||
if hook != nil {
|
||||
originalPath = hook.clean
|
||||
}
|
||||
resp, upstreamURL, effectiveUpstreamPath, err = h.retryRegistryK8sManifestFallback(c, route, requestID, resp, upstreamURL, hook, originalStatus, originalPath)
|
||||
if err != nil {
|
||||
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
|
||||
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
|
||||
}
|
||||
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
|
||||
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
|
||||
resp = rewritten
|
||||
@@ -273,7 +285,7 @@ func (h *Handler) fetchAndStream(
|
||||
|
||||
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
|
||||
c.Method() == http.MethodGet
|
||||
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
|
||||
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx, effectiveUpstreamPath)
|
||||
}
|
||||
|
||||
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
|
||||
@@ -325,13 +337,14 @@ func (h *Handler) consumeUpstream(
|
||||
requestID string,
|
||||
started time.Time,
|
||||
ctx context.Context,
|
||||
effectiveUpstreamPath string,
|
||||
) error {
|
||||
upstreamURL := resp.Request.URL.String()
|
||||
method := c.Method()
|
||||
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
|
||||
|
||||
if shouldStore {
|
||||
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL)
|
||||
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL, effectiveUpstreamPath)
|
||||
}
|
||||
|
||||
copyResponseHeaders(c, resp.Header)
|
||||
@@ -369,6 +382,7 @@ func (h *Handler) cacheAndStream(
|
||||
started time.Time,
|
||||
ctx context.Context,
|
||||
upstreamURL string,
|
||||
effectiveUpstreamPath string,
|
||||
) error {
|
||||
copyResponseHeaders(c, resp.Header)
|
||||
c.Set("X-Any-Hub-Upstream", upstreamURL)
|
||||
@@ -381,7 +395,7 @@ func (h *Handler) cacheAndStream(
|
||||
// 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。
|
||||
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
|
||||
|
||||
opts := cache.PutOptions{ModTime: extractModTime(resp.Header)}
|
||||
opts := cache.PutOptions{ModTime: extractModTime(resp.Header), EffectiveUpstreamPath: effectiveUpstreamPath}
|
||||
entry, err := writer.Put(ctx, locator, reader, opts)
|
||||
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
|
||||
if err != nil {
|
||||
@@ -774,7 +788,7 @@ func (h *Handler) isCacheFresh(
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
|
||||
upstreamURL := effectiveRevalidateURL(route, c, entry, hook)
|
||||
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -825,6 +839,77 @@ func (h *Handler) isCacheFresh(
|
||||
}
|
||||
}
|
||||
|
||||
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
|
||||
if route == nil || route.UpstreamURL == nil || entry.EffectiveUpstreamPath == "" {
|
||||
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
|
||||
}
|
||||
clone := *route.UpstreamURL
|
||||
clone.Path = entry.EffectiveUpstreamPath
|
||||
clone.RawPath = entry.EffectiveUpstreamPath
|
||||
return &clone
|
||||
}
|
||||
|
||||
func (h *Handler) retryRegistryK8sManifestFallback(
|
||||
c fiber.Ctx,
|
||||
route *server.HubRoute,
|
||||
requestID string,
|
||||
resp *http.Response,
|
||||
upstreamURL *url.URL,
|
||||
hook *hookState,
|
||||
originalStatus int,
|
||||
originalPath string,
|
||||
) (*http.Response, *url.URL, string, error) {
|
||||
if resp == nil || resp.StatusCode != http.StatusNotFound || hook == nil || hook.ctx == nil {
|
||||
return resp, upstreamURL, "", nil
|
||||
}
|
||||
fallbackPath, ok := dockermodule.RegistryK8sManifestFallbackPath(hook.ctx, hook.clean)
|
||||
if !ok {
|
||||
return resp, upstreamURL, "", nil
|
||||
}
|
||||
resp.Body.Close()
|
||||
fallbackHook := *hook
|
||||
fallbackHook.clean = fallbackPath
|
||||
fallbackResp, fallbackURL, err := h.executeRequest(c, route, &fallbackHook)
|
||||
if err != nil {
|
||||
return nil, upstreamURL, "", err
|
||||
}
|
||||
fallbackResp, fallbackURL, err = h.retryOnAuthFailure(c, route, requestID, time.Now(), fallbackResp, fallbackURL, &fallbackHook)
|
||||
if err != nil {
|
||||
return nil, upstreamURL, "", err
|
||||
}
|
||||
h.logRegistryK8sFallback(route, requestID, originalPath, fallbackPath, originalStatus, c.Method())
|
||||
if fallbackResp.StatusCode == http.StatusOK {
|
||||
return fallbackResp, fallbackURL, fallbackPath, nil
|
||||
}
|
||||
return fallbackResp, fallbackURL, "", nil
|
||||
}
|
||||
|
||||
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
|
||||
if route == nil || h == nil || h.logger == nil {
|
||||
return
|
||||
}
|
||||
fields := logging.RequestFields(
|
||||
route.Config.Name,
|
||||
route.Config.Domain,
|
||||
route.Config.Type,
|
||||
route.Config.AuthMode(),
|
||||
route.Module.Key,
|
||||
false,
|
||||
)
|
||||
fields["action"] = "proxy_fallback"
|
||||
if route.UpstreamURL != nil {
|
||||
fields["upstream_host"] = route.UpstreamURL.Hostname()
|
||||
}
|
||||
fields["original_path"] = originalPath
|
||||
fields["fallback_path"] = fallbackPath
|
||||
fields["original_status"] = originalStatus
|
||||
fields["method"] = method
|
||||
if requestID != "" {
|
||||
fields["request_id"] = requestID
|
||||
}
|
||||
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
|
||||
}
|
||||
|
||||
func (h *Handler) revalidateRequest(
|
||||
c fiber.Ctx,
|
||||
route *server.HubRoute,
|
||||
|
||||
55
internal/proxy/handler_test.go
Normal file
55
internal/proxy/handler_test.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/any-hub/any-hub/internal/config"
|
||||
"github.com/any-hub/any-hub/internal/hubmodule"
|
||||
"github.com/any-hub/any-hub/internal/server"
|
||||
)
|
||||
|
||||
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
|
||||
buf := new(bytes.Buffer)
|
||||
logger := logrus.New()
|
||||
logger.SetOutput(buf)
|
||||
logger.SetFormatter(&logrus.JSONFormatter{})
|
||||
|
||||
upstreamURL, err := url.Parse("http://registry.k8s.io")
|
||||
if err != nil {
|
||||
t.Fatalf("parse upstream url: %v", err)
|
||||
}
|
||||
|
||||
h := NewHandler(nil, logger, nil)
|
||||
route := &server.HubRoute{
|
||||
Config: config.HubConfig{
|
||||
Name: "docker",
|
||||
Domain: "k8s.hub.local",
|
||||
Type: "docker",
|
||||
},
|
||||
Module: hubmodule.ModuleMetadata{Key: "docker"},
|
||||
UpstreamURL: upstreamURL,
|
||||
}
|
||||
|
||||
h.logRegistryK8sFallback(route, "req-1", "/v2/coredns/manifests/v1.13.1", "/v2/coredns/coredns/manifests/v1.13.1", 404, "GET")
|
||||
|
||||
output := buf.String()
|
||||
for _, want := range []string{
|
||||
"proxy_registry_k8s_fallback",
|
||||
`"hub":"docker"`,
|
||||
`"domain":"k8s.hub.local"`,
|
||||
`"upstream_host":"registry.k8s.io"`,
|
||||
`"original_path":"/v2/coredns/manifests/v1.13.1"`,
|
||||
`"fallback_path":"/v2/coredns/coredns/manifests/v1.13.1"`,
|
||||
`"original_status":404`,
|
||||
`"method":"GET"`,
|
||||
} {
|
||||
if !strings.Contains(output, want) {
|
||||
t.Fatalf("expected log output to contain %s, got %s", want, output)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,8 @@ import (
|
||||
const (
|
||||
dockerManifestPath = "/v2/library/cache-flow/manifests/latest"
|
||||
dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
|
||||
registryK8sOriginalManifest = "/v2/coredns/manifests/v1.13.1"
|
||||
registryK8sFallbackManifest = "/v2/coredns/coredns/manifests/v1.13.1"
|
||||
)
|
||||
|
||||
func TestCacheFlowWithConditionalRequest(t *testing.T) {
|
||||
@@ -286,6 +288,140 @@ func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
|
||||
stub := newRegistryK8sStub(t)
|
||||
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||
defer stub.Close()
|
||||
|
||||
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||
req.Host = "k8s.hub.local"
|
||||
resp, err := app.Test(req)
|
||||
if err != nil {
|
||||
t.Fatalf("app.Test failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != fiber.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body))
|
||||
}
|
||||
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
|
||||
t.Fatalf("expected single original manifest hit")
|
||||
}
|
||||
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 {
|
||||
t.Fatalf("expected single fallback manifest hit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
|
||||
stub := newRegistryK8sStub(t)
|
||||
stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest"))
|
||||
defer stub.Close()
|
||||
|
||||
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||
req.Host = "k8s.hub.local"
|
||||
resp, err := app.Test(req)
|
||||
if err != nil {
|
||||
t.Fatalf("app.Test failed: %v", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != fiber.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
||||
}
|
||||
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
|
||||
t.Fatalf("expected no fallback hit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
|
||||
stub := newRegistryK8sStub(t)
|
||||
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||
defer stub.Close()
|
||||
|
||||
app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil)
|
||||
req.Host = "mirror.hub.local"
|
||||
resp, err := app.Test(req)
|
||||
if err != nil {
|
||||
t.Fatalf("app.Test failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != fiber.StatusNotFound {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body))
|
||||
}
|
||||
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
|
||||
t.Fatalf("expected no fallback hit for non-registry host")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
|
||||
stub := newRegistryK8sStub(t)
|
||||
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||
defer stub.Close()
|
||||
|
||||
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||
|
||||
makeRequest := func() *http.Response {
|
||||
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||
req.Host = "k8s.hub.local"
|
||||
resp, err := app.Test(req)
|
||||
if err != nil {
|
||||
t.Fatalf("app.Test failed: %v", err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
resp1 := makeRequest()
|
||||
resp1.Body.Close()
|
||||
resp2 := makeRequest()
|
||||
defer resp2.Body.Close()
|
||||
|
||||
if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" {
|
||||
t.Fatalf("expected second request to hit cache")
|
||||
}
|
||||
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
|
||||
t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
|
||||
stub := newRegistryK8sStub(t)
|
||||
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||
defer stub.Close()
|
||||
|
||||
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||
|
||||
makeRequest := func() *http.Response {
|
||||
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||
req.Host = "k8s.hub.local"
|
||||
resp, err := app.Test(req)
|
||||
if err != nil {
|
||||
t.Fatalf("app.Test failed: %v", err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
resp1 := makeRequest()
|
||||
resp1.Body.Close()
|
||||
resp2 := makeRequest()
|
||||
resp2.Body.Close()
|
||||
|
||||
if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 {
|
||||
t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead))
|
||||
}
|
||||
if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 {
|
||||
t.Fatalf("expected HEAD revalidation against fallback path")
|
||||
}
|
||||
}
|
||||
|
||||
type cacheFlowStub struct {
|
||||
server *http.Server
|
||||
listener net.Listener
|
||||
@@ -300,6 +436,22 @@ type cacheFlowStub struct {
|
||||
lastMod string
|
||||
}
|
||||
|
||||
type registryK8sStub struct {
|
||||
server *http.Server
|
||||
listener net.Listener
|
||||
URL string
|
||||
|
||||
mu sync.Mutex
|
||||
responses map[string]registryK8sResponse
|
||||
hits map[string]int
|
||||
}
|
||||
|
||||
type registryK8sResponse struct {
|
||||
status int
|
||||
body []byte
|
||||
etag string
|
||||
}
|
||||
|
||||
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
|
||||
t.Helper()
|
||||
stub := &cacheFlowStub{
|
||||
@@ -335,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
|
||||
return stub
|
||||
}
|
||||
|
||||
func newRegistryK8sStub(t *testing.T) *registryK8sStub {
|
||||
t.Helper()
|
||||
stub := ®istryK8sStub{
|
||||
responses: make(map[string]registryK8sResponse),
|
||||
hits: make(map[string]int),
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", stub.handle)
|
||||
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Skipf("unable to start registry.k8s.io stub listener: %v", err)
|
||||
}
|
||||
|
||||
server := &http.Server{Handler: mux}
|
||||
stub.server = server
|
||||
stub.listener = listener
|
||||
stub.URL = "http://" + listener.Addr().String()
|
||||
|
||||
go func() {
|
||||
_ = server.Serve(listener)
|
||||
}()
|
||||
|
||||
return stub
|
||||
}
|
||||
|
||||
func (s *cacheFlowStub) Close() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
@@ -346,6 +524,109 @@ func (s *cacheFlowStub) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *registryK8sStub) Close() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
if s.server != nil {
|
||||
_ = s.server.Shutdown(ctx)
|
||||
}
|
||||
if s.listener != nil {
|
||||
_ = s.listener.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *registryK8sStub) setResponse(path string, status int, body []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.responses[path] = registryK8sResponse{
|
||||
status: status,
|
||||
body: append([]byte(nil), body...),
|
||||
etag: fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-")),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *registryK8sStub) pathHits(path string, method string) int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.hits[method+" "+path]
|
||||
}
|
||||
|
||||
func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
s.hits[r.Method+" "+r.URL.Path]++
|
||||
resp, ok := s.responses[r.URL.Path]
|
||||
s.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
_, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`))
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
|
||||
w.Header().Set("Etag", resp.etag)
|
||||
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
||||
if r.Method == http.MethodHead {
|
||||
for _, candidate := range r.Header.Values("If-None-Match") {
|
||||
if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) {
|
||||
w.WriteHeader(http.StatusNotModified)
|
||||
return
|
||||
}
|
||||
}
|
||||
w.WriteHeader(resp.status)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(resp.status)
|
||||
_, _ = w.Write(resp.body)
|
||||
}
|
||||
|
||||
func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App {
|
||||
t.Helper()
|
||||
proxyURL := ""
|
||||
if len(proxyURLs) > 0 {
|
||||
proxyURL = proxyURLs[0]
|
||||
}
|
||||
cfg := &config.Config{
|
||||
Global: config.GlobalConfig{
|
||||
ListenPort: 5000,
|
||||
CacheTTL: config.Duration(30 * time.Second),
|
||||
StoragePath: storageDir,
|
||||
},
|
||||
Hubs: []config.HubConfig{
|
||||
{
|
||||
Name: "docker",
|
||||
Domain: domain,
|
||||
Type: "docker",
|
||||
Upstream: upstream,
|
||||
Proxy: proxyURL,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
registry, err := server.NewHubRegistry(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("registry error: %v", err)
|
||||
}
|
||||
logger := logrus.New()
|
||||
logger.SetOutput(io.Discard)
|
||||
store, err := cache.NewStore(storageDir)
|
||||
if err != nil {
|
||||
t.Fatalf("store error: %v", err)
|
||||
}
|
||||
client := server.NewUpstreamClient(cfg)
|
||||
handler := proxy.NewHandler(client, logger, store)
|
||||
app, err := server.NewApp(server.AppOptions{
|
||||
Logger: logger,
|
||||
Registry: registry,
|
||||
Proxy: handler,
|
||||
ListenPort: 5000,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("app error: %v", err)
|
||||
}
|
||||
return app
|
||||
}
|
||||
|
||||
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
|
||||
s.mu.Lock()
|
||||
etag := s.etag
|
||||
|
||||
@@ -30,6 +30,7 @@ type upstreamStub struct {
|
||||
requests []RecordedRequest
|
||||
mode upstreamMode
|
||||
blobBytes []byte
|
||||
lastMod string
|
||||
}
|
||||
|
||||
// RecordedRequest 捕获每次请求的方法/路径/Host/Headers,便于断言代理行为。
|
||||
@@ -48,13 +49,14 @@ func newUpstreamStub(t *testing.T, mode upstreamMode) *upstreamStub {
|
||||
stub := &upstreamStub{
|
||||
mode: mode,
|
||||
blobBytes: []byte("stub-layer-payload"),
|
||||
lastMod: time.Now().UTC().Format(http.TimeFormat),
|
||||
}
|
||||
|
||||
switch mode {
|
||||
case upstreamDocker:
|
||||
registerDockerHandlers(mux, stub.blobBytes)
|
||||
case upstreamNPM:
|
||||
registerNPMHandlers(mux)
|
||||
registerNPMHandlers(mux, stub)
|
||||
default:
|
||||
t.Fatalf("unsupported stub mode: %s", mode)
|
||||
}
|
||||
@@ -153,10 +155,10 @@ func registerDockerHandlers(mux *http.ServeMux, blob []byte) {
|
||||
})
|
||||
}
|
||||
|
||||
func registerNPMHandlers(mux *http.ServeMux) {
|
||||
func registerNPMHandlers(mux *http.ServeMux, stub *upstreamStub) {
|
||||
mux.HandleFunc("/lodash", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
||||
w.Header().Set("Last-Modified", stub.lastMod)
|
||||
resp := map[string]any{
|
||||
"name": "lodash",
|
||||
"dist-tags": map[string]string{
|
||||
@@ -175,7 +177,7 @@ func registerNPMHandlers(mux *http.ServeMux) {
|
||||
|
||||
mux.HandleFunc("/lodash/-/lodash-4.17.21.tgz", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
||||
w.Header().Set("Last-Modified", stub.lastMod)
|
||||
_, _ = w.Write([]byte("tarball-bytes"))
|
||||
})
|
||||
}
|
||||
@@ -232,6 +234,37 @@ func TestDockerStubServesManifestAndBlob(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNPMStubKeepsStableValidatorsAcrossHeadRequests(t *testing.T) {
|
||||
stub := newUpstreamStub(t, upstreamNPM)
|
||||
defer stub.Close()
|
||||
|
||||
req1, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("build first HEAD request: %v", err)
|
||||
}
|
||||
resp1, err := http.DefaultClient.Do(req1)
|
||||
if err != nil {
|
||||
t.Fatalf("first HEAD failed: %v", err)
|
||||
}
|
||||
resp1.Body.Close()
|
||||
|
||||
time.Sleep(1100 * time.Millisecond)
|
||||
|
||||
req2, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("build second HEAD request: %v", err)
|
||||
}
|
||||
resp2, err := http.DefaultClient.Do(req2)
|
||||
if err != nil {
|
||||
t.Fatalf("second HEAD failed: %v", err)
|
||||
}
|
||||
resp2.Body.Close()
|
||||
|
||||
if resp1.Header.Get("Last-Modified") != resp2.Header.Get("Last-Modified") {
|
||||
t.Fatalf("expected stable Last-Modified, got %q then %q", resp1.Header.Get("Last-Modified"), resp2.Header.Get("Last-Modified"))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNPMStubServesMetadataAndTarball(t *testing.T) {
|
||||
stub := newUpstreamStub(t, upstreamNPM)
|
||||
defer stub.Close()
|
||||
|
||||
Reference in New Issue
Block a user