8 Commits

Author SHA1 Message Date
47c58ecb69 docs: add registry k8s fallback design and plan
Some checks failed
docker-release / build-and-push (push) Failing after 9m51s
2026-03-23 17:44:18 +08:00
271c8fe538 test: stabilize npm upstream validators 2026-03-23 16:59:36 +08:00
442c27a26d feat: add registry k8s manifest fallback 2026-03-23 16:55:20 +08:00
9c85b9b097 feat: persist cache effective upstream path 2026-03-23 16:24:39 +08:00
7c95ee0210 test: cover registry k8s fallback path derivation 2026-03-23 16:22:28 +08:00
05d5037e97 chore: ignore local worktrees 2026-03-23 16:20:15 +08:00
bbc7204901 fix: limit docker library rewrite to Docker Hub
Some checks failed
docker-release / build-and-push (push) Failing after 12m34s
2026-03-23 15:42:45 +08:00
40c6f2fcce fix: align cache controls with config
Remove unused head-check config, make TTL overrides explicit, and tighten revalidation to avoid stale cache behavior.
2026-01-26 15:55:03 +08:00
22 changed files with 1293 additions and 56 deletions

1
.gitignore vendored
View File

@@ -20,6 +20,7 @@ Thumbs.db
.vscode/
.idea/
.cache/
.worktrees/
storage/
logs/
tmp/

View File

@@ -21,7 +21,6 @@ Type = "docker"
Username = ""
Password = ""
CacheTTL = 43200
EnableHeadCheck = true
[[Hub]]
Name = "composer-cache"

View File

@@ -19,4 +19,3 @@ Module = "legacy"
Username = ""
Password = ""
CacheTTL = 43200
EnableHeadCheck = true

View File

@@ -19,4 +19,3 @@ Module = "legacy"
Username = ""
Password = ""
CacheTTL = 43200
EnableHeadCheck = false

View File

@@ -0,0 +1,414 @@
# registry.k8s.io Compatibility Fallback Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add a `registry.k8s.io`-only manifest fallback so single-segment repos like `coredns` retry once as `coredns/coredns` after a final first-attempt `404`.
**Architecture:** Keep the first upstream request unchanged, then add one targeted retry in the proxy after the normal auth flow completes. Persist the effective upstream path for fallback-filled cache entries so later cache revalidation uses the path that actually produced the content instead of rechecking the original `404` path.
**Tech Stack:** Go 1.25, Fiber v3, internal docker hooks, internal proxy handler, filesystem cache store, Go tests
---
## File map
- Modify: `internal/hubmodule/docker/hooks.go`
- add `registry.k8s.io` host detection and manifest fallback path derivation helpers
- Modify: `internal/hubmodule/docker/hooks_test.go`
- add unit tests for host gating and manifest fallback derivation rules
- Modify: `internal/cache/store.go`
- extend cache entry/options to carry optional effective upstream path metadata
- Modify: `internal/cache/fs_store.go`
- persist and load cache metadata for effective upstream path
- Modify: `internal/cache/store_test.go`
- verify metadata round-trip behavior and remove behavior
- Modify: `internal/proxy/handler.go`
- Modify: `internal/proxy/handler_test.go`
- add one-shot `registry.k8s.io` fallback retry after auth retry flow
- write/read effective upstream path metadata for fallback-backed cache entries
- use effective upstream path during revalidation
- Modify: `tests/integration/cache_flow_test.go`
- add integration coverage for fallback success, no-fallback success, and cache/revalidation behavior
### Task 1: Add docker fallback derivation helpers
**Files:**
- Modify: `internal/hubmodule/docker/hooks.go`
- Test: `internal/hubmodule/docker/hooks_test.go`
- [ ] **Step 1: Write the failing unit tests**
```go
func TestIsRegistryK8sHost(t *testing.T) {
if !isRegistryK8sHost("registry.k8s.io") {
t.Fatalf("expected registry.k8s.io to match")
}
if isRegistryK8sHost("example.com") {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
path, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
if !ok || path != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("expected fallback path, got %q ok=%v", path, ok)
}
}
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected multi-segment repo to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef"); ok {
t.Fatalf("expected non-manifest path to be ignored")
}
}
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
if _, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1"); ok {
t.Fatalf("expected non-registry.k8s.io host to be ignored")
}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/hubmodule/docker -run 'TestIsRegistryK8sHost|TestRegistryK8sManifestFallbackPath' -count=1`
Expected: FAIL because `isRegistryK8sHost` and `manifestFallbackPath` do not exist yet.
- [ ] **Step 3: Write minimal implementation**
```go
func isRegistryK8sHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
return strings.EqualFold(host, "registry.k8s.io")
}
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
return "", false
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || strings.Count(repo, "/") != 0 || !strings.HasPrefix(rest, "/manifests/") {
return "", false
}
return "/v2/" + repo + "/" + repo + rest, true
}
```
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/hubmodule/docker -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go
git commit -m "test: cover registry k8s fallback path derivation"
```
### Task 2: Persist effective upstream path in cache metadata
**Files:**
- Modify: `internal/cache/store.go`
- Modify: `internal/cache/fs_store.go`
- Test: `internal/cache/store_test.go`
- [ ] **Step 1: Write the failing cache metadata test**
```go
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
store, err := NewStore(t.TempDir())
if err != nil {
t.Fatalf("NewStore: %v", err)
}
loc := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
_, err = store.Put(context.Background(), loc, strings.NewReader("body"), PutOptions{
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
})
if err != nil {
t.Fatalf("Put: %v", err)
}
got, err := store.Get(context.Background(), loc)
if err != nil {
t.Fatalf("Get: %v", err)
}
if got.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("unexpected effective path: %q", got.Entry.EffectiveUpstreamPath)
}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/cache -run TestStorePersistsEffectiveUpstreamPath -count=1`
Expected: FAIL because cache entry metadata does not yet include effective upstream path.
- [ ] **Step 3: Write minimal implementation**
```go
type PutOptions struct {
ModTime time.Time
EffectiveUpstreamPath string
}
type Entry struct {
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time `json:"mod_time"`
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
```
Implementation notes:
- store metadata next to the cached body as a small JSON file such as `<entry>.meta`
- on `Get`, load metadata if present and populate `Entry.EffectiveUpstreamPath`
- on `Put`, write metadata atomically only when `EffectiveUpstreamPath` is non-empty
- on `Remove`, delete both body and metadata files
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/cache -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go
git commit -m "feat: persist cache effective upstream path"
```
### Task 3: Add one-shot fallback retry in proxy fetch flow
**Files:**
- Modify: `internal/proxy/handler.go`
- Modify: `internal/hubmodule/docker/hooks.go`
- Test: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Write the failing integration test for fallback success**
```go
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
// stub returns 404 for /v2/coredns/manifests/v1.13.1
// stub returns 200 for /v2/coredns/coredns/manifests/v1.13.1
// request /v2/coredns/manifests/v1.13.1 through proxy
// expect 200 and exactly two upstream hits
}
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
// stub returns 200 for original path
// request original path
// expect 200 and only one upstream hit
}
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
// non-registry.k8s.io upstream returns 404 for original path
// request original path
// expect final 404 and no derived path hit
}
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
// first GET falls back and succeeds
// second identical GET should be cache hit
// expect no new hit to original 404 path on second request
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
Expected: FAIL because the fallback tests were written before the retry logic exists.
- [ ] **Step 3: Write minimal implementation**
```go
resp, upstreamURL, err := h.executeRequest(c, route, hook)
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, hook)
if err == nil && resp != nil && resp.StatusCode == http.StatusNotFound {
if fallbackHook, fallbackURL, ok := h.registryK8sFallbackAttempt(c, route, hook); ok {
resp.Body.Close()
resp, upstreamURL, err = h.executeRequest(c, route, fallbackHook)
if err == nil {
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, fallbackHook)
}
fallbackEffectivePath = fallbackURL.Path
}
}
```
Implementation notes:
- in `internal/hubmodule/docker/hooks.go`, keep only pure path helpers such as `isRegistryK8sHost` and `manifestFallbackPath`
- in `internal/proxy/handler.go`, add the retry orchestration helper that clones the current hook state with the derived fallback path when `manifestFallbackPath` returns true
- only evaluate fallback after `retryOnAuthFailure` completes and only for final `404`
- close the first response body before the retry
- if fallback succeeds with `200`, pass `EffectiveUpstreamPath` into cache `PutOptions`
- if fallback returns non-`200`, pass that second upstream response through unchanged
- emit a structured fallback log event from `internal/proxy/handler.go` with hub name, domain, upstream host, original path, fallback path, original status, and request method
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./tests/integration -run 'TestRegistryK8sManifestFallbackRetry|TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds|TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost|TestRegistryK8sManifestFallbackSecondRequestHitsCache' -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go tests/integration/cache_flow_test.go internal/hubmodule/docker/hooks.go
git commit -m "feat: retry registry k8s manifest fallback"
```
### Task 4: Revalidate cached fallback entries against the effective upstream path
**Files:**
- Modify: `internal/proxy/handler.go`
- Modify: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Write the failing revalidation regression test**
```go
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
// first GET: original path 404, fallback path 200, response cached
// second GET: cache hit path should revalidate against fallback path only
// assert original 404 path is not re-requested during revalidation
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
Expected: FAIL because revalidation still targets the original client-facing path.
- [ ] **Step 3: Write minimal implementation**
```go
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
if entry.EffectiveUpstreamPath == "" {
return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
}
clone := *route.UpstreamURL
clone.Path = entry.EffectiveUpstreamPath
clone.RawPath = entry.EffectiveUpstreamPath
return &clone
}
```
Implementation notes:
- use `Entry.EffectiveUpstreamPath` in both `isCacheFresh` and cached `HEAD` handling inside `serveCache`
- do not re-derive fallback during revalidation when metadata already specifies the effective path
- keep client-visible cache key unchanged
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./tests/integration -run TestRegistryK8sFallbackCacheRevalidatesEffectivePath -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go tests/integration/cache_flow_test.go
git commit -m "fix: revalidate registry k8s fallback cache entries"
```
### Task 5: Emit and verify fallback structured logging
**Files:**
- Modify: `internal/proxy/handler.go`
- Test: `internal/proxy/handler_test.go`
- [ ] **Step 1: Write the failing logging test**
```go
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
// configure handler with a bytes.Buffer-backed logrus logger
// trigger fallback success for /v2/coredns/manifests/v1.13.1
// assert log output contains event name plus fields:
// hub, domain, upstream host, original path, fallback path, original status, method
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: FAIL because the fallback log event does not exist yet.
- [ ] **Step 3: Write minimal implementation**
```go
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.Module.Key,
false,
)
fields["action"] = "proxy_fallback"
fields["upstream_host"] = route.UpstreamURL.Host
fields["original_path"] = originalPath
fields["fallback_path"] = fallbackPath
fields["original_status"] = originalStatus
fields["method"] = method
h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
}
```
- [ ] **Step 4: Run test to verify it passes**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add internal/proxy/handler.go internal/proxy/handler_test.go
git commit -m "test: cover registry k8s fallback logging"
```
### Task 6: Final verification
**Files:**
- Verify: `internal/hubmodule/docker/hooks.go`
- Verify: `internal/cache/store.go`
- Verify: `internal/cache/fs_store.go`
- Verify: `internal/proxy/handler.go`
- Verify: `tests/integration/cache_flow_test.go`
- [ ] **Step 1: Run focused package tests**
Run: `go test ./internal/hubmodule/docker ./internal/cache ./tests/integration -count=1`
Expected: PASS
- [ ] **Step 2: Run full test suite**
Run: `go test ./... -count=1`
Expected: PASS
- [ ] **Step 3: Inspect git diff**
Run: `git diff --stat`
Expected: only the planned files changed for this feature.
- [ ] **Step 4: Verify fallback logging is present**
Run: `go test ./internal/proxy -run TestRegistryK8sManifestFallbackLogsStructuredEvent -count=1`
Expected: PASS, and the test explicitly checks the structured fallback log event fields.
- [ ] **Step 5: Commit final polish if needed**
```bash
git add internal/hubmodule/docker/hooks.go internal/hubmodule/docker/hooks_test.go internal/cache/store.go internal/cache/fs_store.go internal/cache/store_test.go internal/proxy/handler.go internal/proxy/handler_test.go tests/integration/cache_flow_test.go
git commit -m "feat: add registry k8s manifest fallback compatibility"
```

