From dc0476eead13f8e6cd30bcb8fc18cd696714610c Mon Sep 17 00:00:00 2001 From: Nik Afiq Date: Tue, 7 Apr 2026 22:32:29 +0900 Subject: [PATCH] feat: implement structured logging and enhance error handling in HA client and gRPC server --- ha-gateway/cmd/gateway/main.go | 98 +++++++------------ .../adapters/primary/grpc/interceptor.go | 81 +++++++++++++++ .../internal/adapters/secondary/ha/client.go | 36 ++++++- ha-gateway/internal/config/config.go | 13 ++- ha-gateway/internal/logger/logger.go | 35 +++++++ 5 files changed, 196 insertions(+), 67 deletions(-) create mode 100644 ha-gateway/internal/adapters/primary/grpc/interceptor.go create mode 100644 ha-gateway/internal/logger/logger.go diff --git a/ha-gateway/cmd/gateway/main.go b/ha-gateway/cmd/gateway/main.go index a9d9ecb..fe622ca 100644 --- a/ha-gateway/cmd/gateway/main.go +++ b/ha-gateway/cmd/gateway/main.go @@ -2,19 +2,16 @@ package main import ( "context" - "log/slog" "net" "os" "os/signal" "syscall" - "time" "github.com/joho/godotenv" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/health" grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" hav1 "gitea.nik4nao.com/nik/home-services/gen/ha/v1" @@ -22,6 +19,7 @@ import ( "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/adapters/secondary/ha" "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/app" "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/config" + "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/logger" "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/telemetry" ) @@ -38,53 +36,56 @@ import ( var version = "dev" func main() { - // 1. Load .env if present (ignored in K8s where env vars come from Secrets/ConfigMaps). _ = godotenv.Load() - // 2. Load and validate config. cfg, err := config.Load() if err != nil { - slog.Error("config error", "err", err) + os.Stderr.WriteString("config error: " + err.Error() + "\n") os.Exit(1) } + log := logger.New(cfg.LogFormat, cfg.LogLevel) + log.Info("starting ha-gateway", + "version", version, + "grpc_port", cfg.GRPCPort, + "ha_base_url", cfg.HABaseURL, + "ha_token", redactToken(cfg.HAToken), + "otel_endpoint", cfg.OTELEndpoint, + "log_level", cfg.LogLevel, + "log_format", cfg.LogFormat, + ) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() + ctx = logger.WithLogger(ctx, log) - // 3. Set up telemetry. shutdown, err := telemetry.Setup(ctx, cfg, version) if err != nil { - slog.Error("telemetry setup failed", "err", err) + log.Error("telemetry setup failed", "err", err) os.Exit(1) } - // 4. Build the secondary adapter. - haClient := ha.NewClient(cfg) + haClient := ha.NewClient(cfg, log) - // 5. Build application services. entityApp := app.NewEntityApp(haClient) lightApp := app.NewLightApp(haClient) switchApp := app.NewSwitchApp(haClient) - // 5a. Prime the discovery cache. Non-fatal — ListLights/ListSwitches will - // retry lazily on the first call if this fails (e.g. HA not yet ready). if err := lightApp.Refresh(ctx); err != nil { - slog.Warn("initial light discovery failed, will retry on first request", "err", err) + log.Warn("initial light discovery failed, will retry on first request", "err", err) } if err := switchApp.Refresh(ctx); err != nil { - slog.Warn("initial switch discovery failed, will retry on first request", "err", err) + log.Warn("initial switch discovery failed, will retry on first request", "err", err) } - // 6. Build the gRPC server with OTel stats handler and logging interceptors. srv := grpc.NewServer( grpc.StatsHandler(otelgrpc.NewServerHandler()), - grpc.ChainUnaryInterceptor(loggingUnaryInterceptor), - grpc.ChainStreamInterceptor(loggingStreamInterceptor), + grpc.ChainUnaryInterceptor(grpcadapter.LoggingUnaryInterceptor(log)), + grpc.ChainStreamInterceptor(grpcadapter.LoggingStreamInterceptor(log)), ) healthSrv := health.NewServer() healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) - // 7. Register services. hav1.RegisterEntityServiceServer(srv, grpcadapter.NewEntityGRPC(entityApp)) hav1.RegisterLightServiceServer(srv, grpcadapter.NewLightGRPC(lightApp)) hav1.RegisterSwitchServiceServer(srv, grpcadapter.NewSwitchGRPC(switchApp)) @@ -92,70 +93,37 @@ func main() { grpc_health_v1.RegisterHealthServer(srv, healthSrv) reflection.Register(srv) - // 8. Start listener. lis, err := net.Listen("tcp", ":"+cfg.GRPCPort) if err != nil { - slog.Error("listen failed", "err", err) + log.Error("listen failed", "err", err) os.Exit(1) } - // 9. Serve in background. go func() { - slog.Info("listening", "port", cfg.GRPCPort, "version", version) + log.Info("ha-gateway listening", "addr", lis.Addr().String()) if err := srv.Serve(lis); err != nil { - slog.Error("serve failed", "err", err) + log.Error("serve failed", "err", err) } }() - // 10. Block until signal. <-ctx.Done() - slog.Info("shutting down") + log.Info("shutdown signal received, draining") - // 11. Graceful stop, then flush telemetry. healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) srv.GracefulStop() + log.Info("shutdown complete") - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := shutdown(shutdownCtx); err != nil { - slog.Error("telemetry shutdown error", "err", err) + if err := shutdown(context.Background()); err != nil { + log.Error("telemetry shutdown error", "err", err) } } -func loggingUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - start := time.Now() - resp, err := handler(ctx, req) - attrs := []any{ - "method", info.FullMethod, - "duration", time.Since(start).String(), +func redactToken(token string) string { + if token == "" { + return "[not set]" } - if p, ok := peer.FromContext(ctx); ok { - attrs = append(attrs, "peer", p.Addr.String()) + if len(token) <= 8 { + return token + "..." } - if err != nil { - attrs = append(attrs, "err", err) - slog.ErrorContext(ctx, "rpc", attrs...) - } else { - slog.InfoContext(ctx, "rpc", attrs...) - } - return resp, err -} - -func loggingStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - start := time.Now() - err := handler(srv, ss) - attrs := []any{ - "method", info.FullMethod, - "duration", time.Since(start).String(), - } - if p, ok := peer.FromContext(ss.Context()); ok { - attrs = append(attrs, "peer", p.Addr.String()) - } - if err != nil { - attrs = append(attrs, "err", err) - slog.ErrorContext(ss.Context(), "rpc stream", attrs...) - } else { - slog.InfoContext(ss.Context(), "rpc stream", attrs...) - } - return err + return token[:8] + "..." } diff --git a/ha-gateway/internal/adapters/primary/grpc/interceptor.go b/ha-gateway/internal/adapters/primary/grpc/interceptor.go new file mode 100644 index 0000000..d6e2bb1 --- /dev/null +++ b/ha-gateway/internal/adapters/primary/grpc/interceptor.go @@ -0,0 +1,81 @@ +package grpc + +import ( + "context" + "log/slog" + "time" + + "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/logger" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" +) + +func LoggingUnaryInterceptor(log *slog.Logger) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + method, ok := grpc.Method(ctx) + if !ok { + method = info.FullMethod + } + reqLog := requestLogger(ctx, log, method) + ctx = logger.WithLogger(ctx, reqLog) + + start := time.Now() + resp, err := handler(ctx, req) + logCompletion(reqLog, "grpc call completed", status.Code(err), time.Since(start), err) + return resp, err + } +} + +func LoggingStreamInterceptor(log *slog.Logger) grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + reqLog := requestLogger(ss.Context(), log, info.FullMethod) + reqLog.Info("grpc stream started") + + wrapped := &loggingServerStream{ + ServerStream: ss, + ctx: logger.WithLogger(ss.Context(), reqLog), + } + start := time.Now() + err := handler(srv, wrapped) + logCompletion(reqLog, "grpc stream completed", status.Code(err), time.Since(start), err) + return err + } +} + +type loggingServerStream struct { + grpc.ServerStream + ctx context.Context +} + +func (s *loggingServerStream) Context() context.Context { + return s.ctx +} + +func requestLogger(ctx context.Context, log *slog.Logger, method string) *slog.Logger { + peerAddr := "" + if p, ok := peer.FromContext(ctx); ok && p.Addr != nil { + peerAddr = p.Addr.String() + } + return log.With("grpc.method", method, "grpc.peer", peerAddr) +} + +func logCompletion(log *slog.Logger, msg string, code codes.Code, duration time.Duration, err error) { + attrs := []any{ + "duration_ms", duration.Milliseconds(), + "grpc.code", code.String(), + } + if err != nil { + attrs = append(attrs, "error", err.Error()) + } + + switch code { + case codes.OK: + log.Info(msg, attrs...) + case codes.NotFound, codes.InvalidArgument, codes.Unimplemented: + log.Warn(msg, attrs...) + default: + log.Error(msg, attrs...) + } +} diff --git a/ha-gateway/internal/adapters/secondary/ha/client.go b/ha-gateway/internal/adapters/secondary/ha/client.go index 3b7da87..358cf63 100644 --- a/ha-gateway/internal/adapters/secondary/ha/client.go +++ b/ha-gateway/internal/adapters/secondary/ha/client.go @@ -5,12 +5,14 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" "strings" "time" "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/config" "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/core/ports/driven" + "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/logger" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -22,13 +24,15 @@ type Client struct { baseURL string token string httpClient *http.Client + log *slog.Logger } -func NewClient(cfg *config.Config) *Client { +func NewClient(cfg *config.Config, log *slog.Logger) *Client { return &Client{ baseURL: strings.TrimRight(cfg.HABaseURL, "/"), token: cfg.HAToken, httpClient: &http.Client{Timeout: 10 * time.Second}, + log: log, } } @@ -90,8 +94,14 @@ func (c *Client) CallService(ctx context.Context, domain, service string, payloa req.Header.Set("Authorization", "Bearer "+c.token) req.Header.Set("Content-Type", "application/json") + log := logger.FromContext(ctx).With("ha.method", req.Method, "ha.path", req.URL.Path) + start := time.Now() resp, err := c.httpClient.Do(req) if err != nil { + log.Error("ha request failed", + "duration_ms", time.Since(start).Milliseconds(), + "error", err.Error(), + ) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, fmt.Errorf("call service %s/%s: %w", domain, service, err) @@ -104,11 +114,20 @@ func (c *Client) CallService(ctx context.Context, domain, service string, payloa if len(preview) > 200 { preview = preview[:200] } + log.Error("ha request failed", + "http.status", resp.StatusCode, + "duration_ms", time.Since(start).Milliseconds(), + "error", preview, + ) err := fmt.Errorf("HA returned %d: %s", resp.StatusCode, preview) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, err } + log.Debug("ha request completed", + "http.status", resp.StatusCode, + "duration_ms", time.Since(start).Milliseconds(), + ) var raw []haStateRaw if err := json.Unmarshal(respBody, &raw); err != nil { @@ -134,8 +153,14 @@ func (c *Client) get(ctx context.Context, path string, dst any) error { } req.Header.Set("Authorization", "Bearer "+c.token) + log := logger.FromContext(ctx).With("ha.method", req.Method, "ha.path", req.URL.Path) + start := time.Now() resp, err := c.httpClient.Do(req) if err != nil { + log.Error("ha request failed", + "duration_ms", time.Since(start).Milliseconds(), + "error", err.Error(), + ) return fmt.Errorf("GET %s: %w", path, err) } defer resp.Body.Close() @@ -146,8 +171,17 @@ func (c *Client) get(ctx context.Context, path string, dst any) error { if len(preview) > 200 { preview = preview[:200] } + log.Error("ha request failed", + "http.status", resp.StatusCode, + "duration_ms", time.Since(start).Milliseconds(), + "error", preview, + ) return fmt.Errorf("HA returned %d for GET %s: %s", resp.StatusCode, path, preview) } + log.Debug("ha request completed", + "http.status", resp.StatusCode, + "duration_ms", time.Since(start).Milliseconds(), + ) if err := json.Unmarshal(body, dst); err != nil { return fmt.Errorf("decode response for GET %s: %w", path, err) diff --git a/ha-gateway/internal/config/config.go b/ha-gateway/internal/config/config.go index c90b615..ec1ac7e 100644 --- a/ha-gateway/internal/config/config.go +++ b/ha-gateway/internal/config/config.go @@ -10,7 +10,9 @@ type Config struct { HABaseURL string // HA_BASE_URL, e.g. "http://ha.home.arpa:8123" HAToken string // HA_TOKEN — long-lived access token (required) OTELEndpoint string // OTEL_ENDPOINT, e.g. "otel-collector.monitoring.svc:4317" - // empty = telemetry disabled (local dev default) + LogLevel string // LOG_LEVEL, default "info" + LogFormat string // LOG_FORMAT, default "json" + // empty = telemetry disabled (local dev default) } func Load() (*Config, error) { @@ -29,5 +31,14 @@ func Load() (*Config, error) { HABaseURL: os.Getenv("HA_BASE_URL"), HAToken: token, OTELEndpoint: os.Getenv("OTEL_ENDPOINT"), + LogLevel: getenvDefault("LOG_LEVEL", "info"), + LogFormat: getenvDefault("LOG_FORMAT", "json"), }, nil } + +func getenvDefault(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} diff --git a/ha-gateway/internal/logger/logger.go b/ha-gateway/internal/logger/logger.go new file mode 100644 index 0000000..2232a97 --- /dev/null +++ b/ha-gateway/internal/logger/logger.go @@ -0,0 +1,35 @@ +package logger + +import ( + "context" + "fmt" + "log/slog" + "os" +) + +type contextKey struct{} + +func New(format, level string) *slog.Logger { + var parsed slog.Level + if err := parsed.UnmarshalText([]byte(level)); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "invalid log level %q, falling back to info\n", level) + parsed = slog.LevelInfo + } + + opts := &slog.HandlerOptions{Level: parsed} + if format == "json" { + return slog.New(slog.NewJSONHandler(os.Stdout, opts)) + } + return slog.New(slog.NewTextHandler(os.Stdout, opts)) +} + +func WithLogger(ctx context.Context, l *slog.Logger) context.Context { + return context.WithValue(ctx, contextKey{}, l) +} + +func FromContext(ctx context.Context) *slog.Logger { + if l, ok := ctx.Value(contextKey{}).(*slog.Logger); ok && l != nil { + return l + } + return slog.Default() +}