156 lines
4.9 KiB
Go

package main
import (
"context"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
"time"
"github.com/joho/godotenv"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
hav1 "gitea.nik4nao.com/nik/home-services/gen/ha/v1"
grpcadapter "gitea.nik4nao.com/nik/home-services/ha-gateway/internal/adapters/primary/grpc"
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/adapters/secondary/ha"
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/app"
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/config"
"gitea.nik4nao.com/nik/home-services/ha-gateway/internal/telemetry"
)
// MEMO: auth is not implemented.
// Add one of the following before exposing this service to any untrusted network:
//
//   - Option A — shared API key per client: unary + stream interceptors read
//     "authorization" from gRPC metadata and compare it to a secret from
//     config. Good for a small number of known clients.
//   - Option B — mTLS (recommended): tls.Config with
//     ClientAuth: RequireAndVerifyClientCert and a cert pool from the internal
//     CA. Each client gets a cert from cert-manager. No runtime auth
//     dependency; identity lives in the cert CN/SAN.

// version identifies the running build. It defaults to "dev" and is set at
// build time via -ldflags "-X main.version=<tag>".
var version = "dev"
// main wires configuration, telemetry, the HA client, and the application
// services into a gRPC server, then serves until SIGTERM/SIGINT arrives or
// serving fails. On the way out it stops the server (gracefully, with a
// bounded wait) and flushes telemetry.
//
// The process exits with status 1 when configuration loading, telemetry
// setup, the listener, or serving fails.
func main() {
	const (
		// gracefulStopTimeout bounds how long in-flight RPCs (including
		// long-lived streams) may delay shutdown before they are cut off.
		gracefulStopTimeout = 15 * time.Second
		// telemetryFlushTimeout bounds the final telemetry flush.
		telemetryFlushTimeout = 10 * time.Second
	)

	// 1. Load .env if present (ignored in K8s where env vars come from
	// Secrets/ConfigMaps). The error is deliberately dropped: a missing file
	// is the normal case outside local development.
	_ = godotenv.Load()

	// 2. Load and validate config.
	cfg, err := config.Load()
	if err != nil {
		slog.Error("config error", "err", err)
		os.Exit(1)
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// 3. Set up telemetry.
	shutdown, err := telemetry.Setup(ctx, cfg, version)
	if err != nil {
		slog.Error("telemetry setup failed", "err", err)
		os.Exit(1)
	}
	// flushTelemetry is called explicitly on every exit path after this
	// point, because os.Exit skips deferred calls.
	flushTelemetry := func() {
		flushCtx, cancel := context.WithTimeout(context.Background(), telemetryFlushTimeout)
		defer cancel()
		if err := shutdown(flushCtx); err != nil {
			slog.Error("telemetry shutdown error", "err", err)
		}
	}

	// 4. Build the secondary adapter.
	haClient := ha.NewClient(cfg)

	// 5. Build application services.
	entityApp := app.NewEntityApp(haClient)
	lightApp := app.NewLightApp(haClient)
	switchApp := app.NewSwitchApp(haClient)

	// 5a. Prime the discovery cache. Non-fatal — ListLights/ListSwitches will
	// retry lazily on the first call if this fails (e.g. HA not yet ready).
	if err := lightApp.Refresh(ctx); err != nil {
		slog.Warn("initial light discovery failed, will retry on first request", "err", err)
	}
	if err := switchApp.Refresh(ctx); err != nil {
		slog.Warn("initial switch discovery failed, will retry on first request", "err", err)
	}

	// 6. Build the gRPC server with OTel stats handler and logging interceptors.
	srv := grpc.NewServer(
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
		grpc.ChainUnaryInterceptor(loggingUnaryInterceptor),
		grpc.ChainStreamInterceptor(loggingStreamInterceptor),
	)

	// 7. Register services.
	hav1.RegisterEntityServiceServer(srv, grpcadapter.NewEntityGRPC(entityApp))
	hav1.RegisterLightServiceServer(srv, grpcadapter.NewLightGRPC(lightApp))
	hav1.RegisterSwitchServiceServer(srv, grpcadapter.NewSwitchGRPC(switchApp))
	hav1.RegisterEventServiceServer(srv, &grpcadapter.EventGRPC{})
	reflection.Register(srv)

	// 8. Start listener.
	lis, err := net.Listen("tcp", ":"+cfg.GRPCPort)
	if err != nil {
		slog.Error("listen failed", "err", err)
		flushTelemetry()
		os.Exit(1)
	}

	// 9. Serve in background. The result is reported through a buffered
	// channel so the goroutine can always finish, even if nobody reads it.
	serveErr := make(chan error, 1)
	go func() {
		slog.Info("listening", "port", cfg.GRPCPort, "version", version)
		serveErr <- srv.Serve(lis)
	}()

	// 10. Block until a signal arrives or serving stops on its own. Without
	// the second case a failed Serve would leave the process running forever
	// while serving nothing.
	exitCode := 0
	select {
	case <-ctx.Done():
		slog.Info("shutting down")
	case err := <-serveErr:
		if err != nil {
			slog.Error("serve failed", "err", err)
			exitCode = 1
		}
	}
	// Restore default signal handling so a second SIGINT/SIGTERM terminates
	// the process immediately if shutdown itself gets stuck.
	stop()

	// 11. Graceful stop with a deadline, then flush telemetry. GracefulStop
	// waits for every in-flight RPC, so an open stream could otherwise block
	// shutdown until the process is killed and telemetry is lost.
	stopped := make(chan struct{})
	go func() {
		srv.GracefulStop()
		close(stopped)
	}()
	forceStop := time.NewTimer(gracefulStopTimeout)
	select {
	case <-stopped:
		forceStop.Stop()
	case <-forceStop.C:
		slog.Warn("graceful stop timed out, forcing stop")
		srv.Stop()
	}

	flushTelemetry()

	if exitCode != 0 {
		os.Exit(exitCode)
	}
}
// loggingUnaryInterceptor is a unary server interceptor that runs the handler
// and then emits a single "rpc" log record carrying the full method name, the
// handler duration, and the peer address when the context provides one.
// Failed calls are logged at error level with the error attached; successful
// calls are logged at info level. The handler's response and error are
// returned unchanged.
func loggingUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	begin := time.Now()
	resp, err := handler(ctx, req)

	fields := make([]any, 0, 8)
	fields = append(fields,
		"method", info.FullMethod,
		"duration", time.Since(begin).String(),
	)
	if caller, found := peer.FromContext(ctx); found {
		fields = append(fields, "peer", caller.Addr.String())
	}

	// Success path first; everything below handles a failed call.
	if err == nil {
		slog.InfoContext(ctx, "rpc", fields...)
		return resp, nil
	}

	fields = append(fields, "err", err)
	slog.ErrorContext(ctx, "rpc", fields...)
	return resp, err
}
// loggingStreamInterceptor is a stream server interceptor that runs the
// handler to completion and then emits a single "rpc stream" log record
// carrying the full method name, the total handler duration, and the peer
// address when the stream context provides one. A failed stream is logged at
// error level with the error attached; otherwise the record is logged at info
// level. The handler's error is returned unchanged.
func loggingStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	begin := time.Now()
	err := handler(srv, ss)

	fields := make([]any, 0, 8)
	fields = append(fields,
		"method", info.FullMethod,
		"duration", time.Since(begin).String(),
	)
	if caller, found := peer.FromContext(ss.Context()); found {
		fields = append(fields, "peer", caller.Addr.String())
	}

	// Success path first; everything below handles a failed stream.
	if err == nil {
		slog.InfoContext(ss.Context(), "rpc stream", fields...)
		return nil
	}

	fields = append(fields, "err", err)
	slog.ErrorContext(ss.Context(), "rpc stream", fields...)
	return err
}