View File

@@ -0,0 +1,181 @@
# registry.k8s.io coredns fallback design
## Goal
Add a narrow compatibility fallback for `registry.k8s.io` so requests like
`k8s.hub.ipao.vip/coredns:v1.13.1` can retry as
`k8s.hub.ipao.vip/coredns/coredns:v1.13.1` when the original manifest lookup is
not found upstream.
## Scope
This compatibility applies only when all of the following are true:
- hub module is `docker`
- upstream host is exactly `registry.k8s.io`
- requested repository name is a single segment such as `coredns`
- request targets `/manifests/<ref>`
- the first upstream request clearly fails as not found
Out of scope:
- changing behavior for Docker Hub
- changing behavior for other registries
- eagerly rewriting every single-segment repo under `registry.k8s.io`
- retrying more than one alternate repository form
- applying the initial compatibility behavior to blobs, tags, or referrers
## Recommended approach
Keep the current request path unchanged on the first attempt. After the normal
auth flow completes for that path, if the final first-attempt response from
`registry.k8s.io` is `404` for a single-segment manifest repository, perform
one internal retry against the derived path `/v2/<repo>/<repo>/...` and return
the retry result when it succeeds.
This keeps the existing behavior stable for paths that already work and limits
the compatibility behavior to the known `coredns -> coredns/coredns` style case.
## Alternatives considered
### 1. Always rewrite single-segment repos to `<repo>/<repo>`
Rejected because it would change successful existing paths and could break valid
single-segment repositories on `registry.k8s.io`.
### 2. Probe both paths before deciding
Rejected because it doubles upstream traffic and adds more complexity than the
known compatibility problem needs.
## Request flow
1. Receive docker request and run the existing normalization logic.
2. Send the first upstream request using the original normalized path.
3. Complete the existing auth retry behavior for that path first.
4. If the final response is successful, continue as today.
5. If the final response is `404`, check whether the request is eligible for
the `registry.k8s.io` fallback.
6. Close the first response body before retrying.
7. If eligible, derive a second path by transforming `/v2/<repo>/...` into
`/v2/<repo>/<repo>/...`.
8. Retry exactly once against the derived path, reusing the same method and the
same auth behavior used by the normal upstream flow.
9. If the retry succeeds, use that response and mark the cache entry with the
effective upstream path that produced the content.
10. If the retry returns a non-success upstream response (for example another
`404`), pass that second response through to the client unchanged.
11. If the retry fails with a transport error, return the existing proxy error
response path and do not continue retrying.
## Eligibility rules
The fallback should only trigger when all checks pass:
- upstream host matches `registry.k8s.io`
- request path parses as `/v2/<repo>/manifests/<ref>`
- repository name contains exactly one segment before `manifests`
- repository is not already `<repo>/<repo>`
- final response after normal auth handling is HTTP `404`
For the initial implementation, the trigger is status-based and limited to HTTP
`404`.
## Component changes
### `internal/hubmodule/docker/hooks.go`
- add helper logic to identify `registry.k8s.io`
- add helper logic to derive the duplicate-segment manifest path for eligible
requests
### `internal/proxy/handler.go`
- add a targeted retry path after the first upstream response is received and
before the response is finalized
- keep the retry count fixed at one alternate attempt
- complete the normal auth retry flow before evaluating fallback eligibility
- persist the effective upstream path when fallback succeeds so future cache
revalidation can target the path that actually produced the cached content
The retry should reuse the same auth, request method, response handling, and
streaming behavior as the normal upstream path.
### Cache metadata
- extend cached metadata for fallback-derived entries with an optional effective
upstream path field
- when a cached entry was populated through fallback, revalidate it against that
effective upstream path rather than the original client-facing path
- do not apply the fallback derivation again during revalidation if an effective
upstream path is already recorded
## Caching behavior
- first successful response remains cacheable under the locator path that served
the client request
- if fallback succeeds, cache the successful fallback response under the client
request path so the next identical client request can hit cache directly
- also persist the effective upstream path used to fill that cache entry
- do not cache the failed first attempt separately
This preserves the client-visible repository path while allowing the proxy to
internally source content from the alternate upstream location.
## Error handling
- if fallback path derivation is not possible, return the original response
- if the fallback request errors at the transport layer, log the retry attempt
and return the existing proxy error response path
- never loop between variants
## Logging
Add one targeted structured log event for visibility when the fallback is used,
including:
- hub name
- domain
- upstream host
- original path
- fallback path
- original status
- request method
This makes it easy to confirm that `registry.k8s.io` compatibility is active
without changing normal logging volume for unrelated registries.
## Tests
### Unit tests
Add docker hook tests for:
- recognizing `registry.k8s.io`
- deriving `/v2/coredns/manifests/v1.13.1` to
`/v2/coredns/coredns/manifests/v1.13.1`
- refusing fallback for already multi-segment repositories
- refusing fallback for non-`registry.k8s.io` hosts
- refusing fallback for non-manifest paths
### Integration tests
Add proxy tests for:
- original path returns `404`, fallback path returns `200` -> final client
response is `200`
- original path returns `200` -> fallback is not attempted
- non-`registry.k8s.io` upstream returns `404` -> fallback is not attempted
- fallback succeeds, second identical request is served from cache without
re-fetching the original `404` path
- fallback-derived cached entry revalidates against the recorded effective
upstream path instead of the original client-facing path
## Success criteria
- `k8s.hub.ipao.vip/coredns:v1.13.1` can succeed through fallback when the
upstream only exposes `coredns/coredns`
- existing Docker Hub behavior remains unchanged
- existing `registry.k8s.io` paths that already work continue to use the
original path first
- fallback is isolated to `registry.k8s.io`

