6 Commits

Author SHA1 Message Date
47c58ecb69 docs: add registry k8s fallback design and plan
Some checks failed
docker-release / build-and-push (push) Failing after 9m51s
2026-03-23 17:44:18 +08:00
271c8fe538 test: stabilize npm upstream validators 2026-03-23 16:59:36 +08:00
442c27a26d feat: add registry k8s manifest fallback 2026-03-23 16:55:20 +08:00
9c85b9b097 feat: persist cache effective upstream path 2026-03-23 16:24:39 +08:00
7c95ee0210 test: cover registry k8s fallback path derivation 2026-03-23 16:22:28 +08:00
05d5037e97 chore: ignore local worktrees 2026-03-23 16:20:15 +08:00
12 changed files with 1220 additions and 18 deletions

1
.gitignore vendored
View File

@@ -20,6 +20,7 @@ Thumbs.db
.vscode/
.idea/
.cache/
.worktrees/
storage/
logs/
tmp/

View File

@@ -0,0 +1,414 @@
# registry.k8s.io Compatibility Fallback Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add a `registry.k8s.io`-only manifest fallback so single-segment repos like `coredns` retry once as `coredns/coredns` after a final first-attempt `404`.
**Architecture:** Keep the first upstream request unchanged, then add one targeted retry in the proxy after the normal auth flow completes. Persist the effective upstream path for fallback-filled cache entries so later cache revalidation uses the path that actually produced the content instead of rechecking the original `404` path.
**Tech Stack:** Go 1.25, Fiber v3, internal docker hooks, internal proxy handler, filesystem cache store, Go tests
---
## File map
- Modify: `internal/hubmodule/docker/hooks.go`
- add `registry.k8s.io` host detection and manifest fallback path derivation helpers
- Modify: `internal/hubmodule/docker/hooks_test.go`
- add unit tests for host gating and manifest fallback derivation rules
- Modify: `internal/cache/store.go`
- extend cache entry/options to carry optional effective upstream path metadata
- Modify: `internal/cache/fs_store.go`
- persist and load cache metadata for effective upstream path
- Modify: `internal/cache/store_test.go`
- verify metadata round-trip behavior and remove behavior
- Modify: `internal/proxy/handler.go`
- Modify: `internal/proxy/handler_test.go`
- add one-shot `registry.k8s.io` fallback retry after auth retry flow
- write/read effective upstream path metadata for fallback-backed cache entries
- use effective upstream path during revalidation
- Modify: `tests/integration/cache_flow_test.go`
- add integration coverage for fallback success, no-fallback success, and cache/revalidation behavior
### Task 1: Add docker fallback derivation helpers
**Files:**
- Modify: `internal/hubmodule/docker/hooks.go`
- Test: `internal/hubmodule/docker/hooks_test.go`
- [ ] **Step 1: Write the failing unit tests**
```go
func TestIsRegistryK8sHost(t *testing.T) {
if !isRegistryK8sHost("registry.k8s.io") {
t.Fatalf("expected registry.k8s.io to match")
}
if isRegistryK8sHost("example.com") {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
}
}
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected multi-segment repo to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
t.Fatalf("expected non-manifest path to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/hubmodule/docker -run 'TestIsRegistryK8sHost|TestRegistryK8sManifestFallbackPath' -count=1`
Expected: FAIL because `isRegistryK8sHost` and `manifestFallbackPath` do not exist yet.
- [ ] **Step 3: Write minimal implementation**
```go
func isRegistryK8sHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
return strings.EqualFold(host, "registry.k8s.io")
}
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
return "", false
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || strings.Count(repo, "/") != 0 || !strings.HasPrefix(rest, "/manifests/") {
return "", false
}
return "/v2/" + repo + "/" + repo + rest, true
}
```
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/hubmodule/docker -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go
git commit -m "test: cover registry k8s fallback path derivation"
```
### Task 2: Persist effective upstream path in cache metadata
**Files:**
- Modify: `internal/cache/store.go`
- Modify: `internal/cache/fs_store.go`
- Test: `internal/cache/store_test.go`
- [ ] **Step 1: Write the failing cache metadata test**
```go
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
store, err := NewStore(t.TempDir())
if err != nil {
t.Fatalf("NewStore: %v", err)
}
loc := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
_, err = store.Put(context.Background(), loc, strings.NewReader("body"), PutOptions{
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
})
if err != nil {
t.Fatalf("Put: %v", err)
}
got, err := store.Get(context.Background(), loc)
if err != nil {
t.Fatalf("Get: %v", err)
}
if got.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("unexpected effective path: %q", got.Entry.EffectiveUpstreamPath)
}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/cache -run TestStorePersistsEffectiveUpstreamPath -count=1`
Expected: FAIL because cache entry metadata does not yet include effective upstream path.
- [ ] **Step 3: Write minimal implementation**
```go
type PutOptions struct {
ModTime time.Time
EffectiveUpstreamPath string
}
type Entry struct {
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time `json:"mod_time"`
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
```
Implementation notes:
- store metadata next to the cached body as a small JSON file such as `<entry>.meta`
- on `Get`, load metadata if present and populate `Entry.EffectiveUpstreamPath`
- on `Put`, write metadata atomically only when `EffectiveUpstreamPath` is non-empty
- on `Remove`, delete both body and metadata files
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/cache -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go
git commit -m "feat: persist cache effective upstream path"
```
### Task 3: Add one-shot fallback retry in proxy fetch flow
**Files:**
- Modify: `internal/proxy/handler.go`
- Modify: `internal/hubmodule/docker/hooks.go`
- Test: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Write the failing integration test for fallback success**
```go
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
// stub returns 404 for /v2/coredns/manifests/v1.13.1
// stub returns 200 for /v2/coredns/coredns/manifests/v1.13.1
// request /v2/coredns/manifests/v1.13.1 through proxy
// expect 200 and exactly two upstream hits
}
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
// stub returns 200 for original path
// request original path
// expect 200 and only one upstream hit
}
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
// non-registry.k8s.io upstream returns 404 for original path
// request original path
// expect final 404 and no derived path hit
}
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
// first GET falls back and succeeds
// second identical GET should be cache hit
// expect no new hit to original 404 path on second request
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
Expected: FAIL because the fallback tests were written before the retry logic exists.
- [ ] **Step 3: Write minimal implementation**
```go
resp, upstreamURL, err := h.executeRequest(c, route, hook)
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, hook)
if resp.StatusCode == http.StatusNotFound {
if fallbackHook, fallbackURL, ok := h.registryK8sFallbackAttempt(c, route, hook); ok {
resp.Body.Close()
resp, upstreamURL, err = h.executeRequest(c, route, fallbackHook)
if err == nil {
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, fallbackHook)
}
fallbackEffectivePath = fallbackURL.Path
}
}
```
Implementation notes:
- in `internal/hubmodule/docker/hooks.go`, keep only pure path helpers such as `isRegistryK8sHost` and `manifestFallbackPath`
- in `internal/proxy/handler.go`, add the retry orchestration helper that clones the current hook state with the derived fallback path when `manifestFallbackPath` returns true
- only evaluate fallback after `retryOnAuthFailure` completes and only for final `404`
- close the first response body before the retry
- if fallback succeeds with `200`, pass `EffectiveUpstreamPath` into cache `PutOptions`
- if fallback returns non-`200`, pass that second upstream response through unchanged
- emit a structured fallback log event from `internal/proxy/handler.go` with hub name, domain, upstream host, original path, fallback path, original status, and request method
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go tests/integration/cache_flow_test.go internal/hubmodule/docker/hooks.go
git commit -m "feat: retry registry k8s manifest fallback"
```
### Task 4: Revalidate cached fallback entries against the effective upstream path
**Files:**
- Modify: `internal/proxy/handler.go`
- Modify: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Write the failing revalidation regression test**
```go
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
// first GET: original path 404, fallback path 200, response cached
// second GET: cache hit path should revalidate against fallback path only
// assert original 404 path is not re-requested during revalidation
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
Expected: FAIL because revalidation still targets the original client-facing path.
- [ ] **Step 3: Write minimal implementation**
```go
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
if entry.EffectiveUpstreamPath == "" {
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
}
clone := *route.UpstreamURL
clone.Path = entry.EffectiveUpstreamPath
clone.RawPath = entry.EffectiveUpstreamPath
return &clone
}
```
Implementation notes:
- use `Entry.EffectiveUpstreamPath` in both `isCacheFresh` and cached `HEAD` handling inside `serveCache`
- do not re-derive fallback during revalidation when metadata already specifies the effective path
- keep client-visible cache key unchanged
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go tests/integration/cache_flow_test.go
git commit -m "fix: revalidate registry k8s fallback cache entries"
```
### Task 5: Emit and verify fallback structured logging
**Files:**
- Modify: `internal/proxy/handler.go`
- Test: `internal/proxy/handler_test.go`
- [ ] **Step 1: Write the failing logging test**
```go
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
// configure handler with a bytes.Buffer-backed logrus logger
// trigger fallback success for /v2/coredns/manifests/v1.13.1
// assert log output contains event name plus fields:
// hub, domain, upstream host, original path, fallback path, original status, method
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: FAIL because the fallback log event does not exist yet.
- [ ] **Step 3: Write minimal implementation**
```go
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.Module.Key,
false,
)
fields["action"] = "proxy_fallback"
fields["upstream_host"] = route.UpstreamURL.Host
fields["original_path"] = originalPath
fields["fallback_path"] = fallbackPath
fields["original_status"] = originalStatus
fields["method"] = method
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
}
```
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go internal/proxy/handler_test.go
git commit -m "test: cover registry k8s fallback logging"
```
### Task 6: Final verification
**Files:**
- Verify: `internal/hubmodule/docker/hooks.go`
- Verify: `internal/cache/store.go`
- Verify: `internal/cache/fs_store.go`
- Verify: `internal/proxy/handler.go`
- Verify: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Run focused package tests**
Run: `go test ./internal/hubmodule/docker ./internal/cache ./tests/integration -count=1`
Expected: PASS
- [ ] **Step 2: Run full test suite**
Run: `go test ./... -count=1`
Expected: PASS
- [ ] **Step 3: Inspect git diff**
Run: `git diff --stat`
Expected: only the planned files changed for this feature.
- [ ] **Step 4: Verify fallback logging is present**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: PASS, and the test explicitly checks the structured fallback log event fields.
- [ ] **Step 5: Commit final polish if needed**
```bash
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go internal/proxy/handler.go internal/proxy/handler_test.go tests/integration/cache_flow_test.go
git commit -m "feat: add registry k8s manifest fallback compatibility"
```

