stage 1
This commit is contained in:
@@ -47,6 +47,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
requestID := server.RequestID(c)
|
||||
locator := buildLocator(route, c)
|
||||
policy := determineCachePolicy(route, locator, c.Method())
|
||||
strategyWriter := cache.NewStrategyWriter(h.store, route.CacheStrategy)
|
||||
|
||||
if err := ensureProxyHubType(route); err != nil {
|
||||
h.logger.WithFields(logrus.Fields{
|
||||
@@ -62,7 +63,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
}
|
||||
|
||||
var cached *cache.ReadResult
|
||||
if h.store != nil && policy.allowCache {
|
||||
if strategyWriter.Enabled() && policy.allowCache {
|
||||
result, err := h.store.Get(ctx, locator)
|
||||
switch {
|
||||
case err == nil:
|
||||
@@ -79,13 +80,19 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
if cached != nil {
|
||||
serve := true
|
||||
if policy.requireRevalidate {
|
||||
fresh, err := h.isCacheFresh(c, route, locator, cached.Entry)
|
||||
if err != nil {
|
||||
h.logger.WithError(err).
|
||||
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
|
||||
Warn("cache_revalidate_failed")
|
||||
serve = false
|
||||
} else if !fresh {
|
||||
if strategyWriter.ShouldBypassValidation(cached.Entry) {
|
||||
serve = true
|
||||
} else if strategyWriter.SupportsValidation() {
|
||||
fresh, err := h.isCacheFresh(c, route, locator, cached.Entry)
|
||||
if err != nil {
|
||||
h.logger.WithError(err).
|
||||
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
|
||||
Warn("cache_revalidate_failed")
|
||||
serve = false
|
||||
} else if !fresh {
|
||||
serve = false
|
||||
}
|
||||
} else {
|
||||
serve = false
|
||||
}
|
||||
}
|
||||
@@ -96,7 +103,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
cached.Reader.Close()
|
||||
}
|
||||
|
||||
return h.fetchAndStream(c, route, locator, policy, requestID, started, ctx)
|
||||
return h.fetchAndStream(c, route, locator, policy, strategyWriter, requestID, started, ctx)
|
||||
}
|
||||
|
||||
func (h *Handler) serveCache(c fiber.Ctx, route *server.HubRoute, result *cache.ReadResult, requestID string, started time.Time) error {
|
||||
@@ -144,7 +151,7 @@ func (h *Handler) serveCache(c fiber.Ctx, route *server.HubRoute, result *cache.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, policy cachePolicy, requestID string, started time.Time, ctx context.Context) error {
|
||||
func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, policy cachePolicy, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context) error {
|
||||
resp, upstreamURL, err := h.executeRequest(c, route)
|
||||
if err != nil {
|
||||
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
|
||||
@@ -158,17 +165,17 @@ func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator ca
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
shouldStore := policy.allowStore && h.store != nil && isCacheableStatus(resp.StatusCode) && c.Method() == http.MethodGet
|
||||
return h.consumeUpstream(c, route, locator, resp, shouldStore, requestID, started, ctx)
|
||||
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) && c.Method() == http.MethodGet
|
||||
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
|
||||
}
|
||||
|
||||
func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, shouldStore bool, requestID string, started time.Time, ctx context.Context) error {
|
||||
func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, shouldStore bool, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context) error {
|
||||
upstreamURL := resp.Request.URL.String()
|
||||
method := c.Method()
|
||||
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
|
||||
|
||||
if shouldStore {
|
||||
return h.cacheAndStream(c, route, locator, resp, requestID, started, ctx, upstreamURL)
|
||||
return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL)
|
||||
}
|
||||
|
||||
copyResponseHeaders(c, resp.Header)
|
||||
@@ -196,7 +203,7 @@ func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, requestID string, started time.Time, ctx context.Context, upstreamURL string) error {
|
||||
func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context, upstreamURL string) error {
|
||||
copyResponseHeaders(c, resp.Header)
|
||||
c.Set("X-Any-Hub-Upstream", upstreamURL)
|
||||
c.Set("X-Any-Hub-Cache-Hit", "false")
|
||||
@@ -208,7 +215,7 @@ func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator ca
|
||||
reader := io.TeeReader(resp.Body, c.Response().BodyWriter())
|
||||
|
||||
opts := cache.PutOptions{ModTime: extractModTime(resp.Header)}
|
||||
entry, err := h.store.Put(ctx, locator, reader, opts)
|
||||
entry, err := writer.Put(ctx, locator, reader, opts)
|
||||
h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err)
|
||||
if err != nil {
|
||||
return fiber.NewError(fiber.StatusBadGateway, fmt.Sprintf("cache_write_failed: %v", err))
|
||||
@@ -323,7 +330,7 @@ func (h *Handler) writeError(c fiber.Ctx, status int, code string) error {
|
||||
}
|
||||
|
||||
func (h *Handler) logResult(route *server.HubRoute, upstream string, requestID string, status int, cacheHit bool, started time.Time, err error) {
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, cacheHit)
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), cacheHit)
|
||||
fields["action"] = "proxy"
|
||||
fields["upstream"] = upstream
|
||||
fields["upstream_status"] = status
|
||||
@@ -399,10 +406,14 @@ func buildLocator(route *server.HubRoute, c fiber.Ctx) cache.Locator {
|
||||
sum := sha1.Sum(query)
|
||||
clean = fmt.Sprintf("%s/__qs/%s", clean, hex.EncodeToString(sum[:]))
|
||||
}
|
||||
return cache.Locator{
|
||||
loc := cache.Locator{
|
||||
HubName: route.Config.Name,
|
||||
Path: clean,
|
||||
}
|
||||
if route.Module.LocatorRewrite != nil {
|
||||
loc = route.Module.LocatorRewrite(loc)
|
||||
}
|
||||
return loc
|
||||
}
|
||||
|
||||
func stripQueryMarker(p string) string {
|
||||
@@ -807,7 +818,7 @@ func isAuthFailure(status int) bool {
|
||||
}
|
||||
|
||||
func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestID string, status int) {
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, false)
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), false)
|
||||
fields["action"] = "proxy_retry"
|
||||
fields["upstream"] = upstream
|
||||
fields["upstream_status"] = status
|
||||
@@ -819,7 +830,7 @@ func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestI
|
||||
}
|
||||
|
||||
func (h *Handler) logAuthFailure(route *server.HubRoute, upstream string, requestID string, status int) {
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, false)
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), false)
|
||||
fields["action"] = "proxy"
|
||||
fields["upstream"] = upstream
|
||||
fields["upstream_status"] = status
|
||||
|
||||
Reference in New Issue
Block a user