View File

@@ -2,6 +2,7 @@ package cache
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
@@ -49,6 +50,10 @@ type entryLock struct {
refs int
}
type entryMetadata struct {
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, error) {
select {
case <-ctx.Done():
@@ -86,6 +91,12 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro
SizeBytes: info.Size(),
ModTime: info.ModTime(),
}
if metadata, err := s.readMetadata(filePath); err == nil {
entry.EffectiveUpstreamPath = metadata.EffectiveUpstreamPath
} else if !errors.Is(err, fs.ErrNotExist) {
file.Close()
return nil, err
}
return &ReadResult{Entry: entry, Reader: file}, nil
}
@@ -133,12 +144,16 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op
if err := os.Chtimes(filePath, modTime, modTime); err != nil {
return nil, err
}
if err := s.writeMetadata(filePath, opts.EffectiveUpstreamPath); err != nil {
return nil, err
}
entry := Entry{
Locator: locator,
FilePath: filePath,
SizeBytes: written,
ModTime: modTime,
Locator: locator,
FilePath: filePath,
SizeBytes: written,
ModTime: modTime,
EffectiveUpstreamPath: opts.EffectiveUpstreamPath,
}
return &entry, nil
}
@@ -157,6 +172,54 @@ func (s *fileStore) Remove(ctx context.Context, locator Locator) error {
if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
if err := os.Remove(metadataPath(filePath)); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
return nil
}
func (s *fileStore) readMetadata(filePath string) (entryMetadata, error) {
raw, err := os.ReadFile(metadataPath(filePath))
if err != nil {
return entryMetadata{}, err
}
var metadata entryMetadata
if err := json.Unmarshal(raw, &metadata); err != nil {
return entryMetadata{}, err
}
return metadata, nil
}
func (s *fileStore) writeMetadata(filePath string, effectiveUpstreamPath string) error {
metaFilePath := metadataPath(filePath)
if effectiveUpstreamPath == "" {
if err := os.Remove(metaFilePath); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
return nil
}
data, err := json.Marshal(entryMetadata{EffectiveUpstreamPath: effectiveUpstreamPath})
if err != nil {
return err
}
tempFile, err := os.CreateTemp(filepath.Dir(metaFilePath), ".cache-meta-*")
if err != nil {
return err
}
tempName := tempFile.Name()
if _, err := tempFile.Write(data); err != nil {
tempFile.Close()
_ = os.Remove(tempName)
return err
}
if err := tempFile.Close(); err != nil {
_ = os.Remove(tempName)
return err
}
if err := os.Rename(tempName, metaFilePath); err != nil {
_ = os.Remove(tempName)
return err
}
return nil
}
@@ -248,3 +311,7 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
func locatorKey(locator Locator) string {
return locator.HubName + "::" + locator.Path
}
func metadataPath(filePath string) string {
return filePath + ".meta"
}

View File