View File

@@ -0,0 +1,181 @@
# registry.k8s.io coredns fallback design
## Goal
Add a narrow compatibility fallback for `registry.k8s.io` so requests like
`k8s.hub.ipao.vip/coredns:v1.13.1` can retry as
`k8s.hub.ipao.vip/coredns/coredns:v1.13.1` when the original manifest lookup is
not found upstream.
## Scope
This compatibility applies only when all of the following are true:
- hub module is `docker`
- upstream host is exactly `registry.k8s.io`
- requested repository name is a single segment such as `coredns`
- request targets `/manifests/<ref>`
- the first upstream request clearly fails as not found
Out of scope:
- changing behavior for Docker Hub
- changing behavior for other registries
- eagerly rewriting every single-segment repo under `registry.k8s.io`
- retrying more than one alternate repository form
- applying the initial compatibility behavior to blobs, tags, or referrers
## Recommended approach
Keep the current request path unchanged on the first attempt. After the normal
auth flow completes for that path, if the final first-attempt response from
`registry.k8s.io` is `404` for a single-segment manifest repository, perform
one internal retry against the derived path `/v2/<repo>/<repo>/...` and return
the retry result when it succeeds.
This keeps the existing behavior stable for paths that already work and limits
the compatibility behavior to the known `coredns -> coredns/coredns` style case.
## Alternatives considered
### 1. Always rewrite single-segment repos to `<repo>/<repo>`
Rejected because it would change successful existing paths and could break valid
single-segment repositories on `registry.k8s.io`.
### 2. Probe both paths before deciding
Rejected because it doubles upstream traffic and adds more complexity than the
known compatibility problem needs.
## Request flow
1. Receive docker request and run the existing normalization logic.
2. Send the first upstream request using the original normalized path.
3. Complete the existing auth retry behavior for that path first.
4. If the final response is successful, continue as today.
5. If the final response is `404`, check whether the request is eligible for
the `registry.k8s.io` fallback.
6. Close the first response body before retrying.
7. If eligible, derive a second path by transforming `/v2/<repo>/...` into
`/v2/<repo>/<repo>/...`.
8. Retry exactly once against the derived path, reusing the same method and the
same auth behavior used by the normal upstream flow.
9. If the retry succeeds, use that response and mark the cache entry with the
effective upstream path that produced the content.
10. If the retry also returns an upstream response, pass that second response
through unchanged.
11. If the retry fails with a transport error, return the existing proxy error
response path and do not continue retrying.
## Eligibility rules
The fallback should only trigger when all checks pass:
- upstream host matches `registry.k8s.io`
- request path parses as `/v2/<repo>/manifests/<ref>`
- repository name contains exactly one segment before `manifests`
- repository is not already `<repo>/<repo>`
- final response after normal auth handling is HTTP `404`
For the initial implementation, the trigger is status-based and limited to HTTP
`404`.
## Component changes
### `internal/hubmodule/docker/hooks.go`
- add helper logic to identify `registry.k8s.io`
- add helper logic to derive the duplicate-segment manifest path for eligible
requests
### `internal/proxy/handler.go`
- add a targeted retry path after the first upstream response is received and
before the response is finalized
- keep the retry count fixed at one alternate attempt
- complete the normal auth retry flow before evaluating fallback eligibility
- persist the effective upstream path when fallback succeeds so future cache
revalidation can target the path that actually produced the cached content
The retry should reuse the same auth, request method, response handling, and
streaming behavior as the normal upstream path.
### Cache metadata
- extend cached metadata for fallback-derived entries with an optional effective
upstream path field
- when a cached entry was populated through fallback, revalidate it against that
effective upstream path rather than the original client-facing path
- do not apply the fallback derivation again during revalidation if an effective
upstream path is already recorded
## Caching behavior
- first successful response remains cacheable under the locator path that served
the client request
- if fallback succeeds, cache the successful fallback response under the client
request path so the next identical client request can hit cache directly
- also persist the effective upstream path used to fill that cache entry
- do not cache the failed first attempt separately
This preserves the client-visible repository path while allowing the proxy to
internally source content from the alternate upstream location.
## Error handling
- if fallback path derivation is not possible, return the original response
- if the fallback request errors at the transport layer, log the retry attempt
and return the existing proxy error response path
- never loop between variants
## Logging
Add one targeted structured log event for visibility when the fallback is used,
including:
- hub name
- domain
- upstream host
- original path
- fallback path
- original status
- request method
This makes it easy to confirm that `registry.k8s.io` compatibility is active
without changing normal logging volume for unrelated registries.
## Tests
### Unit tests
Add docker hook tests for:
- recognizing `registry.k8s.io`
- deriving `/v2/coredns/manifests/v1.13.1` to
`/v2/coredns/coredns/manifests/v1.13.1`
- refusing fallback for already multi-segment repositories
- refusing fallback for non-`registry.k8s.io` hosts
- refusing fallback for non-manifest paths
### Integration tests
Add proxy tests for:
- original path returns `404`, fallback path returns `200` -> final client
response is `200`
- original path returns `200` -> fallback is not attempted
- non-`registry.k8s.io` upstream returns `404` -> fallback is not attempted
- fallback succeeds, second identical request is served from cache without
re-fetching the original `404` path
- fallback-derived cached entry revalidates against the recorded effective
upstream path instead of the original client-facing path
## Success criteria
- `k8s.hub.ipao.vip/coredns:v1.13.1` can succeed through fallback when the
upstream only exposes `coredns/coredns`
- existing Docker Hub behavior remains unchanged
- existing `registry.k8s.io` paths that already work continue to use the
original path first
- fallback is isolated to `registry.k8s.io`

