Compare commits
6 Commits
bbc7204901
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 47c58ecb69 | |||
| 271c8fe538 | |||
| 442c27a26d | |||
| 9c85b9b097 | |||
| 7c95ee0210 | |||
| 05d5037e97 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -20,6 +20,7 @@ Thumbs.db
|
|||||||
.vscode/
|
.vscode/
|
||||||
.idea/
|
.idea/
|
||||||
.cache/
|
.cache/
|
||||||
|
.worktrees/
|
||||||
storage/
|
storage/
|
||||||
logs/
|
logs/
|
||||||
tmp/
|
tmp/
|
||||||
|
|||||||
414
docs/superpowers/plans/2026-03-23-registry-k8s-compat.md
Normal file
414
docs/superpowers/plans/2026-03-23-registry-k8s-compat.md
Normal file
@@ -0,0 +1,414 @@
|
|||||||
|
# registry.k8s.io Compatibility Fallback Implementation Plan
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** Add a `registry.k8s.io`-only manifest fallback so single-segment repos like `coredns` retry once as `coredns/coredns` after a final first-attempt `404`.
|
||||||
|
|
||||||
|
**Architecture:** Keep the first upstream request unchanged, then add one targeted retry in the proxy after the normal auth flow completes. Persist the effective upstream path for fallback-filled cache entries so later cache revalidation uses the path that actually produced the content instead of rechecking the original `404` path.
|
||||||
|
|
||||||
|
**Tech Stack:** Go 1.25, Fiber v3, internal docker hooks, internal proxy handler, filesystem cache store, Go tests
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## File map
|
||||||
|
|
||||||
|
- Modify: `internal/hubmodule/docker/hooks.go`
|
||||||
|
- add `registry.k8s.io` host detection and manifest fallback path derivation helpers
|
||||||
|
- Modify: `internal/hubmodule/docker/hooks_test.go`
|
||||||
|
- add unit tests for host gating and manifest fallback derivation rules
|
||||||
|
- Modify: `internal/cache/store.go`
|
||||||
|
- extend cache entry/options to carry optional effective upstream path metadata
|
||||||
|
- Modify: `internal/cache/fs_store.go`
|
||||||
|
- persist and load cache metadata for effective upstream path
|
||||||
|
- Modify: `internal/cache/store_test.go`
|
||||||
|
- verify metadata round-trip behavior and remove behavior
|
||||||
|
- Modify: `internal/proxy/handler.go`
|
||||||
|
- Modify: `internal/proxy/handler_test.go`
|
||||||
|
- add one-shot `registry.k8s.io` fallback retry after auth retry flow
|
||||||
|
- write/read effective upstream path metadata for fallback-backed cache entries
|
||||||
|
- use effective upstream path during revalidation
|
||||||
|
- Modify: `tests/integration/cache_flow_test.go`
|
||||||
|
- add integration coverage for fallback success, no-fallback success, and cache/revalidation behavior
|
||||||
|
|
||||||
|
### Task 1: Add docker fallback derivation helpers
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `internal/hubmodule/docker/hooks.go`
|
||||||
|
- Test: `internal/hubmodule/docker/hooks_test.go`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Write the failing unit tests**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestIsRegistryK8sHost(t *testing.T) {
|
||||||
|
if !isRegistryK8sHost("registry.k8s.io") {
|
||||||
|
t.Fatalf("expected registry.k8s.io to match")
|
||||||
|
}
|
||||||
|
if isRegistryK8sHost("example.com") {
|
||||||
|
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||||
|
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
|
||||||
|
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||||
|
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||||
|
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
|
||||||
|
t.Fatalf("expected multi-segment repo to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||||
|
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
|
||||||
|
t.Fatalf("expected non-manifest path to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
|
||||||
|
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
|
||||||
|
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `go test ./internal/hubmodule/docker -run 'TestIsRegistryK8sHost|TestRegistryK8sManifestFallbackPath' -count=1`
|
||||||
|
Expected: FAIL because `isRegistryK8sHost` and `manifestFallbackPath` do not exist yet.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func isRegistryK8sHost(host string) bool {
|
||||||
|
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
|
||||||
|
host = parsedHost
|
||||||
|
}
|
||||||
|
return strings.EqualFold(host, "registry.k8s.io")
|
||||||
|
}
|
||||||
|
|
||||||
|
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
|
||||||
|
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
repo, rest, ok := splitDockerRepoPath(clean)
|
||||||
|
if !ok || strings.Count(repo, "/") != 0 || !strings.HasPrefix(rest, "/manifests/") {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
return "/v2/" + repo + "/" + repo + rest, true
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `go test ./internal/hubmodule/docker -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go
|
||||||
|
git commit -m "test: cover registry k8s fallback path derivation"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Task 2: Persist effective upstream path in cache metadata
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `internal/cache/store.go`
|
||||||
|
- Modify: `internal/cache/fs_store.go`
|
||||||
|
- Test: `internal/cache/store_test.go`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Write the failing cache metadata test**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
|
||||||
|
store, err := NewStore(t.TempDir())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewStore: %v", err)
|
||||||
|
}
|
||||||
|
loc := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
|
||||||
|
_, err = store.Put(context.Background(), loc, strings.NewReader("body"), PutOptions{
|
||||||
|
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
got, err := store.Get(context.Background(), loc)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Get: %v", err)
|
||||||
|
}
|
||||||
|
if got.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||||
|
t.Fatalf("unexpected effective path: %q", got.Entry.EffectiveUpstreamPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `go test ./internal/cache -run TestStorePersistsEffectiveUpstreamPath -count=1`
|
||||||
|
Expected: FAIL because cache entry metadata does not yet include effective upstream path.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
```go
|
||||||
|
type PutOptions struct {
|
||||||
|
ModTime time.Time
|
||||||
|
EffectiveUpstreamPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Entry struct {
|
||||||
|
Locator Locator `json:"locator"`
|
||||||
|
FilePath string `json:"file_path"`
|
||||||
|
SizeBytes int64 `json:"size_bytes"`
|
||||||
|
ModTime time.Time `json:"mod_time"`
|
||||||
|
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Implementation notes:
|
||||||
|
- store metadata next to the cached body as a small JSON file such as `<entry>.meta`
|
||||||
|
- on `Get`, load metadata if present and populate `Entry.EffectiveUpstreamPath`
|
||||||
|
- on `Put`, write metadata atomically only when `EffectiveUpstreamPath` is non-empty
|
||||||
|
- on `Remove`, delete both body and metadata files
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `go test ./internal/cache -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go
|
||||||
|
git commit -m "feat: persist cache effective upstream path"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Task 3: Add one-shot fallback retry in proxy fetch flow
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `internal/proxy/handler.go`
|
||||||
|
- Modify: `internal/hubmodule/docker/hooks.go`
|
||||||
|
- Test: `tests/integration/cache_flow_test.go`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Write the failing integration test for fallback success**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
|
||||||
|
// stub returns 404 for /v2/coredns/manifests/v1.13.1
|
||||||
|
// stub returns 200 for /v2/coredns/coredns/manifests/v1.13.1
|
||||||
|
// request /v2/coredns/manifests/v1.13.1 through proxy
|
||||||
|
// expect 200 and exactly two upstream hits
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
|
||||||
|
// stub returns 200 for original path
|
||||||
|
// request original path
|
||||||
|
// expect 200 and only one upstream hit
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
|
||||||
|
// non-registry.k8s.io upstream returns 404 for original path
|
||||||
|
// request original path
|
||||||
|
// expect final 404 and no derived path hit
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
|
||||||
|
// first GET falls back and succeeds
|
||||||
|
// second identical GET should be cache hit
|
||||||
|
// expect no new hit to original 404 path on second request
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
|
||||||
|
Expected: FAIL because the fallback tests were written before the retry logic exists.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
```go
|
||||||
|
resp, upstreamURL, err := h.executeRequest(c, route, hook)
|
||||||
|
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, hook)
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
if fallbackHook, fallbackURL, ok := h.registryK8sFallbackAttempt(c, route, hook); ok {
|
||||||
|
resp.Body.Close()
|
||||||
|
resp, upstreamURL, err = h.executeRequest(c, route, fallbackHook)
|
||||||
|
if err == nil {
|
||||||
|
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, fallbackHook)
|
||||||
|
}
|
||||||
|
fallbackEffectivePath = fallbackURL.Path
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Implementation notes:
|
||||||
|
- in `internal/hubmodule/docker/hooks.go`, keep only pure path helpers such as `isRegistryK8sHost` and `manifestFallbackPath`
|
||||||
|
- in `internal/proxy/handler.go`, add the retry orchestration helper that clones the current hook state with the derived fallback path when `manifestFallbackPath` returns true
|
||||||
|
- only evaluate fallback after `retryOnAuthFailure` completes and only for final `404`
|
||||||
|
- close the first response body before the retry
|
||||||
|
- if fallback succeeds with `200`, pass `EffectiveUpstreamPath` into cache `PutOptions`
|
||||||
|
- if fallback returns non-`200`, pass that second upstream response through unchanged
|
||||||
|
- emit a structured fallback log event from `internal/proxy/handler.go` with hub name, domain, upstream host, original path, fallback path, original status, and request method
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add internal/proxy/handler.go tests/integration/cache_flow_test.go internal/hubmodule/docker/hooks.go
|
||||||
|
git commit -m "feat: retry registry k8s manifest fallback"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Task 4: Revalidate cached fallback entries against the effective upstream path
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `internal/proxy/handler.go`
|
||||||
|
- Modify: `tests/integration/cache_flow_test.go`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Write the failing revalidation regression test**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
|
||||||
|
// first GET: original path 404, fallback path 200, response cached
|
||||||
|
// second GET: cache hit path should revalidate against fallback path only
|
||||||
|
// assert original 404 path is not re-requested during revalidation
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
|
||||||
|
Expected: FAIL because revalidation still targets the original client-facing path.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
|
||||||
|
if entry.EffectiveUpstreamPath == "" {
|
||||||
|
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
|
||||||
|
}
|
||||||
|
clone := *route.UpstreamURL
|
||||||
|
clone.Path = entry.EffectiveUpstreamPath
|
||||||
|
clone.RawPath = entry.EffectiveUpstreamPath
|
||||||
|
return &clone
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Implementation notes:
|
||||||
|
- use `Entry.EffectiveUpstreamPath` in both `isCacheFresh` and cached `HEAD` handling inside `serveCache`
|
||||||
|
- do not re-derive fallback during revalidation when metadata already specifies the effective path
|
||||||
|
- keep client-visible cache key unchanged
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add internal/proxy/handler.go tests/integration/cache_flow_test.go
|
||||||
|
git commit -m "fix: revalidate registry k8s fallback cache entries"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Task 5: Emit and verify fallback structured logging
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `internal/proxy/handler.go`
|
||||||
|
- Test: `internal/proxy/handler_test.go`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Write the failing logging test**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
|
||||||
|
// configure handler with a bytes.Buffer-backed logrus logger
|
||||||
|
// trigger fallback success for /v2/coredns/manifests/v1.13.1
|
||||||
|
// assert log output contains event name plus fields:
|
||||||
|
// hub, domain, upstream host, original path, fallback path, original status, method
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
|
||||||
|
Expected: FAIL because the fallback log event does not exist yet.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
```go
|
||||||
|
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
|
||||||
|
fields := logging.RequestFields(
|
||||||
|
route.Config.Name,
|
||||||
|
route.Config.Domain,
|
||||||
|
route.Config.Type,
|
||||||
|
route.Config.AuthMode(),
|
||||||
|
route.Module.Key,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
fields["action"] = "proxy_fallback"
|
||||||
|
fields["upstream_host"] = route.UpstreamURL.Host
|
||||||
|
fields["original_path"] = originalPath
|
||||||
|
fields["fallback_path"] = fallbackPath
|
||||||
|
fields["original_status"] = originalStatus
|
||||||
|
fields["method"] = method
|
||||||
|
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add internal/proxy/handler.go internal/proxy/handler_test.go
|
||||||
|
git commit -m "test: cover registry k8s fallback logging"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Task 6: Final verification
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Verify: `internal/hubmodule/docker/hooks.go`
|
||||||
|
- Verify: `internal/cache/store.go`
|
||||||
|
- Verify: `internal/cache/fs_store.go`
|
||||||
|
- Verify: `internal/proxy/handler.go`
|
||||||
|
- Verify: `tests/integration/cache_flow_test.go`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Run focused package tests**
|
||||||
|
|
||||||
|
Run: `go test ./internal/hubmodule/docker ./internal/cache ./tests/integration -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run full test suite**
|
||||||
|
|
||||||
|
Run: `go test ./... -count=1`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
- [ ] **Step 3: Inspect git diff**
|
||||||
|
|
||||||
|
Run: `git diff --stat`
|
||||||
|
Expected: only the planned files changed for this feature.
|
||||||
|
|
||||||
|
- [ ] **Step 4: Verify fallback logging is present**
|
||||||
|
|
||||||
|
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
|
||||||
|
Expected: PASS, and the test explicitly checks the structured fallback log event fields.
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit final polish if needed**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go internal/proxy/handler.go internal/proxy/handler_test.go tests/integration/cache_flow_test.go
|
||||||
|
git commit -m "feat: add registry k8s manifest fallback compatibility"
|
||||||
|
```
|
||||||
181
docs/superpowers/specs/2026-03-23-registry-k8s-compat-design.md
Normal file
181
docs/superpowers/specs/2026-03-23-registry-k8s-compat-design.md
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
# registry.k8s.io coredns fallback design
|
||||||
|
|
||||||
|
## Goal
|
||||||
|
|
||||||
|
Add a narrow compatibility fallback for `registry.k8s.io` so requests like
|
||||||
|
`k8s.hub.ipao.vip/coredns:v1.13.1` can retry as
|
||||||
|
`k8s.hub.ipao.vip/coredns/coredns:v1.13.1` when the original manifest lookup is
|
||||||
|
not found upstream.
|
||||||
|
|
||||||
|
## Scope
|
||||||
|
|
||||||
|
This compatibility applies only when all of the following are true:
|
||||||
|
|
||||||
|
- hub module is `docker`
|
||||||
|
- upstream host is exactly `registry.k8s.io`
|
||||||
|
- requested repository name is a single segment such as `coredns`
|
||||||
|
- request targets `/manifests/<ref>`
|
||||||
|
- the first upstream request clearly fails as not found
|
||||||
|
|
||||||
|
Out of scope:
|
||||||
|
|
||||||
|
- changing behavior for Docker Hub
|
||||||
|
- changing behavior for other registries
|
||||||
|
- eagerly rewriting every single-segment repo under `registry.k8s.io`
|
||||||
|
- retrying more than one alternate repository form
|
||||||
|
- applying the initial compatibility behavior to blobs, tags, or referrers
|
||||||
|
|
||||||
|
## Recommended approach
|
||||||
|
|
||||||
|
Keep the current request path unchanged on the first attempt. After the normal
|
||||||
|
auth flow completes for that path, if the final first-attempt response from
|
||||||
|
`registry.k8s.io` is `404` for a single-segment manifest repository, perform
|
||||||
|
one internal retry against the derived path `/v2/<repo>/<repo>/...` and return
|
||||||
|
the retry result when it succeeds.
|
||||||
|
|
||||||
|
This keeps the existing behavior stable for paths that already work and limits
|
||||||
|
the compatibility behavior to the known `coredns -> coredns/coredns` style case.
|
||||||
|
|
||||||
|
## Alternatives considered
|
||||||
|
|
||||||
|
### 1. Always rewrite single-segment repos to `<repo>/<repo>`
|
||||||
|
|
||||||
|
Rejected because it would change successful existing paths and could break valid
|
||||||
|
single-segment repositories on `registry.k8s.io`.
|
||||||
|
|
||||||
|
### 2. Probe both paths before deciding
|
||||||
|
|
||||||
|
Rejected because it doubles upstream traffic and adds more complexity than the
|
||||||
|
known compatibility problem needs.
|
||||||
|
|
||||||
|
## Request flow
|
||||||
|
|
||||||
|
1. Receive docker request and run the existing normalization logic.
|
||||||
|
2. Send the first upstream request using the original normalized path.
|
||||||
|
3. Complete the existing auth retry behavior for that path first.
|
||||||
|
4. If the final response is successful, continue as today.
|
||||||
|
5. If the final response is `404`, check whether the request is eligible for
|
||||||
|
the `registry.k8s.io` fallback.
|
||||||
|
6. Close the first response body before retrying.
|
||||||
|
7. If eligible, derive a second path by transforming `/v2/<repo>/...` into
|
||||||
|
`/v2/<repo>/<repo>/...`.
|
||||||
|
8. Retry exactly once against the derived path, reusing the same method and the
|
||||||
|
same auth behavior used by the normal upstream flow.
|
||||||
|
9. If the retry succeeds, use that response and mark the cache entry with the
|
||||||
|
effective upstream path that produced the content.
|
||||||
|
10. If the retry also returns an upstream response, pass that second response
|
||||||
|
through unchanged.
|
||||||
|
11. If the retry fails with a transport error, return the existing proxy error
|
||||||
|
response path and do not continue retrying.
|
||||||
|
|
||||||
|
## Eligibility rules
|
||||||
|
|
||||||
|
The fallback should only trigger when all checks pass:
|
||||||
|
|
||||||
|
- upstream host matches `registry.k8s.io`
|
||||||
|
- request path parses as `/v2/<repo>/manifests/<ref>`
|
||||||
|
- repository name contains exactly one segment before `manifests`
|
||||||
|
- repository is not already `<repo>/<repo>`
|
||||||
|
- final response after normal auth handling is HTTP `404`
|
||||||
|
|
||||||
|
For the initial implementation, the trigger is status-based and limited to HTTP
|
||||||
|
`404`.
|
||||||
|
|
||||||
|
## Component changes
|
||||||
|
|
||||||
|
### `internal/hubmodule/docker/hooks.go`
|
||||||
|
|
||||||
|
- add helper logic to identify `registry.k8s.io`
|
||||||
|
- add helper logic to derive the duplicate-segment manifest path for eligible
|
||||||
|
requests
|
||||||
|
|
||||||
|
### `internal/proxy/handler.go`
|
||||||
|
|
||||||
|
- add a targeted retry path after the first upstream response is received and
|
||||||
|
before the response is finalized
|
||||||
|
- keep the retry count fixed at one alternate attempt
|
||||||
|
- complete the normal auth retry flow before evaluating fallback eligibility
|
||||||
|
- persist the effective upstream path when fallback succeeds so future cache
|
||||||
|
revalidation can target the path that actually produced the cached content
|
||||||
|
|
||||||
|
The retry should reuse the same auth, request method, response handling, and
|
||||||
|
streaming behavior as the normal upstream path.
|
||||||
|
|
||||||
|
### Cache metadata
|
||||||
|
|
||||||
|
- extend cached metadata for fallback-derived entries with an optional effective
|
||||||
|
upstream path field
|
||||||
|
- when a cached entry was populated through fallback, revalidate it against that
|
||||||
|
effective upstream path rather than the original client-facing path
|
||||||
|
- do not apply the fallback derivation again during revalidation if an effective
|
||||||
|
upstream path is already recorded
|
||||||
|
|
||||||
|
## Caching behavior
|
||||||
|
|
||||||
|
- first successful response remains cacheable under the locator path that served
|
||||||
|
the client request
|
||||||
|
- if fallback succeeds, cache the successful fallback response under the client
|
||||||
|
request path so the next identical client request can hit cache directly
|
||||||
|
- also persist the effective upstream path used to fill that cache entry
|
||||||
|
- do not cache the failed first attempt separately
|
||||||
|
|
||||||
|
This preserves the client-visible repository path while allowing the proxy to
|
||||||
|
internally source content from the alternate upstream location.
|
||||||
|
|
||||||
|
## Error handling
|
||||||
|
|
||||||
|
- if fallback path derivation is not possible, return the original response
|
||||||
|
- if the fallback request errors at the transport layer, log the retry attempt
|
||||||
|
and return the existing proxy error response path
|
||||||
|
- never loop between variants
|
||||||
|
|
||||||
|
## Logging
|
||||||
|
|
||||||
|
Add one targeted structured log event for visibility when the fallback is used,
|
||||||
|
including:
|
||||||
|
|
||||||
|
- hub name
|
||||||
|
- domain
|
||||||
|
- upstream host
|
||||||
|
- original path
|
||||||
|
- fallback path
|
||||||
|
- original status
|
||||||
|
- request method
|
||||||
|
|
||||||
|
This makes it easy to confirm that `registry.k8s.io` compatibility is active
|
||||||
|
without changing normal logging volume for unrelated registries.
|
||||||
|
|
||||||
|
## Tests
|
||||||
|
|
||||||
|
### Unit tests
|
||||||
|
|
||||||
|
Add docker hook tests for:
|
||||||
|
|
||||||
|
- recognizing `registry.k8s.io`
|
||||||
|
- deriving `/v2/coredns/manifests/v1.13.1` to
|
||||||
|
`/v2/coredns/coredns/manifests/v1.13.1`
|
||||||
|
- refusing fallback for already multi-segment repositories
|
||||||
|
- refusing fallback for non-`registry.k8s.io` hosts
|
||||||
|
- refusing fallback for non-manifest paths
|
||||||
|
|
||||||
|
### Integration tests
|
||||||
|
|
||||||
|
Add proxy tests for:
|
||||||
|
|
||||||
|
- original path returns `404`, fallback path returns `200` -> final client
|
||||||
|
response is `200`
|
||||||
|
- original path returns `200` -> fallback is not attempted
|
||||||
|
- non-`registry.k8s.io` upstream returns `404` -> fallback is not attempted
|
||||||
|
- fallback succeeds, second identical request is served from cache without
|
||||||
|
re-fetching the original `404` path
|
||||||
|
- fallback-derived cached entry revalidates against the recorded effective
|
||||||
|
upstream path instead of the original client-facing path
|
||||||
|
|
||||||
|
## Success criteria
|
||||||
|
|
||||||
|
- `k8s.hub.ipao.vip/coredns:v1.13.1` can succeed through fallback when the
|
||||||
|
upstream only exposes `coredns/coredns`
|
||||||
|
- existing Docker Hub behavior remains unchanged
|
||||||
|
- existing `registry.k8s.io` paths that already work continue to use the
|
||||||
|
original path first
|
||||||
|
- fallback is isolated to `registry.k8s.io`
|
||||||
67
internal/cache/fs_store.go
vendored
67
internal/cache/fs_store.go
vendored
@@ -2,6 +2,7 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -49,6 +50,10 @@ type entryLock struct {
|
|||||||
refs int
|
refs int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type entryMetadata struct {
|
||||||
|
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, error) {
|
func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, error) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -86,6 +91,12 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro
|
|||||||
SizeBytes: info.Size(),
|
SizeBytes: info.Size(),
|
||||||
ModTime: info.ModTime(),
|
ModTime: info.ModTime(),
|
||||||
}
|
}
|
||||||
|
if metadata, err := s.readMetadata(filePath); err == nil {
|
||||||
|
entry.EffectiveUpstreamPath = metadata.EffectiveUpstreamPath
|
||||||
|
} else if !errors.Is(err, fs.ErrNotExist) {
|
||||||
|
file.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return &ReadResult{Entry: entry, Reader: file}, nil
|
return &ReadResult{Entry: entry, Reader: file}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,12 +144,16 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op
|
|||||||
if err := os.Chtimes(filePath, modTime, modTime); err != nil {
|
if err := os.Chtimes(filePath, modTime, modTime); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if err := s.writeMetadata(filePath, opts.EffectiveUpstreamPath); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
entry := Entry{
|
entry := Entry{
|
||||||
Locator: locator,
|
Locator: locator,
|
||||||
FilePath: filePath,
|
FilePath: filePath,
|
||||||
SizeBytes: written,
|
SizeBytes: written,
|
||||||
ModTime: modTime,
|
ModTime: modTime,
|
||||||
|
EffectiveUpstreamPath: opts.EffectiveUpstreamPath,
|
||||||
}
|
}
|
||||||
return &entry, nil
|
return &entry, nil
|
||||||
}
|
}
|
||||||
@@ -157,6 +172,54 @@ func (s *fileStore) Remove(ctx context.Context, locator Locator) error {
|
|||||||
if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := os.Remove(metadataPath(filePath)); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *fileStore) readMetadata(filePath string) (entryMetadata, error) {
|
||||||
|
raw, err := os.ReadFile(metadataPath(filePath))
|
||||||
|
if err != nil {
|
||||||
|
return entryMetadata{}, err
|
||||||
|
}
|
||||||
|
var metadata entryMetadata
|
||||||
|
if err := json.Unmarshal(raw, &metadata); err != nil {
|
||||||
|
return entryMetadata{}, err
|
||||||
|
}
|
||||||
|
return metadata, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *fileStore) writeMetadata(filePath string, effectiveUpstreamPath string) error {
|
||||||
|
metaFilePath := metadataPath(filePath)
|
||||||
|
if effectiveUpstreamPath == "" {
|
||||||
|
if err := os.Remove(metaFilePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
data, err := json.Marshal(entryMetadata{EffectiveUpstreamPath: effectiveUpstreamPath})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tempFile, err := os.CreateTemp(filepath.Dir(metaFilePath), ".cache-meta-*")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tempName := tempFile.Name()
|
||||||
|
if _, err := tempFile.Write(data); err != nil {
|
||||||
|
tempFile.Close()
|
||||||
|
_ = os.Remove(tempName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := tempFile.Close(); err != nil {
|
||||||
|
_ = os.Remove(tempName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.Rename(tempName, metaFilePath); err != nil {
|
||||||
|
_ = os.Remove(tempName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,3 +311,7 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
|
|||||||
func locatorKey(locator Locator) string {
|
func locatorKey(locator Locator) string {
|
||||||
return locator.HubName + "::" + locator.Path
|
return locator.HubName + "::" + locator.Path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func metadataPath(filePath string) string {
|
||||||
|
return filePath + ".meta"
|
||||||
|
}
|
||||||
|
|||||||
4
internal/cache/store.go
vendored
4
internal/cache/store.go
vendored
@@ -27,6 +27,7 @@ type Store interface {
|
|||||||
// PutOptions 控制写入过程中的可选属性。
|
// PutOptions 控制写入过程中的可选属性。
|
||||||
type PutOptions struct {
|
type PutOptions struct {
|
||||||
ModTime time.Time
|
ModTime time.Time
|
||||||
|
EffectiveUpstreamPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Locator 唯一定位一个缓存条目(Hub + 相对路径),所有路径均为 URL 路径风格。
|
// Locator 唯一定位一个缓存条目(Hub + 相对路径),所有路径均为 URL 路径风格。
|
||||||
@@ -40,7 +41,8 @@ type Entry struct {
|
|||||||
Locator Locator `json:"locator"`
|
Locator Locator `json:"locator"`
|
||||||
FilePath string `json:"file_path"`
|
FilePath string `json:"file_path"`
|
||||||
SizeBytes int64 `json:"size_bytes"`
|
SizeBytes int64 `json:"size_bytes"`
|
||||||
ModTime time.Time
|
ModTime time.Time `json:"mod_time"`
|
||||||
|
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadResult 组合 Entry 与正文 Reader,便于代理层直接将 Body 流式返回。
|
// ReadResult 组合 Entry 与正文 Reader,便于代理层直接将 Body 流式返回。
|
||||||
|
|||||||
23
internal/cache/store_test.go
vendored
23
internal/cache/store_test.go
vendored
@@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -62,6 +63,28 @@ func TestStoreRemove(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
|
||||||
|
store := newTestStore(t)
|
||||||
|
locator := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
|
||||||
|
|
||||||
|
_, err := store.Put(context.Background(), locator, strings.NewReader("body"), PutOptions{
|
||||||
|
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("put error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := store.Get(context.Background(), locator)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("get error: %v", err)
|
||||||
|
}
|
||||||
|
defer result.Reader.Close()
|
||||||
|
|
||||||
|
if result.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||||
|
t.Fatalf("unexpected effective upstream path: %q", result.Entry.EffectiveUpstreamPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStoreIgnoresDirectories(t *testing.T) {
|
func TestStoreIgnoresDirectories(t *testing.T) {
|
||||||
store := newTestStore(t)
|
store := newTestStore(t)
|
||||||
locator := Locator{HubName: "ghcr", Path: "/v2"}
|
locator := Locator{HubName: "ghcr", Path: "/v2"}
|
||||||
|
|||||||
@@ -69,6 +69,28 @@ func isDockerHubHost(host string) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isRegistryK8sHost(host string) bool {
|
||||||
|
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
|
||||||
|
host = parsedHost
|
||||||
|
}
|
||||||
|
return strings.EqualFold(host, "registry.k8s.io")
|
||||||
|
}
|
||||||
|
|
||||||
|
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
|
||||||
|
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
repo, rest, ok := splitDockerRepoPath(clean)
|
||||||
|
if !ok || strings.Contains(repo, "/") || !strings.HasPrefix(rest, "/manifests/") {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
return "/v2/" + repo + "/" + repo + rest, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegistryK8sManifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
|
||||||
|
return manifestFallbackPath(ctx, clean)
|
||||||
|
}
|
||||||
|
|
||||||
func splitDockerRepoPath(path string) (string, string, bool) {
|
func splitDockerRepoPath(path string) (string, string, bool) {
|
||||||
if !strings.HasPrefix(path, "/v2/") {
|
if !strings.HasPrefix(path, "/v2/") {
|
||||||
return "", "", false
|
return "", "", false
|
||||||
|
|||||||
@@ -27,6 +27,44 @@ func TestNormalizePathSkipsLibraryForNonDockerHub(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIsRegistryK8sHost(t *testing.T) {
|
||||||
|
if !isRegistryK8sHost("registry.k8s.io") {
|
||||||
|
t.Fatalf("expected registry.k8s.io to match")
|
||||||
|
}
|
||||||
|
if isRegistryK8sHost("example.com") {
|
||||||
|
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||||
|
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
|
||||||
|
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
|
||||||
|
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||||
|
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
|
||||||
|
t.Fatalf("expected multi-segment repo to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
|
||||||
|
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
|
||||||
|
t.Fatalf("expected non-manifest path to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
|
||||||
|
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
|
||||||
|
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
|
||||||
|
t.Fatalf("expected non-registry.k8s.io host to be ignored")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSplitDockerRepoPath(t *testing.T) {
|
func TestSplitDockerRepoPath(t *testing.T) {
|
||||||
repo, rest, ok := splitDockerRepoPath("/v2/library/nginx/manifests/latest")
|
repo, rest, ok := splitDockerRepoPath("/v2/library/nginx/manifests/latest")
|
||||||
if !ok || repo != "library/nginx" || rest != "/manifests/latest" {
|
if !ok || repo != "library/nginx" || rest != "/manifests/latest" {
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import (
|
|||||||
|
|
||||||
"github.com/any-hub/any-hub/internal/cache"
|
"github.com/any-hub/any-hub/internal/cache"
|
||||||
"github.com/any-hub/any-hub/internal/hubmodule"
|
"github.com/any-hub/any-hub/internal/hubmodule"
|
||||||
|
dockermodule "github.com/any-hub/any-hub/internal/hubmodule/docker"
|
||||||
"github.com/any-hub/any-hub/internal/logging"
|
"github.com/any-hub/any-hub/internal/logging"
|
||||||
"github.com/any-hub/any-hub/internal/proxy/hooks"
|
"github.com/any-hub/any-hub/internal/proxy/hooks"
|
||||||
"github.com/any-hub/any-hub/internal/server"
|
"github.com/any-hub/any-hub/internal/server"
|
||||||
@@ -218,7 +219,7 @@ func (h *Handler) serveCache(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if shouldRevalidate {
|
if shouldRevalidate {
|
||||||
if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil {
|
if resp, err := h.revalidateRequest(c, route, effectiveRevalidateURL(route, c, result.Entry, hook), result.Entry.Locator, ""); err == nil {
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -259,6 +260,17 @@ func (h *Handler) fetchAndStream(
|
|||||||
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
|
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
|
||||||
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
|
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
|
||||||
}
|
}
|
||||||
|
effectiveUpstreamPath := ""
|
||||||
|
originalStatus := resp.StatusCode
|
||||||
|
originalPath := ""
|
||||||
|
if hook != nil {
|
||||||
|
originalPath = hook.clean
|
||||||
|
}
|
||||||
|
resp, upstreamURL, effectiveUpstreamPath, err = h.retryRegistryK8sManifestFallback(c, route, requestID, resp, upstreamURL, hook, originalStatus, originalPath)
|
||||||
|
if err != nil {
|
||||||
|
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
|
||||||
|
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
|
||||||
|
}
|
||||||
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
|
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
|
||||||
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
|
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
|
||||||
resp = rewritten
|
resp = rewritten
|
||||||
@@ -273,7 +285,7 @@ func (h *Handler) fetchAndStream(
|
|||||||
|
|
||||||
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
|
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
|
||||||
c.Method() == http.MethodGet
|
c.Method() == http.MethodGet
|
||||||
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
|
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx, effectiveUpstreamPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
|
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
|
||||||
@@ -325,13 +337,14 @@ func (h *Handler) consumeUpstream(
|
|||||||
requestID string,
|
requestID string,
|
||||||
started time.Time,
|
started time.Time,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
effectiveUpstreamPath string,
|
||||||
) error {
|
) error {
|
||||||
upstreamURL := resp.Request.URL.String()
|
upstreamURL := resp.Request.URL.String()
|
||||||
method := c.Method()
|
method := c.Method()
|
||||||
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
|
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
|
||||||
|
|
||||||
if shouldStore {
|
if shouldStore {
|
||||||
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL)
|
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL, effectiveUpstreamPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
copyResponseHeaders(c, resp.Header)
|
copyResponseHeaders(c, resp.Header)
|
||||||
@@ -369,6 +382,7 @@ func (h *Handler) cacheAndStream(
|
|||||||
started time.Time,
|
started time.Time,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
upstreamURL string,
|
upstreamURL string,
|
||||||
|
effectiveUpstreamPath string,
|
||||||
) error {
|
) error {
|
||||||
copyResponseHeaders(c, resp.Header)
|
copyResponseHeaders(c, resp.Header)
|
||||||
c.Set("X-Any-Hub-Upstream", upstreamURL)
|
c.Set("X-Any-Hub-Upstream", upstreamURL)
|
||||||
@@ -381,7 +395,7 @@ func (h *Handler) cacheAndStream(
|
|||||||
// 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。
|
// 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。
|
||||||
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
|
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
|
||||||
|
|
||||||
opts := cache.PutOptions{ModTime: extractModTime(resp.Header)}
|
opts := cache.PutOptions{ModTime: extractModTime(resp.Header), EffectiveUpstreamPath: effectiveUpstreamPath}
|
||||||
entry, err := writer.Put(ctx, locator, reader, opts)
|
entry, err := writer.Put(ctx, locator, reader, opts)
|
||||||
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
|
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -774,7 +788,7 @@ func (h *Handler) isCacheFresh(
|
|||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
|
upstreamURL := effectiveRevalidateURL(route, c, entry, hook)
|
||||||
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
|
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
@@ -825,6 +839,77 @@ func (h *Handler) isCacheFresh(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
|
||||||
|
if route == nil || route.UpstreamURL == nil || entry.EffectiveUpstreamPath == "" {
|
||||||
|
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
|
||||||
|
}
|
||||||
|
clone := *route.UpstreamURL
|
||||||
|
clone.Path = entry.EffectiveUpstreamPath
|
||||||
|
clone.RawPath = entry.EffectiveUpstreamPath
|
||||||
|
return &clone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) retryRegistryK8sManifestFallback(
|
||||||
|
c fiber.Ctx,
|
||||||
|
route *server.HubRoute,
|
||||||
|
requestID string,
|
||||||
|
resp *http.Response,
|
||||||
|
upstreamURL *url.URL,
|
||||||
|
hook *hookState,
|
||||||
|
originalStatus int,
|
||||||
|
originalPath string,
|
||||||
|
) (*http.Response, *url.URL, string, error) {
|
||||||
|
if resp == nil || resp.StatusCode != http.StatusNotFound || hook == nil || hook.ctx == nil {
|
||||||
|
return resp, upstreamURL, "", nil
|
||||||
|
}
|
||||||
|
fallbackPath, ok := dockermodule.RegistryK8sManifestFallbackPath(hook.ctx, hook.clean)
|
||||||
|
if !ok {
|
||||||
|
return resp, upstreamURL, "", nil
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
fallbackHook := *hook
|
||||||
|
fallbackHook.clean = fallbackPath
|
||||||
|
fallbackResp, fallbackURL, err := h.executeRequest(c, route, &fallbackHook)
|
||||||
|
if err != nil {
|
||||||
|
return nil, upstreamURL, "", err
|
||||||
|
}
|
||||||
|
fallbackResp, fallbackURL, err = h.retryOnAuthFailure(c, route, requestID, time.Now(), fallbackResp, fallbackURL, &fallbackHook)
|
||||||
|
if err != nil {
|
||||||
|
return nil, upstreamURL, "", err
|
||||||
|
}
|
||||||
|
h.logRegistryK8sFallback(route, requestID, originalPath, fallbackPath, originalStatus, c.Method())
|
||||||
|
if fallbackResp.StatusCode == http.StatusOK {
|
||||||
|
return fallbackResp, fallbackURL, fallbackPath, nil
|
||||||
|
}
|
||||||
|
return fallbackResp, fallbackURL, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
|
||||||
|
if route == nil || h == nil || h.logger == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fields := logging.RequestFields(
|
||||||
|
route.Config.Name,
|
||||||
|
route.Config.Domain,
|
||||||
|
route.Config.Type,
|
||||||
|
route.Config.AuthMode(),
|
||||||
|
route.Module.Key,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
fields["action"] = "proxy_fallback"
|
||||||
|
if route.UpstreamURL != nil {
|
||||||
|
fields["upstream_host"] = route.UpstreamURL.Hostname()
|
||||||
|
}
|
||||||
|
fields["original_path"] = originalPath
|
||||||
|
fields["fallback_path"] = fallbackPath
|
||||||
|
fields["original_status"] = originalStatus
|
||||||
|
fields["method"] = method
|
||||||
|
if requestID != "" {
|
||||||
|
fields["request_id"] = requestID
|
||||||
|
}
|
||||||
|
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handler) revalidateRequest(
|
func (h *Handler) revalidateRequest(
|
||||||
c fiber.Ctx,
|
c fiber.Ctx,
|
||||||
route *server.HubRoute,
|
route *server.HubRoute,
|
||||||
|
|||||||
55
internal/proxy/handler_test.go
Normal file
55
internal/proxy/handler_test.go
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/any-hub/any-hub/internal/config"
|
||||||
|
"github.com/any-hub/any-hub/internal/hubmodule"
|
||||||
|
"github.com/any-hub/any-hub/internal/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
logger := logrus.New()
|
||||||
|
logger.SetOutput(buf)
|
||||||
|
logger.SetFormatter(&logrus.JSONFormatter{})
|
||||||
|
|
||||||
|
upstreamURL, err := url.Parse("http://registry.k8s.io")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("parse upstream url: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h := NewHandler(nil, logger, nil)
|
||||||
|
route := &server.HubRoute{
|
||||||
|
Config: config.HubConfig{
|
||||||
|
Name: "docker",
|
||||||
|
Domain: "k8s.hub.local",
|
||||||
|
Type: "docker",
|
||||||
|
},
|
||||||
|
Module: hubmodule.ModuleMetadata{Key: "docker"},
|
||||||
|
UpstreamURL: upstreamURL,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.logRegistryK8sFallback(route, "req-1", "/v2/coredns/manifests/v1.13.1", "/v2/coredns/coredns/manifests/v1.13.1", 404, "GET")
|
||||||
|
|
||||||
|
output := buf.String()
|
||||||
|
for _, want := range []string{
|
||||||
|
"proxy_registry_k8s_fallback",
|
||||||
|
`"hub":"docker"`,
|
||||||
|
`"domain":"k8s.hub.local"`,
|
||||||
|
`"upstream_host":"registry.k8s.io"`,
|
||||||
|
`"original_path":"/v2/coredns/manifests/v1.13.1"`,
|
||||||
|
`"fallback_path":"/v2/coredns/coredns/manifests/v1.13.1"`,
|
||||||
|
`"original_status":404`,
|
||||||
|
`"method":"GET"`,
|
||||||
|
} {
|
||||||
|
if !strings.Contains(output, want) {
|
||||||
|
t.Fatalf("expected log output to contain %s, got %s", want, output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -26,6 +26,8 @@ import (
|
|||||||
const (
|
const (
|
||||||
dockerManifestPath = "/v2/library/cache-flow/manifests/latest"
|
dockerManifestPath = "/v2/library/cache-flow/manifests/latest"
|
||||||
dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
|
dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
|
||||||
|
registryK8sOriginalManifest = "/v2/coredns/manifests/v1.13.1"
|
||||||
|
registryK8sFallbackManifest = "/v2/coredns/coredns/manifests/v1.13.1"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCacheFlowWithConditionalRequest(t *testing.T) {
|
func TestCacheFlowWithConditionalRequest(t *testing.T) {
|
||||||
@@ -286,6 +288,140 @@ func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
|
||||||
|
stub := newRegistryK8sStub(t)
|
||||||
|
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||||
|
defer stub.Close()
|
||||||
|
|
||||||
|
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||||
|
req.Host = "k8s.hub.local"
|
||||||
|
resp, err := app.Test(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("app.Test failed: %v", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != fiber.StatusOK {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
|
||||||
|
t.Fatalf("expected single original manifest hit")
|
||||||
|
}
|
||||||
|
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 {
|
||||||
|
t.Fatalf("expected single fallback manifest hit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
|
||||||
|
stub := newRegistryK8sStub(t)
|
||||||
|
stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest"))
|
||||||
|
defer stub.Close()
|
||||||
|
|
||||||
|
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||||
|
req.Host = "k8s.hub.local"
|
||||||
|
resp, err := app.Test(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("app.Test failed: %v", err)
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != fiber.StatusOK {
|
||||||
|
t.Fatalf("expected 200, got %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
|
||||||
|
t.Fatalf("expected no fallback hit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
|
||||||
|
stub := newRegistryK8sStub(t)
|
||||||
|
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||||
|
defer stub.Close()
|
||||||
|
|
||||||
|
app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil)
|
||||||
|
req.Host = "mirror.hub.local"
|
||||||
|
resp, err := app.Test(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("app.Test failed: %v", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != fiber.StatusNotFound {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
|
||||||
|
t.Fatalf("expected no fallback hit for non-registry host")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
|
||||||
|
stub := newRegistryK8sStub(t)
|
||||||
|
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||||
|
defer stub.Close()
|
||||||
|
|
||||||
|
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||||
|
|
||||||
|
makeRequest := func() *http.Response {
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||||
|
req.Host = "k8s.hub.local"
|
||||||
|
resp, err := app.Test(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("app.Test failed: %v", err)
|
||||||
|
}
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
resp1 := makeRequest()
|
||||||
|
resp1.Body.Close()
|
||||||
|
resp2 := makeRequest()
|
||||||
|
defer resp2.Body.Close()
|
||||||
|
|
||||||
|
if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" {
|
||||||
|
t.Fatalf("expected second request to hit cache")
|
||||||
|
}
|
||||||
|
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
|
||||||
|
t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
|
||||||
|
stub := newRegistryK8sStub(t)
|
||||||
|
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
|
||||||
|
defer stub.Close()
|
||||||
|
|
||||||
|
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
|
||||||
|
|
||||||
|
makeRequest := func() *http.Response {
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
|
||||||
|
req.Host = "k8s.hub.local"
|
||||||
|
resp, err := app.Test(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("app.Test failed: %v", err)
|
||||||
|
}
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
resp1 := makeRequest()
|
||||||
|
resp1.Body.Close()
|
||||||
|
resp2 := makeRequest()
|
||||||
|
resp2.Body.Close()
|
||||||
|
|
||||||
|
if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 {
|
||||||
|
t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead))
|
||||||
|
}
|
||||||
|
if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 {
|
||||||
|
t.Fatalf("expected HEAD revalidation against fallback path")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type cacheFlowStub struct {
|
type cacheFlowStub struct {
|
||||||
server *http.Server
|
server *http.Server
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
@@ -300,6 +436,22 @@ type cacheFlowStub struct {
|
|||||||
lastMod string
|
lastMod string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type registryK8sStub struct {
|
||||||
|
server *http.Server
|
||||||
|
listener net.Listener
|
||||||
|
URL string
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
responses map[string]registryK8sResponse
|
||||||
|
hits map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
type registryK8sResponse struct {
|
||||||
|
status int
|
||||||
|
body []byte
|
||||||
|
etag string
|
||||||
|
}
|
||||||
|
|
||||||
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
|
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
stub := &cacheFlowStub{
|
stub := &cacheFlowStub{
|
||||||
@@ -335,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
|
|||||||
return stub
|
return stub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newRegistryK8sStub(t *testing.T) *registryK8sStub {
|
||||||
|
t.Helper()
|
||||||
|
stub := ®istryK8sStub{
|
||||||
|
responses: make(map[string]registryK8sResponse),
|
||||||
|
hits: make(map[string]int),
|
||||||
|
}
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.HandleFunc("/", stub.handle)
|
||||||
|
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Skipf("unable to start registry.k8s.io stub listener: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
server := &http.Server{Handler: mux}
|
||||||
|
stub.server = server
|
||||||
|
stub.listener = listener
|
||||||
|
stub.URL = "http://" + listener.Addr().String()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_ = server.Serve(listener)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return stub
|
||||||
|
}
|
||||||
|
|
||||||
func (s *cacheFlowStub) Close() {
|
func (s *cacheFlowStub) Close() {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@@ -346,6 +524,109 @@ func (s *cacheFlowStub) Close() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *registryK8sStub) Close() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if s.server != nil {
|
||||||
|
_ = s.server.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
if s.listener != nil {
|
||||||
|
_ = s.listener.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *registryK8sStub) setResponse(path string, status int, body []byte) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.responses[path] = registryK8sResponse{
|
||||||
|
status: status,
|
||||||
|
body: append([]byte(nil), body...),
|
||||||
|
etag: fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *registryK8sStub) pathHits(path string, method string) int {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
return s.hits[method+" "+path]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.hits[r.Method+" "+r.URL.Path]++
|
||||||
|
resp, ok := s.responses[r.URL.Path]
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
_, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
|
||||||
|
w.Header().Set("Etag", resp.etag)
|
||||||
|
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
||||||
|
if r.Method == http.MethodHead {
|
||||||
|
for _, candidate := range r.Header.Values("If-None-Match") {
|
||||||
|
if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) {
|
||||||
|
w.WriteHeader(http.StatusNotModified)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.WriteHeader(resp.status)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(resp.status)
|
||||||
|
_, _ = w.Write(resp.body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App {
|
||||||
|
t.Helper()
|
||||||
|
proxyURL := ""
|
||||||
|
if len(proxyURLs) > 0 {
|
||||||
|
proxyURL = proxyURLs[0]
|
||||||
|
}
|
||||||
|
cfg := &config.Config{
|
||||||
|
Global: config.GlobalConfig{
|
||||||
|
ListenPort: 5000,
|
||||||
|
CacheTTL: config.Duration(30 * time.Second),
|
||||||
|
StoragePath: storageDir,
|
||||||
|
},
|
||||||
|
Hubs: []config.HubConfig{
|
||||||
|
{
|
||||||
|
Name: "docker",
|
||||||
|
Domain: domain,
|
||||||
|
Type: "docker",
|
||||||
|
Upstream: upstream,
|
||||||
|
Proxy: proxyURL,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
registry, err := server.NewHubRegistry(cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("registry error: %v", err)
|
||||||
|
}
|
||||||
|
logger := logrus.New()
|
||||||
|
logger.SetOutput(io.Discard)
|
||||||
|
store, err := cache.NewStore(storageDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("store error: %v", err)
|
||||||
|
}
|
||||||
|
client := server.NewUpstreamClient(cfg)
|
||||||
|
handler := proxy.NewHandler(client, logger, store)
|
||||||
|
app, err := server.NewApp(server.AppOptions{
|
||||||
|
Logger: logger,
|
||||||
|
Registry: registry,
|
||||||
|
Proxy: handler,
|
||||||
|
ListenPort: 5000,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("app error: %v", err)
|
||||||
|
}
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|
||||||
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
|
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
etag := s.etag
|
etag := s.etag
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ type upstreamStub struct {
|
|||||||
requests []RecordedRequest
|
requests []RecordedRequest
|
||||||
mode upstreamMode
|
mode upstreamMode
|
||||||
blobBytes []byte
|
blobBytes []byte
|
||||||
|
lastMod string
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecordedRequest 捕获每次请求的方法/路径/Host/Headers,便于断言代理行为。
|
// RecordedRequest 捕获每次请求的方法/路径/Host/Headers,便于断言代理行为。
|
||||||
@@ -48,13 +49,14 @@ func newUpstreamStub(t *testing.T, mode upstreamMode) *upstreamStub {
|
|||||||
stub := &upstreamStub{
|
stub := &upstreamStub{
|
||||||
mode: mode,
|
mode: mode,
|
||||||
blobBytes: []byte("stub-layer-payload"),
|
blobBytes: []byte("stub-layer-payload"),
|
||||||
|
lastMod: time.Now().UTC().Format(http.TimeFormat),
|
||||||
}
|
}
|
||||||
|
|
||||||
switch mode {
|
switch mode {
|
||||||
case upstreamDocker:
|
case upstreamDocker:
|
||||||
registerDockerHandlers(mux, stub.blobBytes)
|
registerDockerHandlers(mux, stub.blobBytes)
|
||||||
case upstreamNPM:
|
case upstreamNPM:
|
||||||
registerNPMHandlers(mux)
|
registerNPMHandlers(mux, stub)
|
||||||
default:
|
default:
|
||||||
t.Fatalf("unsupported stub mode: %s", mode)
|
t.Fatalf("unsupported stub mode: %s", mode)
|
||||||
}
|
}
|
||||||
@@ -153,10 +155,10 @@ func registerDockerHandlers(mux *http.ServeMux, blob []byte) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerNPMHandlers(mux *http.ServeMux) {
|
func registerNPMHandlers(mux *http.ServeMux, stub *upstreamStub) {
|
||||||
mux.HandleFunc("/lodash", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/lodash", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
w.Header().Set("Last-Modified", stub.lastMod)
|
||||||
resp := map[string]any{
|
resp := map[string]any{
|
||||||
"name": "lodash",
|
"name": "lodash",
|
||||||
"dist-tags": map[string]string{
|
"dist-tags": map[string]string{
|
||||||
@@ -175,7 +177,7 @@ func registerNPMHandlers(mux *http.ServeMux) {
|
|||||||
|
|
||||||
mux.HandleFunc("/lodash/-/lodash-4.17.21.tgz", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/lodash/-/lodash-4.17.21.tgz", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
w.Header().Set("Last-Modified", stub.lastMod)
|
||||||
_, _ = w.Write([]byte("tarball-bytes"))
|
_, _ = w.Write([]byte("tarball-bytes"))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -232,6 +234,37 @@ func TestDockerStubServesManifestAndBlob(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNPMStubKeepsStableValidatorsAcrossHeadRequests(t *testing.T) {
|
||||||
|
stub := newUpstreamStub(t, upstreamNPM)
|
||||||
|
defer stub.Close()
|
||||||
|
|
||||||
|
req1, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("build first HEAD request: %v", err)
|
||||||
|
}
|
||||||
|
resp1, err := http.DefaultClient.Do(req1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("first HEAD failed: %v", err)
|
||||||
|
}
|
||||||
|
resp1.Body.Close()
|
||||||
|
|
||||||
|
time.Sleep(1100 * time.Millisecond)
|
||||||
|
|
||||||
|
req2, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("build second HEAD request: %v", err)
|
||||||
|
}
|
||||||
|
resp2, err := http.DefaultClient.Do(req2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second HEAD failed: %v", err)
|
||||||
|
}
|
||||||
|
resp2.Body.Close()
|
||||||
|
|
||||||
|
if resp1.Header.Get("Last-Modified") != resp2.Header.Get("Last-Modified") {
|
||||||
|
t.Fatalf("expected stable Last-Modified, got %q then %q", resp1.Header.Get("Last-Modified"), resp2.Header.Get("Last-Modified"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestNPMStubServesMetadataAndTarball(t *testing.T) {
|
func TestNPMStubServesMetadataAndTarball(t *testing.T) {
|
||||||
stub := newUpstreamStub(t, upstreamNPM)
|
stub := newUpstreamStub(t, upstreamNPM)
|
||||||
defer stub.Close()
|
defer stub.Close()
|
||||||
|
|||||||
Reference in New Issue
Block a user