@@ -26,7 +26,8 @@ type Store interface {
// PutOptions 控制写入过程中的可选属性。
type PutOptions struct {
ModTime time.Time
ModTime time.Time
EffectiveUpstreamPath string
}
// Locator 唯一定位一个缓存条目Hub + 相对路径),所有路径均为 URL 路径风格。
@@ -37,10 +38,11 @@ type Locator struct {
// Entry 表示一次缓存命中结果,包含绝对文件路径及文件信息。
type Entry struct {
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time
Locator Locator `json:"locator"`
FilePath string `json:"file_path"`
SizeBytes int64 `json:"size_bytes"`
ModTime time.Time `json:"mod_time"`
EffectiveUpstreamPath string `json:"effective_upstream_path,omitempty"`
}
// ReadResult 组合 Entry 与正文 Reader便于代理层直接将 Body 流式返回。

View File

@@ -5,6 +5,7 @@ import (
"context"
"io"
"os"
"strings"
"testing"
"time"
)
@@ -62,6 +63,28 @@ func TestStoreRemove(t *testing.T) {
}
}
func TestStorePersistsEffectiveUpstreamPath(t *testing.T) {
store := newTestStore(t)
locator := Locator{HubName: "docker", Path: "/v2/coredns/manifests/v1.13.1"}
_, err := store.Put(context.Background(), locator, strings.NewReader("body"), PutOptions{
EffectiveUpstreamPath: "/v2/coredns/coredns/manifests/v1.13.1",
})
if err != nil {
t.Fatalf("put error: %v", err)
}
result, err := store.Get(context.Background(), locator)
if err != nil {
t.Fatalf("get error: %v", err)
}
defer result.Reader.Close()
if result.Entry.EffectiveUpstreamPath != "/v2/coredns/coredns/manifests/v1.13.1" {
t.Fatalf("unexpected effective upstream path: %q", result.Entry.EffectiveUpstreamPath)
}
}
func TestStoreIgnoresDirectories(t *testing.T) {
store := newTestStore(t)
locator := Locator{HubName: "ghcr", Path: "/v2"}

View File

@@ -64,16 +64,15 @@ type GlobalConfig struct {
// HubConfig 决定单个代理实例如何与下游/上游交互。
type HubConfig struct {
Name string `mapstructure:"Name"`
Domain string `mapstructure:"Domain"`
Upstream string `mapstructure:"Upstream"`
Proxy string `mapstructure:"Proxy"`
Type string `mapstructure:"Type"`
Username string `mapstructure:"Username"`
Password string `mapstructure:"Password"`
CacheTTL Duration `mapstructure:"CacheTTL"`
ValidationMode string `mapstructure:"ValidationMode"`
EnableHeadCheck bool `mapstructure:"EnableHeadCheck"`
Name string `mapstructure:"Name"`
Domain string `mapstructure:"Domain"`
Upstream string `mapstructure:"Upstream"`
Proxy string `mapstructure:"Proxy"`
Type string `mapstructure:"Type"`
Username string `mapstructure:"Username"`
Password string `mapstructure:"Password"`
CacheTTL Duration `mapstructure:"CacheTTL"`
ValidationMode string `mapstructure:"ValidationMode"`
}
// Config 是 TOML 文件映射的整体结构。
@@ -109,11 +108,12 @@ func CredentialModes(hubs []HubConfig) []string {
// StrategyOverrides 将 hub 层的 TTL/Validation 配置映射为模块策略覆盖项。
func (h HubConfig) StrategyOverrides(ttl time.Duration) hubmodule.StrategyOptions {
opts := hubmodule.StrategyOptions{
TTLOverride: ttl,
}
opts := hubmodule.StrategyOptions{}
if mode := strings.TrimSpace(h.ValidationMode); mode != "" {
opts.ValidationOverride = hubmodule.ValidationMode(mode)
}
if h.CacheTTL.DurationValue() > 0 {
opts.TTLOverride = ttl
}
return opts
}

View File

@@ -37,7 +37,14 @@ func cachePolicy(_ *hooks.RequestContext, locatorPath string, current hooks.Cach
// 索引类Release/Packages/Contents需要 If-None-Match/If-Modified-Since 再验证。
if strings.HasSuffix(clean, "/release") ||
strings.HasSuffix(clean, "/inrelease") ||
strings.HasSuffix(clean, "/release.gpg") {
strings.HasSuffix(clean, "/release.gpg") ||
strings.HasSuffix(clean, "/packages") ||
strings.HasSuffix(clean, "/packages.gz") ||
strings.HasSuffix(clean, "/packages.xz") ||
strings.HasSuffix(clean, "/sources") ||
strings.HasSuffix(clean, "/sources.gz") ||
strings.HasSuffix(clean, "/sources.xz") ||
strings.Contains(clean, "/contents-") {
current.AllowCache = true
current.AllowStore = true
current.RequireRevalidate = true

View File

@@ -1,6 +1,7 @@
package docker
import (
"net"
"strings"
"github.com/any-hub/any-hub/internal/proxy/hooks"
@@ -15,6 +16,9 @@ func init() {
}
func normalizePath(ctx *hooks.RequestContext, clean string, rawQuery []byte) (string, []byte) {
if ctx == nil || !isDockerHubHost(ctx.UpstreamHost) {
return clean, rawQuery
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || repo == "" || strings.Contains(repo, "/") || repo == "library" {
return clean, rawQuery
@@ -54,6 +58,9 @@ func contentType(_ *hooks.RequestContext, locatorPath string) string {
}
func isDockerHubHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
switch strings.ToLower(host) {
case "registry-1.docker.io", "docker.io", "index.docker.io":
return true
@@ -62,6 +69,28 @@ func isDockerHubHost(host string) bool {
}
}
func isRegistryK8sHost(host string) bool {
if parsedHost, _, err := net.SplitHostPort(host); err == nil {
host = parsedHost
}
return strings.EqualFold(host, "registry.k8s.io")
}
func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
if ctx == nil || !isRegistryK8sHost(ctx.UpstreamHost) {
return "", false
}
repo, rest, ok := splitDockerRepoPath(clean)
if !ok || strings.Contains(repo, "/") || !strings.HasPrefix(rest, "/manifests/") {
return "", false
}
return "/v2/" + repo + "/" + repo + rest, true
}
func RegistryK8sManifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) {
return manifestFallbackPath(ctx, clean)
}
func splitDockerRepoPath(path string) (string, string, bool) {
if !strings.HasPrefix(path, "/v2/") {
return "", "", false

View File

@@ -19,6 +19,52 @@ func TestNormalizePathAddsLibraryForDockerHub(t *testing.T) {
}
}
func TestNormalizePathSkipsLibraryForNonDockerHub(t *testing.T) {
ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
path, _ := normalizePath(ctx, "/v2/kube-apiserver/manifests/v1.35.3", nil)
if path != "/v2/kube-apiserver/manifests/v1.35.3" {
t.Fatalf("expected non-docker hub path to remain unchanged, got %s", path)
}
}
// TestIsRegistryK8sHost covers one positive and one negative host match.
func TestIsRegistryK8sHost(t *testing.T) {
	if got := isRegistryK8sHost("registry.k8s.io"); !got {
		t.Fatalf("expected registry.k8s.io to match")
	}
	if got := isRegistryK8sHost("example.com"); got {
		t.Fatalf("expected non-registry.k8s.io host to be ignored")
	}
}
// TestRegistryK8sManifestFallbackPath verifies a single-segment repo is
// duplicated into the namespaced manifest path.
func TestRegistryK8sManifestFallbackPath(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
	got, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
	if !ok || got != "/v2/coredns/coredns/manifests/v1.13.1" {
		t.Fatalf("expected fallback path, got %q ok=%v", got, ok)
	}
}
// TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo ensures repos
// that already carry a namespace are never rewritten a second time.
func TestRegistryK8sManifestFallbackPathRejectsMultiSegmentRepo(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
	_, ok := manifestFallbackPath(ctx, "/v2/coredns/coredns/manifests/v1.13.1")
	if ok {
		t.Fatalf("expected multi-segment repo to be ignored")
	}
}
// TestRegistryK8sManifestFallbackPathRejectsNonManifest ensures blob (and
// other non-manifest) paths do not get a fallback path.
func TestRegistryK8sManifestFallbackPathRejectsNonManifest(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "registry.k8s.io"}
	_, ok := manifestFallbackPath(ctx, "/v2/coredns/blobs/sha256:deadbeef")
	if ok {
		t.Fatalf("expected non-manifest path to be ignored")
	}
}
// TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost ensures the
// fallback is scoped strictly to the registry.k8s.io upstream.
func TestRegistryK8sManifestFallbackPathRejectsNonRegistryHost(t *testing.T) {
	ctx := &hooks.RequestContext{UpstreamHost: "mirror.gcr.io"}
	_, ok := manifestFallbackPath(ctx, "/v2/coredns/manifests/v1.13.1")
	if ok {
		t.Fatalf("expected non-registry.k8s.io host to be ignored")
	}
}
func TestSplitDockerRepoPath(t *testing.T) {
repo, rest, ok := splitDockerRepoPath("/v2/library/nginx/manifests/latest")
if !ok || repo != "library/nginx" || rest != "/manifests/latest" {

View File

@@ -11,13 +11,14 @@ type StrategyOptions struct {
// ResolveStrategy merges a module's default cache strategy with hub-level
// overrides. The validation override is applied before normalization, while a
// positive TTL override is applied afterwards so it always wins explicitly,
// even when the module declared no TTL hint of its own.
//
// NOTE(review): this block as rendered interleaved the pre- and post-change
// lines of a diff (an unconditional `return normalizeStrategy(strategy)`
// followed by further statements, plus the superseded
// `strategy.TTLHint > 0 && opts.TTLOverride > 0` guard). The body below keeps
// only the post-change logic implied by the hunk's line counts.
func ResolveStrategy(meta ModuleMetadata, opts StrategyOptions) CacheStrategyProfile {
	strategy := meta.CacheStrategy
	if opts.ValidationOverride != "" {
		strategy.ValidationMode = opts.ValidationOverride
	}
	strategy = normalizeStrategy(strategy)
	if opts.TTLOverride > 0 {
		// Apply after normalization so normalizeStrategy cannot clamp or
		// replace an explicitly configured hub TTL.
		strategy.TTLHint = opts.TTLOverride
	}
	return strategy
}
func normalizeStrategy(profile CacheStrategyProfile) CacheStrategyProfile {

View File

@@ -3,7 +3,7 @@ package proxy
import (
"bytes"
"context"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
@@ -22,6 +22,7 @@ import (
"github.com/any-hub/any-hub/internal/cache"
"github.com/any-hub/any-hub/internal/hubmodule"
dockermodule "github.com/any-hub/any-hub/internal/hubmodule/docker"
"github.com/any-hub/any-hub/internal/logging"
"github.com/any-hub/any-hub/internal/proxy/hooks"
"github.com/any-hub/any-hub/internal/server"
@@ -202,10 +203,25 @@ func (h *Handler) serveCache(
c.Status(status)
if method == http.MethodHead {
// 对 HEAD 请求仍向上游发起一次 HEAD以满足“显式请求 + 再验证”的期望。
if route != nil && route.UpstreamURL != nil {
if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil {
resp.Body.Close()
shouldRevalidate := true
if hook != nil && hook.hasHooks && hook.def.CachePolicy != nil {
policy := hook.def.CachePolicy(hook.ctx, result.Entry.Locator.Path, hooks.CachePolicy{
AllowCache: true,
AllowStore: true,
RequireRevalidate: true,
})
if !policy.AllowCache || !policy.AllowStore {
shouldRevalidate = false
} else {
shouldRevalidate = policy.RequireRevalidate
}
}
if shouldRevalidate {
if resp, err := h.revalidateRequest(c, route, effectiveRevalidateURL(route, c, result.Entry, hook), result.Entry.Locator, ""); err == nil {
resp.Body.Close()
}
}
}
result.Reader.Close()
@@ -244,6 +260,17 @@ func (h *Handler) fetchAndStream(
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
effectiveUpstreamPath := ""
originalStatus := resp.StatusCode
originalPath := ""
if hook != nil {
originalPath = hook.clean
}
resp, upstreamURL, effectiveUpstreamPath, err = h.retryRegistryK8sManifestFallback(c, route, requestID, resp, upstreamURL, hook, originalStatus, originalPath)
if err != nil {
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
resp = rewritten
@@ -258,7 +285,7 @@ func (h *Handler) fetchAndStream(
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
c.Method() == http.MethodGet
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx, effectiveUpstreamPath)
}
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
@@ -310,13 +337,14 @@ func (h *Handler) consumeUpstream(
requestID string,
started time.Time,
ctx context.Context,
effectiveUpstreamPath string,
) error {
upstreamURL := resp.Request.URL.String()
method := c.Method()
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
if shouldStore {
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL)
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL, effectiveUpstreamPath)
}
copyResponseHeaders(c, resp.Header)
@@ -354,6 +382,7 @@ func (h *Handler) cacheAndStream(
started time.Time,
ctx context.Context,
upstreamURL string,
effectiveUpstreamPath string,
) error {
copyResponseHeaders(c, resp.Header)
c.Set("X-Any-Hub-Upstream", upstreamURL)
@@ -366,7 +395,7 @@ func (h *Handler) cacheAndStream(
// 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
opts := cache.PutOptions{ModTime: extractModTime(resp.Header)}
opts := cache.PutOptions{ModTime: extractModTime(resp.Header), EffectiveUpstreamPath: effectiveUpstreamPath}
entry, err := writer.Put(ctx, locator, reader, opts)
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
if err != nil {
@@ -571,7 +600,7 @@ func resolveContentType(route *server.HubRoute, locator cache.Locator, hook *hoo
func buildLocator(route *server.HubRoute, c fiber.Ctx, clean string, rawQuery []byte) cache.Locator {
query := rawQuery
if len(query) > 0 {
sum := sha1.Sum(query)
sum := sha256.Sum256(query)
clean = fmt.Sprintf("%s/__qs/%s", clean, hex.EncodeToString(sum[:]))
}
loc := cache.Locator{
@@ -759,7 +788,7 @@ func (h *Handler) isCacheFresh(
ctx = context.Background()
}
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
upstreamURL := effectiveRevalidateURL(route, c, entry, hook)
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
if err != nil {
return false, err
@@ -791,7 +820,7 @@ func (h *Handler) isCacheFresh(
return true, nil
case http.StatusOK:
if resp.Header.Get("Etag") == "" && resp.Header.Get("Docker-Content-Digest") == "" && resp.Header.Get("Last-Modified") == "" {
return true, nil
return false, nil
}
h.rememberETag(route, locator, resp)
remote := extractModTime(resp.Header)
@@ -810,6 +839,77 @@ func (h *Handler) isCacheFresh(
}
}
// effectiveRevalidateURL returns the upstream URL to use for cache
// revalidation. When the cache entry recorded an effective upstream path
// (e.g. one filled via the registry.k8s.io manifest fallback) the route's
// upstream URL is cloned with that path, so revalidation targets the path
// that actually produced the cached content instead of the original 404 path.
// Otherwise it falls back to the normal hook-resolved URL.
func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL {
	// Fix: the previous guard tested route == nil and then dereferenced
	// route.UpstreamURL in the same branch, which would panic for a nil route.
	if route == nil {
		return resolveUpstreamURL(nil, nil, c, hook)
	}
	if route.UpstreamURL == nil || entry.EffectiveUpstreamPath == "" {
		return resolveUpstreamURL(route, route.UpstreamURL, c, hook)
	}
	clone := *route.UpstreamURL
	clone.Path = entry.EffectiveUpstreamPath
	// Fallback paths contain no escaped characters, so RawPath may mirror Path.
	clone.RawPath = entry.EffectiveUpstreamPath
	return &clone
}
// retryRegistryK8sManifestFallback retries a registry.k8s.io manifest request
// once with the repository duplicated (coredns -> coredns/coredns) after the
// first attempt's final status was 404. It returns the response to serve
// (the fallback response when a retry happened, else the original), the URL
// it came from, the effective upstream path to persist on the cache entry
// when the fallback succeeded with 200 (empty otherwise), and any transport
// error from the retry.
func (h *Handler) retryRegistryK8sManifestFallback(
	c fiber.Ctx,
	route *server.HubRoute,
	requestID string,
	resp *http.Response,
	upstreamURL *url.URL,
	hook *hookState,
	originalStatus int,
	originalPath string,
) (*http.Response, *url.URL, string, error) {
	// Only retry when the first attempt ended in 404 and hook context exists.
	if resp == nil || resp.StatusCode != http.StatusNotFound || hook == nil || hook.ctx == nil {
		return resp, upstreamURL, "", nil
	}
	// The docker module decides eligibility: registry.k8s.io host,
	// single-segment repo, manifest path.
	fallbackPath, ok := dockermodule.RegistryK8sManifestFallbackPath(hook.ctx, hook.clean)
	if !ok {
		return resp, upstreamURL, "", nil
	}
	// The original 404 body is discarded; the fallback response replaces it.
	resp.Body.Close()
	// Shallow-copy the hook state so only the cleaned path differs for the retry.
	fallbackHook := *hook
	fallbackHook.clean = fallbackPath
	fallbackResp, fallbackURL, err := h.executeRequest(c, route, &fallbackHook)
	if err != nil {
		return nil, upstreamURL, "", err
	}
	// Run the normal credential-retry flow against the fallback path as well.
	fallbackResp, fallbackURL, err = h.retryOnAuthFailure(c, route, requestID, time.Now(), fallbackResp, fallbackURL, &fallbackHook)
	if err != nil {
		return nil, upstreamURL, "", err
	}
	h.logRegistryK8sFallback(route, requestID, originalPath, fallbackPath, originalStatus, c.Method())
	// Persist the effective path only on success, so later cache
	// revalidation targets the path that actually produced the content.
	if fallbackResp.StatusCode == http.StatusOK {
		return fallbackResp, fallbackURL, fallbackPath, nil
	}
	// Non-200 fallback: serve the fallback's response (e.g. its 404) without
	// recording an effective path.
	return fallbackResp, fallbackURL, "", nil
}
// logRegistryK8sFallback emits a structured log entry describing one
// registry.k8s.io manifest fallback attempt: original and fallback paths,
// the status that triggered the retry, and the request method/id.
func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) {
	if h == nil || h.logger == nil || route == nil {
		return
	}
	fields := logging.RequestFields(
		route.Config.Name,
		route.Config.Domain,
		route.Config.Type,
		route.Config.AuthMode(),
		route.Module.Key,
		false,
	)
	fields["action"] = "proxy_fallback"
	fields["method"] = method
	fields["original_path"] = originalPath
	fields["original_status"] = originalStatus
	fields["fallback_path"] = fallbackPath
	if upstream := route.UpstreamURL; upstream != nil {
		fields["upstream_host"] = upstream.Hostname()
	}
	if requestID != "" {
		fields["request_id"] = requestID
	}
	h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback")
}
func (h *Handler) revalidateRequest(
c fiber.Ctx,
route *server.HubRoute,

View File

@@ -0,0 +1,55 @@
package proxy
import (
"bytes"
"net/url"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/any-hub/any-hub/internal/config"
"github.com/any-hub/any-hub/internal/hubmodule"
"github.com/any-hub/any-hub/internal/server"
)
// TestRegistryK8sManifestFallbackLogsStructuredEvent captures the JSON log
// output of logRegistryK8sFallback and asserts every expected structured
// field (hub, domain, upstream host, both paths, status, method) appears.
func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) {
	buf := new(bytes.Buffer)
	logger := logrus.New()
	logger.SetOutput(buf)
	// JSON formatter so field presence can be asserted as `"key":"value"` substrings.
	logger.SetFormatter(&logrus.JSONFormatter{})
	upstreamURL, err := url.Parse("http://registry.k8s.io")
	if err != nil {
		t.Fatalf("parse upstream url: %v", err)
	}
	// Handler with nil client/store: only the logger is exercised here.
	h := NewHandler(nil, logger, nil)
	route := &server.HubRoute{
		Config: config.HubConfig{
			Name:   "docker",
			Domain: "k8s.hub.local",
			Type:   "docker",
		},
		Module:      hubmodule.ModuleMetadata{Key: "docker"},
		UpstreamURL: upstreamURL,
	}
	h.logRegistryK8sFallback(route, "req-1", "/v2/coredns/manifests/v1.13.1", "/v2/coredns/coredns/manifests/v1.13.1", 404, "GET")
	output := buf.String()
	for _, want := range []string{
		"proxy_registry_k8s_fallback",
		`"hub":"docker"`,
		`"domain":"k8s.hub.local"`,
		`"upstream_host":"registry.k8s.io"`,
		`"original_path":"/v2/coredns/manifests/v1.13.1"`,
		`"fallback_path":"/v2/coredns/coredns/manifests/v1.13.1"`,
		`"original_status":404`,
		`"method":"GET"`,
	} {
		if !strings.Contains(output, want) {
			t.Fatalf("expected log output to contain %s, got %s", want, output)
		}
	}
}

View File

@@ -15,11 +15,10 @@ func TestHubRegistryLookupByHost(t *testing.T) {
},
Hubs: []config.HubConfig{
{
Name: "docker",
Domain: "docker.hub.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
EnableHeadCheck: true,
Name: "docker",
Domain: "docker.hub.local",
Type: "docker",
Upstream: "https://registry-1.docker.io",
},
{
Name: "npm",

View File

@@ -28,7 +28,6 @@
- `Upstream` (string, required, http/https URL)
- `Proxy` (string, optional, URL)
- `CacheTTL` (duration, optional, overrides global)
- `EnableHeadCheck` (bool, optional, default true)
- **Validation Rules**: `Name` 必须唯一;`Domain` + `Port` 组合不得冲突URL 必须可解析。
- **Relationships**: 属于 `Config`,在运行时用于初始化路由、缓存目录 `StoragePath/<Name>`

View File

@@ -4,7 +4,7 @@
### HubRoute
- **Description**: Host/端口到上游仓库的映射,供 Fiber 路由和 Proxy handler 使用。
- **Fields**: `Name` (string, unique), `Domain` (string, FQDN), `Port` (int, 1-65535), `Upstream` (URL), `Proxy` (URL, optional), `CacheTTL` (duration override), `EnableHeadCheck` (bool).
- **Fields**: `Name` (string, unique), `Domain` (string, FQDN), `Port` (int, 1-65535), `Upstream` (URL), `Proxy` (URL, optional), `CacheTTL` (duration override).
- **Validation**: Name 唯一Domain 不含协议/路径Upstream 必须 http/https。
- **Relationships**: 由 config 加载到 `HubRegistry`;与 CacheEntry、ProxyRequest 通过 `Name` 关联。

View File

@@ -26,6 +26,8 @@ import (
// Manifest paths shared across the cache-flow tests. The registry.k8s.io
// pair models the fallback: the single-segment path 404s upstream and the
// namespaced path serves the actual manifest.
const (
	dockerManifestPath            = "/v2/library/cache-flow/manifests/latest"
	dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
	registryK8sOriginalManifest   = "/v2/coredns/manifests/v1.13.1"
	registryK8sFallbackManifest   = "/v2/coredns/coredns/manifests/v1.13.1"
)
func TestCacheFlowWithConditionalRequest(t *testing.T) {
@@ -222,8 +224,8 @@ func TestDockerManifestHeadDoesNotOverwriteCache(t *testing.T) {
}
}
func TestDockerNamespaceFallbackAddsLibrary(t *testing.T) {
stub := newCacheFlowStub(t, dockerManifestPath)
func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) {
stub := newCacheFlowStub(t, dockerManifestNoNamespacePath)
defer stub.Close()
storageDir := t.TempDir()
@@ -277,7 +279,7 @@ func TestDockerNamespaceFallbackAddsLibrary(t *testing.T) {
}
if resp.StatusCode != fiber.StatusOK {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 200 when fallback applies, got %d (body=%s)", resp.StatusCode, string(body))
t.Fatalf("expected 200 when upstream keeps original path, got %d (body=%s)", resp.StatusCode, string(body))
}
resp.Body.Close()
@@ -286,6 +288,140 @@ func TestDockerNamespaceFallbackAddsLibrary(t *testing.T) {
}
}
// TestRegistryK8sManifestFallbackRetry verifies that a 404 on the original
// single-segment manifest path triggers exactly one retry against the
// namespaced fallback path and that the fallback's 200 reaches the client.
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
	stub := newRegistryK8sStub(t)
	// Only the fallback path is configured; the original path will 404.
	stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
	defer stub.Close()
	app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
	req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
	req.Host = "k8s.hub.local"
	resp, err := app.Test(req)
	if err != nil {
		t.Fatalf("app.Test failed: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != fiber.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body))
	}
	// Exactly one hit per path: one 404 attempt, one successful fallback.
	if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
		t.Fatalf("expected single original manifest hit")
	}
	if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 {
		t.Fatalf("expected single fallback manifest hit")
	}
}
// TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds verifies
// that a 200 on the original path short-circuits the fallback entirely.
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
	stub := newRegistryK8sStub(t)
	// The original path succeeds, so the fallback path must never be hit.
	stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest"))
	defer stub.Close()
	app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
	req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
	req.Host = "k8s.hub.local"
	resp, err := app.Test(req)
	if err != nil {
		t.Fatalf("app.Test failed: %v", err)
	}
	resp.Body.Close()
	if resp.StatusCode != fiber.StatusOK {
		t.Fatalf("expected 200, got %d", resp.StatusCode)
	}
	if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
		t.Fatalf("expected no fallback hit")
	}
}
// TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost verifies
// the fallback stays scoped to registry.k8s.io: a different upstream host
// propagates the original 404 without a fallback attempt.
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
	stub := newRegistryK8sStub(t)
	stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
	defer stub.Close()
	// The stub itself is the upstream here (no proxy URL), so the upstream
	// host is the stub's address — not registry.k8s.io.
	app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL)
	req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil)
	req.Host = "mirror.hub.local"
	resp, err := app.Test(req)
	if err != nil {
		t.Fatalf("app.Test failed: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != fiber.StatusNotFound {
		body, _ := io.ReadAll(resp.Body)
		t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body))
	}
	if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
		t.Fatalf("expected no fallback hit for non-registry host")
	}
}
// TestRegistryK8sManifestFallbackSecondRequestHitsCache verifies that a
// fallback-filled response is cached: the second identical request is served
// from cache and the original 404 path is not requested again upstream.
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
	stub := newRegistryK8sStub(t)
	stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
	defer stub.Close()
	app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
	makeRequest := func() *http.Response {
		req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
		req.Host = "k8s.hub.local"
		resp, err := app.Test(req)
		if err != nil {
			t.Fatalf("app.Test failed: %v", err)
		}
		return resp
	}
	resp1 := makeRequest()
	resp1.Body.Close()
	resp2 := makeRequest()
	defer resp2.Body.Close()
	if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" {
		t.Fatalf("expected second request to hit cache")
	}
	// The first request's 404 attempt must not repeat on the cached second hit.
	if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
		t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet))
	}
}
// TestRegistryK8sFallbackCacheRevalidatesEffectivePath verifies that cache
// revalidation for a fallback-filled entry uses the persisted effective
// upstream path (the namespaced one), never the original path that 404'd.
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
	stub := newRegistryK8sStub(t)
	stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
	defer stub.Close()
	app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
	makeRequest := func() *http.Response {
		req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
		req.Host = "k8s.hub.local"
		resp, err := app.Test(req)
		if err != nil {
			t.Fatalf("app.Test failed: %v", err)
		}
		return resp
	}
	// First request fills the cache via fallback; second triggers revalidation.
	resp1 := makeRequest()
	resp1.Body.Close()
	resp2 := makeRequest()
	resp2.Body.Close()
	if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 {
		t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead))
	}
	if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 {
		t.Fatalf("expected HEAD revalidation against fallback path")
	}
}
type cacheFlowStub struct {
server *http.Server
listener net.Listener
@@ -300,6 +436,22 @@ type cacheFlowStub struct {
lastMod string
}
// registryK8sStub is an in-process stand-in for registry.k8s.io used by the
// fallback tests: it serves configured responses and counts hits per
// method+path so tests can assert exactly which upstream requests occurred.
type registryK8sStub struct {
	server   *http.Server
	listener net.Listener
	URL      string
	mu       sync.Mutex // guards responses and hits
	responses map[string]registryK8sResponse
	hits     map[string]int // keyed by "METHOD path" (see pathHits)
}

// registryK8sResponse is one canned upstream response: status, body, and a
// stable ETag so revalidation can observe 304s.
type registryK8sResponse struct {
	status int
	body   []byte
	etag   string
}
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
t.Helper()
stub := &cacheFlowStub{
@@ -335,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
return stub
}
// newRegistryK8sStub starts the registry.k8s.io stub on a loopback port and
// returns it with URL populated. The test is skipped (not failed) when no
// local listener can be bound, e.g. in a sandboxed environment.
func newRegistryK8sStub(t *testing.T) *registryK8sStub {
	t.Helper()
	stub := &registryK8sStub{
		responses: make(map[string]registryK8sResponse),
		hits:      make(map[string]int),
	}
	mux := http.NewServeMux()
	// A single catch-all handler records and serves every path.
	mux.HandleFunc("/", stub.handle)
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Skipf("unable to start registry.k8s.io stub listener: %v", err)
	}
	server := &http.Server{Handler: mux}
	stub.server = server
	stub.listener = listener
	stub.URL = "http://" + listener.Addr().String()
	go func() {
		// Serve returns once the listener is closed by Close; error ignored.
		_ = server.Serve(listener)
	}()
	return stub
}
func (s *cacheFlowStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -346,6 +524,109 @@ func (s *cacheFlowStub) Close() {
}
}
// Close gracefully shuts the stub server down (bounded by a one-second
// deadline) and releases its listener.
func (s *registryK8sStub) Close() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if srv := s.server; srv != nil {
		_ = srv.Shutdown(ctx)
	}
	if ln := s.listener; ln != nil {
		_ = ln.Close()
	}
}
// setResponse registers a canned response for path, deriving a stable quoted
// ETag from the path itself so repeated requests see identical validators.
func (s *registryK8sStub) setResponse(path string, status int, body []byte) {
	etag := fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-"))
	bodyCopy := append([]byte(nil), body...)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.responses[path] = registryK8sResponse{
		status: status,
		body:   bodyCopy,
		etag:   etag,
	}
}
// pathHits reports how many times the given method+path combination was
// requested against the stub.
func (s *registryK8sStub) pathHits(path string, method string) int {
	key := method + " " + path
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.hits[key]
}
// handle records the hit, then serves either the canned response for the path
// or a registry-style MANIFEST_UNKNOWN 404. HEAD requests honour
// If-None-Match against the canned ETag so revalidation can receive 304s.
func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) {
	s.mu.Lock()
	s.hits[r.Method+" "+r.URL.Path]++
	resp, ok := s.responses[r.URL.Path]
	s.mu.Unlock()
	if !ok {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusNotFound)
		_, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`))
		return
	}
	w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
	w.Header().Set("Etag", resp.etag)
	// NOTE(review): Last-Modified is regenerated per request here; tests in
	// this file only compare ETag-driven behavior for this stub — confirm if
	// Last-Modified stability ever matters for it.
	w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
	if r.Method == http.MethodHead {
		// Compare ETags with surrounding quotes stripped to tolerate
		// differing client quoting styles.
		for _, candidate := range r.Header.Values("If-None-Match") {
			if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) {
				w.WriteHeader(http.StatusNotModified)
				return
			}
		}
		w.WriteHeader(resp.status)
		return
	}
	w.WriteHeader(resp.status)
	_, _ = w.Write(resp.body)
}
// newDockerProxyTestApp builds a fully wired Fiber app (registry, cache
// store, upstream client, proxy handler) with a single docker hub pointing at
// upstream. An optional first proxyURLs element routes upstream traffic
// through that proxy, letting tests present a stub as registry.k8s.io.
func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App {
	t.Helper()
	proxyURL := ""
	if len(proxyURLs) > 0 {
		proxyURL = proxyURLs[0]
	}
	cfg := &config.Config{
		Global: config.GlobalConfig{
			ListenPort:  5000,
			CacheTTL:    config.Duration(30 * time.Second),
			StoragePath: storageDir,
		},
		Hubs: []config.HubConfig{
			{
				Name:     "docker",
				Domain:   domain,
				Type:     "docker",
				Upstream: upstream,
				Proxy:    proxyURL,
			},
		},
	}
	registry, err := server.NewHubRegistry(cfg)
	if err != nil {
		t.Fatalf("registry error: %v", err)
	}
	// Silence proxy logging in tests.
	logger := logrus.New()
	logger.SetOutput(io.Discard)
	store, err := cache.NewStore(storageDir)
	if err != nil {
		t.Fatalf("store error: %v", err)
	}
	client := server.NewUpstreamClient(cfg)
	handler := proxy.NewHandler(client, logger, store)
	app, err := server.NewApp(server.AppOptions{
		Logger:     logger,
		Registry:   registry,
		Proxy:      handler,
		ListenPort: 5000,
	})
	if err != nil {
		t.Fatalf("app error: %v", err)
	}
	return app
}
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
etag := s.etag

View File

@@ -68,8 +68,8 @@ func TestCacheStrategyOverrides(t *testing.T) {
}
resp2.Body.Close()
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 0 {
t.Fatalf("expected no HEAD before TTL expiry, got %d", headCount)
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 1 {
t.Fatalf("expected single HEAD before TTL expiry, got %d", headCount)
}
if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 1 {
t.Fatalf("upstream should be hit once before TTL expiry, got %d", getCount)
@@ -85,8 +85,8 @@ func TestCacheStrategyOverrides(t *testing.T) {
}
resp3.Body.Close()
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 1 {
t.Fatalf("expected single HEAD after TTL expiry, got %d", headCount)
if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 2 {
t.Fatalf("expected two HEAD requests after TTL expiry, got %d", headCount)
}
if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 1 {
t.Fatalf("upstream GET count should remain 1, got %d", getCount)

View File

@@ -30,6 +30,7 @@ type upstreamStub struct {
requests []RecordedRequest
mode upstreamMode
blobBytes []byte
lastMod string
}
// RecordedRequest 捕获每次请求的方法/路径/Host/Headers便于断言代理行为。
@@ -48,13 +49,14 @@ func newUpstreamStub(t *testing.T, mode upstreamMode) *upstreamStub {
stub := &upstreamStub{
mode: mode,
blobBytes: []byte("stub-layer-payload"),
lastMod: time.Now().UTC().Format(http.TimeFormat),
}
switch mode {
case upstreamDocker:
registerDockerHandlers(mux, stub.blobBytes)
case upstreamNPM:
registerNPMHandlers(mux)
registerNPMHandlers(mux, stub)
default:
t.Fatalf("unsupported stub mode: %s", mode)
}
@@ -153,9 +155,10 @@ func registerDockerHandlers(mux *http.ServeMux, blob []byte) {
})
}
func registerNPMHandlers(mux *http.ServeMux) {
func registerNPMHandlers(mux *http.ServeMux, stub *upstreamStub) {
mux.HandleFunc("/lodash", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Last-Modified", stub.lastMod)
resp := map[string]any{
"name": "lodash",
"dist-tags": map[string]string{
@@ -174,6 +177,7 @@ func registerNPMHandlers(mux *http.ServeMux) {
mux.HandleFunc("/lodash/-/lodash-4.17.21.tgz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Last-Modified", stub.lastMod)
_, _ = w.Write([]byte("tarball-bytes"))
})
}
@@ -230,6 +234,37 @@ func TestDockerStubServesManifestAndBlob(t *testing.T) {
}
}
// TestNPMStubKeepsStableValidatorsAcrossHeadRequests asserts the npm stub's
// Last-Modified header does not drift between two HEAD requests. The >1s
// sleep is deliberate: a validator derived from the current wall clock would
// change across a whole-second boundary, which this test is designed to catch.
func TestNPMStubKeepsStableValidatorsAcrossHeadRequests(t *testing.T) {
	stub := newUpstreamStub(t, upstreamNPM)
	defer stub.Close()
	req1, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
	if err != nil {
		t.Fatalf("build first HEAD request: %v", err)
	}
	resp1, err := http.DefaultClient.Do(req1)
	if err != nil {
		t.Fatalf("first HEAD failed: %v", err)
	}
	resp1.Body.Close()
	// Cross a wall-clock second so a time-derived Last-Modified would differ.
	time.Sleep(1100 * time.Millisecond)
	req2, err := http.NewRequest(http.MethodHead, stub.URL+"/lodash", nil)
	if err != nil {
		t.Fatalf("build second HEAD request: %v", err)
	}
	resp2, err := http.DefaultClient.Do(req2)
	if err != nil {
		t.Fatalf("second HEAD failed: %v", err)
	}
	resp2.Body.Close()
	if resp1.Header.Get("Last-Modified") != resp2.Header.Get("Last-Modified") {
		t.Fatalf("expected stable Last-Modified, got %q then %q", resp1.Header.Get("Last-Modified"), resp2.Header.Get("Last-Modified"))
	}
}
func TestNPMStubServesMetadataAndTarball(t *testing.T) {
stub := newUpstreamStub(t, upstreamNPM)
defer stub.Close()