feat: 004/phase 1
This commit is contained in:
68
internal/proxy/forwarder.go
Normal file
68
internal/proxy/forwarder.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gofiber/fiber/v3"
|
||||
|
||||
"github.com/any-hub/any-hub/internal/server"
|
||||
)
|
||||
|
||||
// Forwarder 根据 HubRoute 的 module_key 选择对应的 ProxyHandler,默认回退到构造时注入的 handler。
|
||||
type Forwarder struct {
|
||||
defaultHandler server.ProxyHandler
|
||||
}
|
||||
|
||||
// NewForwarder 创建 Forwarder,defaultHandler 不能为空。
|
||||
func NewForwarder(defaultHandler server.ProxyHandler) *Forwarder {
|
||||
return &Forwarder{defaultHandler: defaultHandler}
|
||||
}
|
||||
|
||||
var (
|
||||
moduleHandlers sync.Map
|
||||
)
|
||||
|
||||
// RegisterModuleHandler 将特定 module_key 映射到 ProxyHandler,重复注册会覆盖旧值。
|
||||
func RegisterModuleHandler(key string, handler server.ProxyHandler) {
|
||||
normalized := normalizeModuleKey(key)
|
||||
if normalized == "" || handler == nil {
|
||||
return
|
||||
}
|
||||
moduleHandlers.Store(normalized, handler)
|
||||
}
|
||||
|
||||
// Handle 实现 server.ProxyHandler,根据 route.ModuleKey 选择 handler。
|
||||
func (f *Forwarder) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
handler := f.lookup(route)
|
||||
if handler == nil {
|
||||
return fiber.NewError(fiber.StatusInternalServerError, "proxy handler unavailable")
|
||||
}
|
||||
return handler.Handle(c, route)
|
||||
}
|
||||
|
||||
func (f *Forwarder) lookup(route *server.HubRoute) server.ProxyHandler {
|
||||
if route != nil {
|
||||
if handler := lookupModuleHandler(route.ModuleKey); handler != nil {
|
||||
return handler
|
||||
}
|
||||
}
|
||||
return f.defaultHandler
|
||||
}
|
||||
|
||||
func lookupModuleHandler(key string) server.ProxyHandler {
|
||||
normalized := normalizeModuleKey(key)
|
||||
if normalized == "" {
|
||||
return nil
|
||||
}
|
||||
if value, ok := moduleHandlers.Load(normalized); ok {
|
||||
if handler, ok := value.(server.ProxyHandler); ok {
|
||||
return handler
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func normalizeModuleKey(key string) string {
|
||||
return strings.ToLower(strings.TrimSpace(key))
|
||||
}
|
||||
@@ -49,7 +49,10 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
policy := determineCachePolicy(route, locator, c.Method())
|
||||
|
||||
if err := ensureProxyHubType(route); err != nil {
|
||||
h.logger.WithField("hub", route.Config.Name).WithError(err).Error("hub_type_unsupported")
|
||||
h.logger.WithFields(logrus.Fields{
|
||||
"hub": route.Config.Name,
|
||||
"module_key": route.ModuleKey,
|
||||
}).WithError(err).Error("hub_type_unsupported")
|
||||
return h.writeError(c, fiber.StatusNotImplemented, "hub_type_unsupported")
|
||||
}
|
||||
|
||||
@@ -67,7 +70,9 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
case errors.Is(err, cache.ErrNotFound):
|
||||
// miss, continue
|
||||
default:
|
||||
h.logger.WithError(err).WithField("hub", route.Config.Name).Warn("cache_get_failed")
|
||||
h.logger.WithError(err).
|
||||
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
|
||||
Warn("cache_get_failed")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,7 +81,9 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
|
||||
if policy.requireRevalidate {
|
||||
fresh, err := h.isCacheFresh(c, route, locator, cached.Entry)
|
||||
if err != nil {
|
||||
h.logger.WithError(err).WithField("hub", route.Config.Name).Warn("cache_revalidate_failed")
|
||||
h.logger.WithError(err).
|
||||
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
|
||||
Warn("cache_revalidate_failed")
|
||||
serve = false
|
||||
} else if !fresh {
|
||||
serve = false
|
||||
@@ -316,7 +323,7 @@ func (h *Handler) writeError(c fiber.Ctx, status int, code string) error {
|
||||
}
|
||||
|
||||
func (h *Handler) logResult(route *server.HubRoute, upstream string, requestID string, status int, cacheHit bool, started time.Time, err error) {
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), cacheHit)
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, cacheHit)
|
||||
fields["action"] = "proxy"
|
||||
fields["upstream"] = upstream
|
||||
fields["upstream_status"] = status
|
||||
@@ -800,7 +807,7 @@ func isAuthFailure(status int) bool {
|
||||
}
|
||||
|
||||
func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestID string, status int) {
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), false)
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, false)
|
||||
fields["action"] = "proxy_retry"
|
||||
fields["upstream"] = upstream
|
||||
fields["upstream_status"] = status
|
||||
@@ -812,7 +819,7 @@ func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestI
|
||||
}
|
||||
|
||||
func (h *Handler) logAuthFailure(route *server.HubRoute, upstream string, requestID string, status int) {
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), false)
|
||||
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, false)
|
||||
fields["action"] = "proxy"
|
||||
fields["upstream"] = upstream
|
||||
fields["upstream_status"] = status
|
||||
|
||||
Reference in New Issue
Block a user