feat: implement structured logging and enhance error handling in HA client and gRPC server
This commit is contained in:
parent
d6236d70b9
commit
dc0476eead
@ -2,19 +2,16 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health"
|
||||
grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
||||
hav1 "gitea.nik4nao.com/nik/home-services/gen/ha/v1"
|
||||
@ -22,6 +19,7 @@ import (
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/adapters/secondary/ha"
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/app"
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/config"
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/logger"
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/telemetry"
|
||||
)
|
||||
|
||||
@ -38,53 +36,56 @@ import (
|
||||
var version = "dev"
|
||||
|
||||
func main() {
|
||||
// 1. Load .env if present (ignored in K8s where env vars come from Secrets/ConfigMaps).
|
||||
_ = godotenv.Load()
|
||||
|
||||
// 2. Load and validate config.
|
||||
cfg, err := config.Load()
|
||||
if err != nil {
|
||||
slog.Error("config error", "err", err)
|
||||
os.Stderr.WriteString("config error: " + err.Error() + "\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log := logger.New(cfg.LogFormat, cfg.LogLevel)
|
||||
log.Info("starting ha-gateway",
|
||||
"version", version,
|
||||
"grpc_port", cfg.GRPCPort,
|
||||
"ha_base_url", cfg.HABaseURL,
|
||||
"ha_token", redactToken(cfg.HAToken),
|
||||
"otel_endpoint", cfg.OTELEndpoint,
|
||||
"log_level", cfg.LogLevel,
|
||||
"log_format", cfg.LogFormat,
|
||||
)
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
|
||||
defer stop()
|
||||
ctx = logger.WithLogger(ctx, log)
|
||||
|
||||
// 3. Set up telemetry.
|
||||
shutdown, err := telemetry.Setup(ctx, cfg, version)
|
||||
if err != nil {
|
||||
slog.Error("telemetry setup failed", "err", err)
|
||||
log.Error("telemetry setup failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// 4. Build the secondary adapter.
|
||||
haClient := ha.NewClient(cfg)
|
||||
haClient := ha.NewClient(cfg, log)
|
||||
|
||||
// 5. Build application services.
|
||||
entityApp := app.NewEntityApp(haClient)
|
||||
lightApp := app.NewLightApp(haClient)
|
||||
switchApp := app.NewSwitchApp(haClient)
|
||||
|
||||
// 5a. Prime the discovery cache. Non-fatal — ListLights/ListSwitches will
|
||||
// retry lazily on the first call if this fails (e.g. HA not yet ready).
|
||||
if err := lightApp.Refresh(ctx); err != nil {
|
||||
slog.Warn("initial light discovery failed, will retry on first request", "err", err)
|
||||
log.Warn("initial light discovery failed, will retry on first request", "err", err)
|
||||
}
|
||||
if err := switchApp.Refresh(ctx); err != nil {
|
||||
slog.Warn("initial switch discovery failed, will retry on first request", "err", err)
|
||||
log.Warn("initial switch discovery failed, will retry on first request", "err", err)
|
||||
}
|
||||
|
||||
// 6. Build the gRPC server with OTel stats handler and logging interceptors.
|
||||
srv := grpc.NewServer(
|
||||
grpc.StatsHandler(otelgrpc.NewServerHandler()),
|
||||
grpc.ChainUnaryInterceptor(loggingUnaryInterceptor),
|
||||
grpc.ChainStreamInterceptor(loggingStreamInterceptor),
|
||||
grpc.ChainUnaryInterceptor(grpcadapter.LoggingUnaryInterceptor(log)),
|
||||
grpc.ChainStreamInterceptor(grpcadapter.LoggingStreamInterceptor(log)),
|
||||
)
|
||||
healthSrv := health.NewServer()
|
||||
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
|
||||
|
||||
// 7. Register services.
|
||||
hav1.RegisterEntityServiceServer(srv, grpcadapter.NewEntityGRPC(entityApp))
|
||||
hav1.RegisterLightServiceServer(srv, grpcadapter.NewLightGRPC(lightApp))
|
||||
hav1.RegisterSwitchServiceServer(srv, grpcadapter.NewSwitchGRPC(switchApp))
|
||||
@ -92,70 +93,37 @@ func main() {
|
||||
grpc_health_v1.RegisterHealthServer(srv, healthSrv)
|
||||
reflection.Register(srv)
|
||||
|
||||
// 8. Start listener.
|
||||
lis, err := net.Listen("tcp", ":"+cfg.GRPCPort)
|
||||
if err != nil {
|
||||
slog.Error("listen failed", "err", err)
|
||||
log.Error("listen failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// 9. Serve in background.
|
||||
go func() {
|
||||
slog.Info("listening", "port", cfg.GRPCPort, "version", version)
|
||||
log.Info("ha-gateway listening", "addr", lis.Addr().String())
|
||||
if err := srv.Serve(lis); err != nil {
|
||||
slog.Error("serve failed", "err", err)
|
||||
log.Error("serve failed", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// 10. Block until signal.
|
||||
<-ctx.Done()
|
||||
slog.Info("shutting down")
|
||||
log.Info("shutdown signal received, draining")
|
||||
|
||||
// 11. Graceful stop, then flush telemetry.
|
||||
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
||||
srv.GracefulStop()
|
||||
log.Info("shutdown complete")
|
||||
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := shutdown(shutdownCtx); err != nil {
|
||||
slog.Error("telemetry shutdown error", "err", err)
|
||||
if err := shutdown(context.Background()); err != nil {
|
||||
log.Error("telemetry shutdown error", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func loggingUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
|
||||
start := time.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
attrs := []any{
|
||||
"method", info.FullMethod,
|
||||
"duration", time.Since(start).String(),
|
||||
func redactToken(token string) string {
|
||||
if token == "" {
|
||||
return "[not set]"
|
||||
}
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
attrs = append(attrs, "peer", p.Addr.String())
|
||||
if len(token) <= 8 {
|
||||
return token + "..."
|
||||
}
|
||||
if err != nil {
|
||||
attrs = append(attrs, "err", err)
|
||||
slog.ErrorContext(ctx, "rpc", attrs...)
|
||||
} else {
|
||||
slog.InfoContext(ctx, "rpc", attrs...)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func loggingStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
start := time.Now()
|
||||
err := handler(srv, ss)
|
||||
attrs := []any{
|
||||
"method", info.FullMethod,
|
||||
"duration", time.Since(start).String(),
|
||||
}
|
||||
if p, ok := peer.FromContext(ss.Context()); ok {
|
||||
attrs = append(attrs, "peer", p.Addr.String())
|
||||
}
|
||||
if err != nil {
|
||||
attrs = append(attrs, "err", err)
|
||||
slog.ErrorContext(ss.Context(), "rpc stream", attrs...)
|
||||
} else {
|
||||
slog.InfoContext(ss.Context(), "rpc stream", attrs...)
|
||||
}
|
||||
return err
|
||||
return token[:8] + "..."
|
||||
}
|
||||
|
||||
81
ha-gateway/internal/adapters/primary/grpc/interceptor.go
Normal file
81
ha-gateway/internal/adapters/primary/grpc/interceptor.go
Normal file
@ -0,0 +1,81 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/logger"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func LoggingUnaryInterceptor(log *slog.Logger) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
|
||||
method, ok := grpc.Method(ctx)
|
||||
if !ok {
|
||||
method = info.FullMethod
|
||||
}
|
||||
reqLog := requestLogger(ctx, log, method)
|
||||
ctx = logger.WithLogger(ctx, reqLog)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
logCompletion(reqLog, "grpc call completed", status.Code(err), time.Since(start), err)
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
|
||||
func LoggingStreamInterceptor(log *slog.Logger) grpc.StreamServerInterceptor {
|
||||
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
reqLog := requestLogger(ss.Context(), log, info.FullMethod)
|
||||
reqLog.Info("grpc stream started")
|
||||
|
||||
wrapped := &loggingServerStream{
|
||||
ServerStream: ss,
|
||||
ctx: logger.WithLogger(ss.Context(), reqLog),
|
||||
}
|
||||
start := time.Now()
|
||||
err := handler(srv, wrapped)
|
||||
logCompletion(reqLog, "grpc stream completed", status.Code(err), time.Since(start), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
type loggingServerStream struct {
|
||||
grpc.ServerStream
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (s *loggingServerStream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
func requestLogger(ctx context.Context, log *slog.Logger, method string) *slog.Logger {
|
||||
peerAddr := ""
|
||||
if p, ok := peer.FromContext(ctx); ok && p.Addr != nil {
|
||||
peerAddr = p.Addr.String()
|
||||
}
|
||||
return log.With("grpc.method", method, "grpc.peer", peerAddr)
|
||||
}
|
||||
|
||||
func logCompletion(log *slog.Logger, msg string, code codes.Code, duration time.Duration, err error) {
|
||||
attrs := []any{
|
||||
"duration_ms", duration.Milliseconds(),
|
||||
"grpc.code", code.String(),
|
||||
}
|
||||
if err != nil {
|
||||
attrs = append(attrs, "error", err.Error())
|
||||
}
|
||||
|
||||
switch code {
|
||||
case codes.OK:
|
||||
log.Info(msg, attrs...)
|
||||
case codes.NotFound, codes.InvalidArgument, codes.Unimplemented:
|
||||
log.Warn(msg, attrs...)
|
||||
default:
|
||||
log.Error(msg, attrs...)
|
||||
}
|
||||
}
|
||||
@ -5,12 +5,14 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/config"
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/core/ports/driven"
|
||||
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/logger"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
@ -22,13 +24,15 @@ type Client struct {
|
||||
baseURL string
|
||||
token string
|
||||
httpClient *http.Client
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
func NewClient(cfg *config.Config) *Client {
|
||||
func NewClient(cfg *config.Config, log *slog.Logger) *Client {
|
||||
return &Client{
|
||||
baseURL: strings.TrimRight(cfg.HABaseURL, "/"),
|
||||
token: cfg.HAToken,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,8 +94,14 @@ func (c *Client) CallService(ctx context.Context, domain, service string, payloa
|
||||
req.Header.Set("Authorization", "Bearer "+c.token)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
log := logger.FromContext(ctx).With("ha.method", req.Method, "ha.path", req.URL.Path)
|
||||
start := time.Now()
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
log.Error("ha request failed",
|
||||
"duration_ms", time.Since(start).Milliseconds(),
|
||||
"error", err.Error(),
|
||||
)
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return nil, fmt.Errorf("call service %s/%s: %w", domain, service, err)
|
||||
@ -104,11 +114,20 @@ func (c *Client) CallService(ctx context.Context, domain, service string, payloa
|
||||
if len(preview) > 200 {
|
||||
preview = preview[:200]
|
||||
}
|
||||
log.Error("ha request failed",
|
||||
"http.status", resp.StatusCode,
|
||||
"duration_ms", time.Since(start).Milliseconds(),
|
||||
"error", preview,
|
||||
)
|
||||
err := fmt.Errorf("HA returned %d: %s", resp.StatusCode, preview)
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
log.Debug("ha request completed",
|
||||
"http.status", resp.StatusCode,
|
||||
"duration_ms", time.Since(start).Milliseconds(),
|
||||
)
|
||||
|
||||
var raw []haStateRaw
|
||||
if err := json.Unmarshal(respBody, &raw); err != nil {
|
||||
@ -134,8 +153,14 @@ func (c *Client) get(ctx context.Context, path string, dst any) error {
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+c.token)
|
||||
|
||||
log := logger.FromContext(ctx).With("ha.method", req.Method, "ha.path", req.URL.Path)
|
||||
start := time.Now()
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
log.Error("ha request failed",
|
||||
"duration_ms", time.Since(start).Milliseconds(),
|
||||
"error", err.Error(),
|
||||
)
|
||||
return fmt.Errorf("GET %s: %w", path, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@ -146,8 +171,17 @@ func (c *Client) get(ctx context.Context, path string, dst any) error {
|
||||
if len(preview) > 200 {
|
||||
preview = preview[:200]
|
||||
}
|
||||
log.Error("ha request failed",
|
||||
"http.status", resp.StatusCode,
|
||||
"duration_ms", time.Since(start).Milliseconds(),
|
||||
"error", preview,
|
||||
)
|
||||
return fmt.Errorf("HA returned %d for GET %s: %s", resp.StatusCode, path, preview)
|
||||
}
|
||||
log.Debug("ha request completed",
|
||||
"http.status", resp.StatusCode,
|
||||
"duration_ms", time.Since(start).Milliseconds(),
|
||||
)
|
||||
|
||||
if err := json.Unmarshal(body, dst); err != nil {
|
||||
return fmt.Errorf("decode response for GET %s: %w", path, err)
|
||||
|
||||
@ -10,6 +10,8 @@ type Config struct {
|
||||
HABaseURL string // HA_BASE_URL, e.g. "http://ha.home.arpa:8123"
|
||||
HAToken string // HA_TOKEN — long-lived access token (required)
|
||||
OTELEndpoint string // OTEL_ENDPOINT, e.g. "otel-collector.monitoring.svc:4317"
|
||||
LogLevel string // LOG_LEVEL, default "info"
|
||||
LogFormat string // LOG_FORMAT, default "json"
|
||||
// empty = telemetry disabled (local dev default)
|
||||
}
|
||||
|
||||
@ -29,5 +31,14 @@ func Load() (*Config, error) {
|
||||
HABaseURL: os.Getenv("HA_BASE_URL"),
|
||||
HAToken: token,
|
||||
OTELEndpoint: os.Getenv("OTEL_ENDPOINT"),
|
||||
LogLevel: getenvDefault("LOG_LEVEL", "info"),
|
||||
LogFormat: getenvDefault("LOG_FORMAT", "json"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getenvDefault(key, fallback string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
35
ha-gateway/internal/logger/logger.go
Normal file
35
ha-gateway/internal/logger/logger.go
Normal file
@ -0,0 +1,35 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
)
|
||||
|
||||
type contextKey struct{}
|
||||
|
||||
func New(format, level string) *slog.Logger {
|
||||
var parsed slog.Level
|
||||
if err := parsed.UnmarshalText([]byte(level)); err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "invalid log level %q, falling back to info\n", level)
|
||||
parsed = slog.LevelInfo
|
||||
}
|
||||
|
||||
opts := &slog.HandlerOptions{Level: parsed}
|
||||
if format == "json" {
|
||||
return slog.New(slog.NewJSONHandler(os.Stdout, opts))
|
||||
}
|
||||
return slog.New(slog.NewTextHandler(os.Stdout, opts))
|
||||
}
|
||||
|
||||
func WithLogger(ctx context.Context, l *slog.Logger) context.Context {
|
||||
return context.WithValue(ctx, contextKey{}, l)
|
||||
}
|
||||
|
||||
func FromContext(ctx context.Context) *slog.Logger {
|
||||
if l, ok := ctx.Value(contextKey{}).(*slog.Logger); ok && l != nil {
|
||||
return l
|
||||
}
|
||||
return slog.Default()
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user