View File

@@ -2,6 +2,7 @@ package cache
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
@@ -49,6 +50,10 @@ type entryLock struct {
refs int
}
type entryMetadata struct {
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, error) {
select {
case <-ctx.Done():
@@ -86,6 +91,12 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro
SizeBytes: info.Size(),
ModTime: info.ModTime(),
}
if metadata, err := s.readMetadata(filePath); err == nil {
entry.EffectiveUpstreamPath = metadata.EffectiveUpstreamPath
} else if !errors.Is(err, fs.ErrNotExist) {
file.Close()
return nil, err
}
return &ReadResult{Entry: entry, Reader: file}, nil
}
@@ -133,12 +144,16 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op
if err := os.Chtimes(filePath, modTime, modTime); err != nil {
return nil, err
}
if err := s.writeMetadata(filePath, opts.EffectiveUpstreamPath); err != nil {
return nil, err
}
entry := Entry{
Locator: locator,
FilePath: filePath,
SizeBytes: written,
ModTime: modTime,
Locator: locator,
FilePath: filePath,
SizeBytes: written,
ModTime: modTime,
EffectiveUpstreamPath: opts.EffectiveUpstreamPath,
}
return &entry, nil
}
@@ -157,6 +172,54 @@ func (s *fileStore) Remove(ctx context.Context, locator Locator) error {
if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
if err := os.Remove(metadataPath(filePath)); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
return nil
}
func (s *fileStore) readMetadata(filePath string) (entryMetadata, error) {
raw, err := os.ReadFile(metadataPath(filePath))
if err != nil {
return entryMetadata{}, err
}
var metadata entryMetadata
if err := json.Unmarshal(raw, &metadata); err != nil {
return entryMetadata{}, err
}
return metadata, nil
}
func (s *fileStore) writeMetadata(filePath string, effectiveUpstreamPath string) error {
metaFilePath := metadataPath(filePath)
if effectiveUpstreamPath == "" {
if err := os.Remove(metaFilePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
return nil
}
data, err := json.Marshal(entryMetadata{EffectiveUpstreamPath: effectiveUpstreamPath})
if err != nil {
return err
}
tempFile, err := os.CreateTemp(filepath.Dir(metaFilePath), ".cache-meta-*")
if err != nil {
return err
}
tempName := tempFile.Name()
if _, err := tempFile.Write(data); err != nil {
tempFile.Close()
_ = os.Remove(tempName)
return err
}
if err := tempFile.Close(); err != nil {
_ = os.Remove(tempName)
return err
}
if err := os.Rename(tempName, metaFilePath); err != nil {
_ = os.Remove(tempName)
return err
}
return nil
}
@@ -248,3 +311,7 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
func locatorKey(locator Locator) string {
return locator.HubName + "::" + locator.Path
}
func metadataPath(filePath string) string {
return filePath + ".meta"
}

View File

@@ -26,7 +26,8 @@ type Store interface {
// PutOptions 控制写入过程中的可选属性。
type PutOptions struct {
ModTime time.Time
ModTime time.Time
EffectiveUpstreamPath string
}
// Locator 唯一定位一个缓存条目Hub + 相对路径),所有路径均为 URL 路径风格。
@@ -37,10 +38,11 @@ type Locator struct {
// Entry 表示一次缓存命中结果,包含绝对文件路径及文件信息。
type Entry struct {
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time `json:"mod_time"`
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
// ReadResult 组合 Entry 与正文 Reader便于代理层直接将 Body 流式返回。

View File

@@ -5,6 +5,7 @@ import (
"context"
"io"
"os"
"strings"
"testing"
"time"
)
@@ -62,6 +63,28 @@ func TestStoreRemove(t *testing.T) {
}
}
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
store := newTestStore(t)
locator := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
_, err := store.Put(context.Background(), locator, strings.NewReader("body"), PutOptions{
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
})
if err != nil {
t.Fatalf("put error: %v", err)
}
result, err := store.Get(context.Background(), locator)
if err != nil {
t.Fatalf("get error: %v", err)
}
defer result.Reader.Close()
if result.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("unexpected effective upstream path: %q", result.Entry.EffectiveUpstreamPath)
}
}
func TestStoreIgnoresDirectories(t *testing.T) {
store := newTestStore(t)
locator := Locator{HubName: "ghcr", Path: "/v2"}

View File

@@ -69,6 +69,28 @@ func isDockerHubHost(host string) bool {
}
}
func isRegistryK8sHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
return strings.EqualFold(host, "registry.k8s.io")
}
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
return "", false
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || strings.Contains(repo, "/") || !strings.HasPrefix(rest, "/manifests/") {
return "", false
}
return "/v2/" + repo + "/" + repo + rest, true
}
func RegistryK8sManifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
return manifestFallbackPath(ctx, clean)
}
func splitDockerRepoPath(path string) (string, string, bool) {
if !strings.HasPrefix(path, "/v2/") {
return "", "", false

View File

@@ -27,6 +27,44 @@ func TestNormalizePathSkipsLibraryForNonDockerHub(t *testing.T) {
}
}
func TestIsRegistryK8sHost(t *testing.T) {
if !isRegistryK8sHost("registry.k8s.io") {
t.Fatalf("expected registry.k8s.io to match")
}
if isRegistryK8sHost("example.com") {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
}
}
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected multi-segment repo to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
t.Fatalf("expected non-manifest path to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
func TestSplitDockerRepoPath(t *testing.T) {
repo, rest, ok := splitDockerRepoPath("/v2/library/nginx/manifests/latest")
if !ok || repo != "library/nginx" || rest != "/manifests/latest" {

View File

@@ -22,6 +22,7 @@ import (
"github.com/any-hub/any-hub/internal/cache"
"github.com/any-hub/any-hub/internal/hubmodule"
dockermodule "github.com/any-hub/any-hub/internal/hubmodule/docker"
"github.com/any-hub/any-hub/internal/logging"
"github.com/any-hub/any-hub/internal/proxy/hooks"
"github.com/any-hub/any-hub/internal/server"
@@ -218,7 +219,7 @@ func (h *Handler) serveCache(
}
if shouldRevalidate {
if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil {
if resp, err := h.revalidateRequest(c, route, effectiveRevalidateURL(route, c, result.Entry, hook), result.Entry.Locator, ""); err == nil {
resp.Body.Close()
}
}
@@ -259,6 +260,17 @@ func (h *Handler) fetchAndStream(
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
effectiveUpstreamPath := ""
originalStatus := resp.StatusCode
originalPath := ""
if hook != nil {
originalPath = hook.clean
}
resp, upstreamURL, effectiveUpstreamPath, err = h.retryRegistryK8sManifestFallback(c, route, requestID, resp, upstreamURL, hook, originalStatus, originalPath)
if err != nil {
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
resp = rewritten
@@ -273,7 +285,7 @@ func (h *Handler) fetchAndStream(
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
c.Method() == http.MethodGet
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx, effectiveUpstreamPath)
}
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
@@ -325,13 +337,14 @@ func (h *Handler) consumeUpstream(
requestID string,
started time.Time,
ctx context.Context,
effectiveUpstreamPath string,
) error {
upstreamURL := resp.Request.URL.String()
method := c.Method()
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
if shouldStore {
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL)
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL, effectiveUpstreamPath)
}
copyResponseHeaders(c, resp.Header)
@@ -369,6 +382,7 @@ func (h *Handler) cacheAndStream(
started time.Time,
ctx context.Context,
upstreamURL string,
effectiveUpstreamPath string,
) error {
copyResponseHeaders(c, resp.Header)
c.Set("X-Any-Hub-Upstream", upstreamURL)
@@ -381,7 +395,7 @@ func (h *Handler) cacheAndStream(
// 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
opts := cache.PutOptions{ModTime: extractModTime(resp.Header)}
opts := cache.PutOptions{ModTime: extractModTime(resp.Header), EffectiveUpstreamPath: effectiveUpstreamPath}
entry, err := writer.Put(ctx, locator, reader, opts)
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
if err != nil {
@@ -774,7 +788,7 @@ func (h *Handler) isCacheFresh(
ctx = context.Background()
}
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
upstreamURL := effectiveRevalidateURL(route, c, entry, hook)
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
if err != nil {
return false, err
@@ -825,6 +839,77 @@ func (h *Handler) isCacheFresh(
}
}
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
if route == nil || route.UpstreamURL == nil || entry.EffectiveUpstreamPath == "" {
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
}
clone := *route.UpstreamURL
clone.Path = entry.EffectiveUpstreamPath
clone.RawPath = entry.EffectiveUpstreamPath
return &clone
}
func (h *Handler) retryRegistryK8sManifestFallback(
c fiber.Ctx,
route *server.HubRoute,
requestID string,
resp *http.Response,
upstreamURL *url.URL,
hook *hookState,
originalStatus int,
originalPath string,
) (*http.Response, *url.URL, string, error) {
if resp == nil || resp.StatusCode != http.StatusNotFound || hook == nil || hook.ctx == nil {
return resp, upstreamURL, "", nil
}
fallbackPath, ok := dockermodule.RegistryK8sManifestFallbackPath(hook.ctx, hook.clean)
if !ok {
return resp, upstreamURL, "", nil
}
resp.Body.Close()
fallbackHook := *hook
fallbackHook.clean = fallbackPath
fallbackResp, fallbackURL, err := h.executeRequest(c, route, &fallbackHook)
if err != nil {
return nil, upstreamURL, "", err
}
fallbackResp, fallbackURL, err = h.retryOnAuthFailure(c, route, requestID, time.Now(), fallbackResp, fallbackURL, &fallbackHook)
if err != nil {
return nil, upstreamURL, "", err
}
h.logRegistryK8sFallback(route, requestID, originalPath, fallbackPath, originalStatus, c.Method())
if fallbackResp.StatusCode == http.StatusOK {
return fallbackResp, fallbackURL, fallbackPath, nil
}
return fallbackResp, fallbackURL, "", nil
}
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
if route == nil || h == nil || h.logger == nil {
return
}
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.Module.Key,
false,
)
fields["action"] = "proxy_fallback"
if route.UpstreamURL != nil {
fields["upstream_host"] = route.UpstreamURL.Hostname()
}
fields["original_path"] = originalPath
fields["fallback_path"] = fallbackPath
fields["original_status"] = originalStatus
fields["method"] = method
if requestID != "" {
fields["request_id"] = requestID
}
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
}
func (h *Handler) revalidateRequest(
c fiber.Ctx,
route *server.HubRoute,

View File

@@ -0,0 +1,55 @@
package proxy
import (
"bytes"
"net/url"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/server"
)
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
buf := new(bytes.Buffer)
logger := logrus.New()
logger.SetOutput(buf)
logger.SetFormatter(&logrus.JSONFormatter{})
upstreamURL, err := url.Parse("http://registry.k8s.io")
if err != nil {
t.Fatalf("parse upstream url: %v", err)
}
h := NewHandler(nil, logger, nil)
route := &server.HubRoute{
Config: config.HubConfig{
Name: "docker",
Domain: "k8s.hub.local",
Type: "docker",
},
Module: hubmodule.ModuleMetadata{Key: "docker"},
UpstreamURL: upstreamURL,
}
h.logRegistryK8sFallback(route, "req-1", "/v2/coredns/manifests/v1.13.1", "/v2/coredns/coredns/manifests/v1.13.1", 404, "GET")
output := buf.String()
for _, want := range []string{
"proxy_registry_k8s_fallback",
`"hub":"docker"`,
`"domain":"k8s.hub.local"`,
`"upstream_host":"registry.k8s.io"`,
`"original_path":"/v2/coredns/manifests/v1.13.1"`,
`"fallback_path":"/v2/coredns/coredns/manifests/v1.13.1"`,
`"original_status":404`,
`"method":"GET"`,
} {
if !strings.Contains(output, want) {
t.Fatalf("expected log output to contain %s, got %s", want, output)
}
}
}

View File

@@ -26,6 +26,8 @@ import (
const (
dockerManifestPath = "/v2/library/cache-flow/manifests/latest"
dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
registryK8sOriginalManifest = "/v2/coredns/manifests/v1.13.1"
registryK8sFallbackManifest = "/v2/coredns/coredns/manifests/v1.13.1"
)
func TestCacheFlowWithConditionalRequest(t *testing.T) {
@@ -286,6 +288,140 @@ func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) {
}
}
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != fiber.StatusOK {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body))
}
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
t.Fatalf("expected single original manifest hit")
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 {
t.Fatalf("expected single fallback manifest hit")
}
}
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
resp.Body.Close()
if resp.StatusCode != fiber.StatusOK {
t.Fatalf("expected 200, got %d", resp.StatusCode)
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
t.Fatalf("expected no fallback hit")
}
}
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "mirror.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != fiber.StatusNotFound {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body))
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
t.Fatalf("expected no fallback hit for non-registry host")
}
}
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
makeRequest := func() *http.Response {
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
return resp
}
resp1 := makeRequest()
resp1.Body.Close()
resp2 := makeRequest()
defer resp2.Body.Close()
if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" {
t.Fatalf("expected second request to hit cache")
}
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet))
}
}
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
makeRequest := func() *http.Response {
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
return resp
}
resp1 := makeRequest()
resp1.Body.Close()
resp2 := makeRequest()
resp2.Body.Close()
if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 {
t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead))
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 {
t.Fatalf("expected HEAD revalidation against fallback path")
}
}
type cacheFlowStub struct {
server *http.Server
listener net.Listener
@@ -300,6 +436,22 @@ type cacheFlowStub struct {
lastMod string
}
type registryK8sStub struct {
server *http.Server
listener net.Listener
URL string
mu sync.Mutex
responses map[string]registryK8sResponse
hits map[string]int
}
type registryK8sResponse struct {
status int
body []byte
etag string
}
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
t.Helper()
stub := &cacheFlowStub{
@@ -335,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
return stub
}
func newRegistryK8sStub(t *testing.T) *registryK8sStub {
t.Helper()
stub := &registryK8sStub{
responses: make(map[string]registryK8sResponse),
hits: make(map[string]int),
}
mux := http.NewServeMux()
mux.HandleFunc("/", stub.handle)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Skipf("unable to start registry.k8s.io stub listener: %v", err)
}
server := &http.Server{Handler: mux}
stub.server = server
stub.listener = listener
stub.URL = "http://" + listener.Addr().String()
go func() {
_ = server.Serve(listener)
}()
return stub
}
func (s *cacheFlowStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -346,6 +524,109 @@ func (s *cacheFlowStub) Close() {
}
}
func (s *registryK8sStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if s.server != nil {
_ = s.server.Shutdown(ctx)
}
if s.listener != nil {
_ = s.listener.Close()
}
}
func (s *registryK8sStub) setResponse(path string, status int, body []byte) {
s.mu.Lock()
defer s.mu.Unlock()
s.responses[path] = registryK8sResponse{
status: status,
body: append([]byte(nil), body...),
etag: fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-")),
}
}
func (s *registryK8sStub) pathHits(path string, method string) int {
s.mu.Lock()
defer s.mu.Unlock()
return s.hits[method+" "+path]
}
func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.hits[r.Method+" "+r.URL.Path]++
resp, ok := s.responses[r.URL.Path]
s.mu.Unlock()
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`))
return
}
w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
w.Header().Set("Etag", resp.etag)
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
if r.Method == http.MethodHead {
for _, candidate := range r.Header.Values("If-None-Match") {
if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.WriteHeader(resp.status)
return
}
w.WriteHeader(resp.status)
_, _ = w.Write(resp.body)
}
func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App {
t.Helper()
proxyURL := ""
if len(proxyURLs) > 0 {
proxyURL = proxyURLs[0]
}
cfg := &config.Config{
Global: config.GlobalConfig{
ListenPort: 5000,
CacheTTL: config.Duration(30 * time.Second),
StoragePath: storageDir,
},
Hubs: []config.HubConfig{
{
Name: "docker",
Domain: domain,
Type: "docker",
Upstream: upstream,
Proxy: proxyURL,
},
},
}
registry, err := server.NewHubRegistry(cfg)
if err != nil {
t.Fatalf("registry error: %v", err)
}
logger := logrus.New()
logger.SetOutput(io.Discard)
store, err := cache.NewStore(storageDir)
if err != nil {
t.Fatalf("store error: %v", err)
}
client := server.NewUpstreamClient(cfg)
handler := proxy.NewHandler(client, logger, store)
app, err := server.NewApp(server.AppOptions{
Logger: logger,
Registry: registry,
Proxy: handler,
ListenPort: 5000,
})
if err != nil {
t.Fatalf("app error: %v", err)
}
return app
}
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
etag := s.etag

View File

@@ -30,6 +30,7 @@ type upstreamStub struct {
requests []RecordedRequest
mode upstreamMode
blobBytes []byte
lastMod string
}
// RecordedRequest 捕获每次请求的方法/路径/Host/Headers便于断言代理行为。
@@ -48,13 +49,14 @@ func newUpstreamStub(t *testing.T, mode upstreamMode) *upstreamStub {
stub := &upstreamStub{
mode: mode,
blobBytes: []byte("stub-layer-payload"),
lastMod: time.Now().UTC().Format(http.TimeFormat),
}
switch mode {
case upstreamDocker:
registerDockerHandlers(mux, stub.blobBytes)
case upstreamNPM:
registerNPMHandlers(mux)
registerNPMHandlers(mux, stub)
default:
t.Fatalf("unsupported stub mode: %s", mode)
}
@@ -153,10 +155,10 @@ func registerDockerHandlers(mux *http.ServeMux, blob []byte) {
})
}
func registerNPMHandlers(mux *http.ServeMux) {
func registerNPMHandlers(mux *http.ServeMux, stub *upstreamStub) {
mux.HandleFunc("/lodash", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
w.Header().Set("Last-Modified", stub.lastMod)
resp := map[string]any{
"name": "lodash",
"dist-tags": map[string]string{
@@ -175,7 +177,7 @@ func registerNPMHandlers(mux *http.ServeMux) {
mux.HandleFunc("/lodash/-/lodash-4.17.21.tgz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
w.Header().Set("Last-Modified", stub.lastMod)
_, _ = w.Write([]byte("tarball-bytes"))
})
}
@@ -232,6 +234,37 @@ func TestDockerStubServesManifestAndBlob(t *testing.T) {
}
}
func TestNPMStubKeepsStableValidatorsAcrossHeadRequests(t *testing.T) {
stub := newUpstreamStub(t, upstreamNPM)
defer stub.Close()
req1, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
if err != nil {
t.Fatalf("build first HEAD request: %v", err)
}
resp1, err := http.DefaultClient.Do(req1)
if err != nil {
t.Fatalf("first HEAD failed: %v", err)
}
resp1.Body.Close()
time.Sleep(1100 * time.Millisecond)
req2, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
if err != nil {
t.Fatalf("build second HEAD request: %v", err)
}
resp2, err := http.DefaultClient.Do(req2)
if err != nil {
t.Fatalf("second HEAD failed: %v", err)
}
resp2.Body.Close()
if resp1.Header.Get("Last-Modified") != resp2.Header.Get("Last-Modified") {
t.Fatalf("expected stable Last-Modified, got %q then %q", resp1.Header.Get("Last-Modified"), resp2.Header.Get("Last-Modified"))
}
}
func TestNPMStubServesMetadataAndTarball(t *testing.T) {
stub := newUpstreamStub(t, upstreamNPM)
defer stub.Close()