feat: add registry k8s manifest fallback

This commit is contained in:
2026-03-23 16:55:20 +08:00
parent 9c85b9b097
commit 442c27a26d
4 changed files with 430 additions and 5 deletions

View File

@@ -26,6 +26,8 @@ import (
const (
dockerManifestPath = "/v2/library/cache-flow/manifests/latest"
dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest"
registryK8sOriginalManifest = "/v2/coredns/manifests/v1.13.1"
registryK8sFallbackManifest = "/v2/coredns/coredns/manifests/v1.13.1"
)
func TestCacheFlowWithConditionalRequest(t *testing.T) {
@@ -286,6 +288,140 @@ func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) {
}
}
func TestRegistryK8sManifestFallbackRetry(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != fiber.StatusOK {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body))
}
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
t.Fatalf("expected single original manifest hit")
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 {
t.Fatalf("expected single fallback manifest hit")
}
}
func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
resp.Body.Close()
if resp.StatusCode != fiber.StatusOK {
t.Fatalf("expected 200, got %d", resp.StatusCode)
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
t.Fatalf("expected no fallback hit")
}
}
func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL)
req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "mirror.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != fiber.StatusNotFound {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body))
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 {
t.Fatalf("expected no fallback hit for non-registry host")
}
}
func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
makeRequest := func() *http.Response {
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
return resp
}
resp1 := makeRequest()
resp1.Body.Close()
resp2 := makeRequest()
defer resp2.Body.Close()
if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" {
t.Fatalf("expected second request to hit cache")
}
if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 {
t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet))
}
}
func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) {
stub := newRegistryK8sStub(t)
stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest"))
defer stub.Close()
app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL)
makeRequest := func() *http.Response {
req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil)
req.Host = "k8s.hub.local"
resp, err := app.Test(req)
if err != nil {
t.Fatalf("app.Test failed: %v", err)
}
return resp
}
resp1 := makeRequest()
resp1.Body.Close()
resp2 := makeRequest()
resp2.Body.Close()
if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 {
t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead))
}
if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 {
t.Fatalf("expected HEAD revalidation against fallback path")
}
}
type cacheFlowStub struct {
server *http.Server
listener net.Listener
@@ -300,6 +436,22 @@ type cacheFlowStub struct {
lastMod string
}
type registryK8sStub struct {
server *http.Server
listener net.Listener
URL string
mu sync.Mutex
responses map[string]registryK8sResponse
hits map[string]int
}
type registryK8sResponse struct {
status int
body []byte
etag string
}
func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
t.Helper()
stub := &cacheFlowStub{
@@ -335,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub {
return stub
}
func newRegistryK8sStub(t *testing.T) *registryK8sStub {
t.Helper()
stub := &registryK8sStub{
responses: make(map[string]registryK8sResponse),
hits: make(map[string]int),
}
mux := http.NewServeMux()
mux.HandleFunc("/", stub.handle)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Skipf("unable to start registry.k8s.io stub listener: %v", err)
}
server := &http.Server{Handler: mux}
stub.server = server
stub.listener = listener
stub.URL = "http://" + listener.Addr().String()
go func() {
_ = server.Serve(listener)
}()
return stub
}
func (s *cacheFlowStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -346,6 +524,109 @@ func (s *cacheFlowStub) Close() {
}
}
func (s *registryK8sStub) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if s.server != nil {
_ = s.server.Shutdown(ctx)
}
if s.listener != nil {
_ = s.listener.Close()
}
}
func (s *registryK8sStub) setResponse(path string, status int, body []byte) {
s.mu.Lock()
defer s.mu.Unlock()
s.responses[path] = registryK8sResponse{
status: status,
body: append([]byte(nil), body...),
etag: fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-")),
}
}
func (s *registryK8sStub) pathHits(path string, method string) int {
s.mu.Lock()
defer s.mu.Unlock()
return s.hits[method+" "+path]
}
func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.hits[r.Method+" "+r.URL.Path]++
resp, ok := s.responses[r.URL.Path]
s.mu.Unlock()
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`))
return
}
w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
w.Header().Set("Etag", resp.etag)
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
if r.Method == http.MethodHead {
for _, candidate := range r.Header.Values("If-None-Match") {
if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.WriteHeader(resp.status)
return
}
w.WriteHeader(resp.status)
_, _ = w.Write(resp.body)
}
func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App {
t.Helper()
proxyURL := ""
if len(proxyURLs) > 0 {
proxyURL = proxyURLs[0]
}
cfg := &config.Config{
Global: config.GlobalConfig{
ListenPort: 5000,
CacheTTL: config.Duration(30 * time.Second),
StoragePath: storageDir,
},
Hubs: []config.HubConfig{
{
Name: "docker",
Domain: domain,
Type: "docker",
Upstream: upstream,
Proxy: proxyURL,
},
},
}
registry, err := server.NewHubRegistry(cfg)
if err != nil {
t.Fatalf("registry error: %v", err)
}
logger := logrus.New()
logger.SetOutput(io.Discard)
store, err := cache.NewStore(storageDir)
if err != nil {
t.Fatalf("store error: %v", err)
}
client := server.NewUpstreamClient(cfg)
handler := proxy.NewHandler(client, logger, store)
app, err := server.NewApp(server.AppOptions{
Logger: logger,
Registry: registry,
Proxy: handler,
ListenPort: 5000,
})
if err != nil {
t.Fatalf("app error: %v", err)
}
return app
}
func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
etag := s.etag