diff --git a/modules/nixos/services/media/servarr/default.nix b/modules/nixos/services/media/servarr/default.nix index 23255c0..47461ef 100644 --- a/modules/nixos/services/media/servarr/default.nix +++ b/modules/nixos/services/media/servarr/default.nix @@ -217,7 +217,7 @@ in { { method = 1; # HTTP METHOD 1=POST, 2=PUT name = "Arrtrix"; - url = "http://[::1]${config'.services.arrtrix.settings.appservice.port}"; + url = "http://[::1]${toString config'.services.arrtrix.settings.appservice.port}"; } // (lib.optionalAttrs (lib.elem service ["radarr" "whisparr"]) { onMovieDelete = true; diff --git a/modules/nixos/services/observability/alloy/default.nix b/modules/nixos/services/observability/alloy/default.nix new file mode 100644 index 0000000..8385f8f --- /dev/null +++ b/modules/nixos/services/observability/alloy/default.nix @@ -0,0 +1,80 @@ +{ config, lib, namespace, ... }: +let + inherit (builtins) toString; + inherit (lib) mkEnableOption mkIf; + + cfg = config.${namespace}.services.observability.alloy; + + httpPort = 9007; + otlpGrpcPort = 9010; + otlpHttpPort = 9011; + tempoOtlpGrpcPort = 9009; +in +{ + options.${namespace}.services.observability.alloy = { + enable = mkEnableOption "enable Grafana Alloy"; + }; + + config = mkIf cfg.enable { + services.alloy = { + enable = true; + configPath = "/etc/alloy"; + extraFlags = [ + "--disable-reporting" + "--server.http.listen-addr=0.0.0.0:${toString httpPort}" + "--storage.path=/var/lib/alloy" + ]; + }; + + environment.etc."alloy/config.alloy".text = '' + otelcol.receiver.otlp "default" { + grpc { + endpoint = "127.0.0.1:${toString otlpGrpcPort}" + } + + http { + endpoint = "127.0.0.1:${toString otlpHttpPort}" + } + + output { + metrics = [otelcol.processor.batch.metrics.input] + traces = [otelcol.processor.batch.traces.input] + } + } + + otelcol.processor.batch "metrics" { + output { + metrics = [otelcol.exporter.prometheus.default.input] + } + } + + otelcol.processor.batch "traces" { + output { + traces = [otelcol.exporter.otlp.tempo.input] + } + } + + otelcol.exporter.prometheus "default" { + forward_to = [prometheus.remote_write.local.receiver] + } + + prometheus.remote_write "local" { + endpoint { + url = "http://127.0.0.1:${toString config.services.prometheus.port}/api/v1/write" + } + } + + otelcol.exporter.otlp "tempo" { + client { + endpoint = "127.0.0.1:${toString tempoOtlpGrpcPort}" + + tls { + insecure = true + } + } + } + ''; + + networking.firewall.allowedTCPPorts = [ httpPort ]; + }; +} diff --git a/modules/nixos/services/observability/grafana/default.nix b/modules/nixos/services/observability/grafana/default.nix index a867351..d2ed0e7 100644 --- a/modules/nixos/services/observability/grafana/default.nix +++ b/modules/nixos/services/observability/grafana/default.nix @@ -102,23 +102,43 @@ in { }; datasources.settings.datasources = [ - { - name = "Prometheus"; - type = "prometheus"; - url = "http://localhost:9005"; - isDefault = true; - editable = false; - } + { + name = "Prometheus"; + uid = "prometheus"; + type = "prometheus"; + url = "http://localhost:9002"; + isDefault = true; + editable = false; + } - { - name = "Loki"; - type = "loki"; - url = "http://localhost:9003"; - editable = false; - } - ]; - }; - }; + { + name = "Loki"; + uid = "loki"; + type = "loki"; + url = "http://localhost:9003"; + editable = false; + } + + { + name = "Tempo"; + uid = "tempo"; + type = "tempo"; + url = "http://localhost:9006"; + editable = false; + jsonData = { + nodeGraph.enabled = true; + serviceMap.datasourceUid = "prometheus"; + tracesToLogsV2 = { + datasourceUid = "loki"; + filterByTraceID = true; + spanStartTimeShift = "-1h"; + spanEndTimeShift = "1h"; + }; + }; + } + ]; + }; + }; postgresql = { enable = true; diff --git a/modules/nixos/services/observability/prometheus/default.nix b/modules/nixos/services/observability/prometheus/default.nix index af5ee9d..3faa278 100644 --- a/modules/nixos/services/observability/prometheus/default.nix +++ b/modules/nixos/services/observability/prometheus/default.nix @@ -1,7 +1,7 @@ { pkgs, config, lib, namespace, ... }: let inherit (builtins) toString; - inherit (lib) mkIf mkEnableOption; + inherit (lib) mkEnableOption mkIf optionals; cfg = config.${namespace}.services.observability.prometheus; in @@ -14,6 +14,9 @@ in services.prometheus = { enable = true; port = 9002; + extraFlags = optionals config.${namespace}.services.observability.alloy.enable [ + "--web.enable-remote-write-receiver" + ]; globalConfig.scrape_interval = "15s"; @@ -31,6 +34,22 @@ in { targets = [ "localhost:${toString config.services.prometheus.exporters.node.port}" ]; } ]; } + ] + ++ optionals config.${namespace}.services.observability.alloy.enable [ + { + job_name = "alloy"; + static_configs = [ + { targets = [ "localhost:9007" ]; } + ]; + } + ] + ++ optionals config.${namespace}.services.observability.tempo.enable [ + { + job_name = "tempo"; + static_configs = [ + { targets = [ "localhost:9006" ]; } + ]; + } ]; exporters = { diff --git a/modules/nixos/services/observability/tempo/default.nix b/modules/nixos/services/observability/tempo/default.nix new file mode 100644 index 0000000..10b07d7 --- /dev/null +++ b/modules/nixos/services/observability/tempo/default.nix @@ -0,0 +1,48 @@ +{ config, lib, namespace, ... }: +let + inherit (lib) mkEnableOption mkIf; + + cfg = config.${namespace}.services.observability.tempo; + + httpPort = 9006; + grpcPort = 9008; + otlpGrpcPort = 9009; + otlpHttpPort = 9012; +in +{ + options.${namespace}.services.observability.tempo = { + enable = mkEnableOption "enable Grafana Tempo"; + }; + + config = mkIf cfg.enable { + services.tempo = { + enable = true; + settings = { + auth_enabled = false; + search_enabled = true; + + server = { + http_listen_address = "0.0.0.0"; + http_listen_port = httpPort; + grpc_listen_address = "127.0.0.1"; + grpc_listen_port = grpcPort; + }; + + distributor.receivers.otlp.protocols = { + grpc.endpoint = "127.0.0.1:${builtins.toString otlpGrpcPort}"; + http.endpoint = "127.0.0.1:${builtins.toString otlpHttpPort}"; + }; + + storage.trace = { + backend = "local"; + wal.path = "/var/lib/tempo/wal"; + local.path = "/var/lib/tempo/traces"; + }; + + compactor.compaction.block_retention = "168h"; + }; + }; + + networking.firewall.allowedTCPPorts = [ httpPort ]; + }; +} diff --git a/packages/arrtrix/pkg/observability/config.go b/packages/arrtrix/pkg/observability/config.go new file mode 100644 index 0000000..187c5b5 --- /dev/null +++ b/packages/arrtrix/pkg/observability/config.go @@ -0,0 +1,22 @@ +package observability + +import "strings" + +type Config struct { + OTLPGRPCEndpoint string `yaml:"otlp_grpc_endpoint"` + ServiceName string `yaml:"service_name"` + ResourceAttributes map[string]string `yaml:"resource_attributes"` +} + +func (c *Config) ApplyDefaults() { + if c.ServiceName == "" { + c.ServiceName = "arrtrix" + } + if c.ResourceAttributes == nil { + c.ResourceAttributes = map[string]string{} + } +} + +func (c Config) Enabled() bool { + return strings.TrimSpace(c.OTLPGRPCEndpoint) != "" +} diff --git a/packages/arrtrix/pkg/observability/otel.go b/packages/arrtrix/pkg/observability/otel.go new file mode 100644 index 0000000..2fe46ef --- /dev/null +++ b/packages/arrtrix/pkg/observability/otel.go @@ -0,0 +1,397 @@ +package observability + +import ( + "context" + "errors" + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + otellog "go.opentelemetry.io/otel/log" + logglobal "go.opentelemetry.io/otel/log/global" + otelmetric "go.opentelemetry.io/otel/metric" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +const ( + instrumentationScope = "sneeuwvlok/packages/arrtrix" + logScope = instrumentationScope + "/logs" +) + +type Runtime struct { + traceProvider *sdktrace.TracerProvider + meterProvider *sdkmetric.MeterProvider + logProvider *sdklog.LoggerProvider + logHook zerolog.Hook +} + +type exporterEndpoint struct { + raw string + insecure bool +} + +type instruments struct { + webhookRequests otelCounter + webhookLatency otelHistogram + commandInvocations otelCounter + inviteEvents otelCounter + startupDuration otelHistogram +} + +type otelCounter interface { + Add(context.Context, int64, ...otelmetric.AddOption) +} + +type otelHistogram interface { + Record(context.Context, float64, ...otelmetric.RecordOption) +} + +var ( + mu sync.RWMutex + current instruments + tracer = otel.Tracer(instrumentationScope) + currentReady bool +) + +func Setup(ctx context.Context, cfg Config, version string) (*Runtime, error) { + cfg.ApplyDefaults() + if !cfg.Enabled() { + resetInstruments() + return &Runtime{}, nil + } + + res, err := buildResource(cfg, version) + if err != nil { + return nil, err + } + endpoint, err := parseEndpoint(cfg.OTLPGRPCEndpoint) + if err != nil { + return nil, err + } + + traceExporter, err := otlptracegrpc.New(ctx, traceOptions(endpoint)...) + if err != nil { + return nil, fmt.Errorf("create trace exporter: %w", err) + } + metricExporter, err := otlpmetricgrpc.New(ctx, metricOptions(endpoint)...) + if err != nil { + return nil, fmt.Errorf("create metric exporter: %w", err) + } + logExporter, err := otlploggrpc.New(ctx, logOptions(endpoint)...) + if err != nil { + return nil, fmt.Errorf("create log exporter: %w", err) + } + + traceProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithBatcher(traceExporter), + ) + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(30*time.Second))), + ) + logProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(res), + sdklog.WithProcessor(sdklog.NewBatchProcessor(logExporter)), + ) + + otel.SetTracerProvider(traceProvider) + otel.SetMeterProvider(meterProvider) + logglobal.SetLoggerProvider(logProvider) + + if err = setInstruments(meterProvider); err != nil { + _ = traceProvider.Shutdown(ctx) + _ = meterProvider.Shutdown(ctx) + _ = logProvider.Shutdown(ctx) + return nil, err + } + + tracer = otel.Tracer(instrumentationScope) + return &Runtime{ + traceProvider: traceProvider, + meterProvider: meterProvider, + logProvider: logProvider, + logHook: newLogHook(logglobal.Logger(logScope)), + }, nil +} + +func (r *Runtime) Enabled() bool { + return r != nil && r.traceProvider != nil +} + +func (r *Runtime) LoggerHook() zerolog.Hook { + if r == nil { + return nil + } + return r.logHook +} + +func (r *Runtime) Shutdown(ctx context.Context) error { + if r == nil || !r.Enabled() { + resetInstruments() + return nil + } + + var errs []error + if err := r.logProvider.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("shutdown log provider: %w", err)) + } + if err := r.meterProvider.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("shutdown meter provider: %w", err)) + } + if err := r.traceProvider.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("shutdown trace provider: %w", err)) + } + resetInstruments() + return errors.Join(errs...) +} + +func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return tracer.Start(ctx, name, opts...) +} + +func RecordWebhook(ctx context.Context, eventType, outcome string, statusCode int, duration time.Duration) { + mu.RLock() + inst := current + ready := currentReady + mu.RUnlock() + if !ready { + return + } + attrs := otelmetric.WithAttributes( + attribute.String("event_type", eventType), + attribute.String("outcome", outcome), + attribute.Int("http.status_code", statusCode), + ) + inst.webhookRequests.Add(ctx, 1, attrs) + inst.webhookLatency.Record(ctx, duration.Seconds(), attrs) +} + +func RecordCommand(ctx context.Context, name, outcome string) { + mu.RLock() + inst := current + ready := currentReady + mu.RUnlock() + if !ready { + return + } + inst.commandInvocations.Add(ctx, 1, otelmetric.WithAttributes( + attribute.String("command", name), + attribute.String("outcome", outcome), + )) +} + +func RecordInvite(ctx context.Context, outcome string) { + mu.RLock() + inst := current + ready := currentReady + mu.RUnlock() + if !ready { + return + } + inst.inviteEvents.Add(ctx, 1, otelmetric.WithAttributes(attribute.String("outcome", outcome))) +} + +func RecordStartupPhase(ctx context.Context, phase, outcome string, duration time.Duration) { + mu.RLock() + inst := current + ready := currentReady + mu.RUnlock() + if !ready { + return + } + inst.startupDuration.Record(ctx, duration.Seconds(), otelmetric.WithAttributes( + attribute.String("phase", phase), + attribute.String("outcome", outcome), + )) +} + +func parseEndpoint(raw string) (exporterEndpoint, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return exporterEndpoint{}, errors.New("observability.otlp_grpc_endpoint must not be empty when observability is enabled") + } + if strings.Contains(raw, "://") { + u, err := url.Parse(raw) + if err != nil { + return exporterEndpoint{}, fmt.Errorf("parse observability.otlp_grpc_endpoint: %w", err) + } + if u.Scheme == "" || u.Host == "" { + return exporterEndpoint{}, fmt.Errorf("invalid observability.otlp_grpc_endpoint %q", raw) + } + return exporterEndpoint{raw: raw, insecure: u.Scheme == "http"}, nil + } + return exporterEndpoint{raw: "http://" + raw, insecure: true}, nil +} + +func buildResource(cfg Config, version string) (*resource.Resource, error) { + attrs := []attribute.KeyValue{ + attribute.String("service.name", cfg.ServiceName), + } + if version != "" { + attrs = append(attrs, attribute.String("service.version", version)) + } + for key, value := range cfg.ResourceAttributes { + attrs = append(attrs, attribute.String(key, value)) + } + return resource.Merge(resource.Default(), resource.NewWithAttributes("", attrs...)) +} + +func setInstruments(provider *sdkmetric.MeterProvider) error { + meter := provider.Meter(instrumentationScope) + + webhookRequests, err := meter.Int64Counter( + "arrtrix.webhook.requests", + otelmetric.WithDescription("Number of Arr webhook requests handled by arrtrix."), + ) + if err != nil { + return fmt.Errorf("create webhook request counter: %w", err) + } + webhookLatency, err := meter.Float64Histogram( + "arrtrix.webhook.duration.seconds", + otelmetric.WithDescription("Duration of Arr webhook request handling."), + otelmetric.WithUnit("s"), + ) + if err != nil { + return fmt.Errorf("create webhook duration histogram: %w", err) + } + commandInvocations, err := meter.Int64Counter( + "arrtrix.matrix.commands", + otelmetric.WithDescription("Number of Matrix management-room commands handled by arrtrix."), + ) + if err != nil { + return fmt.Errorf("create command counter: %w", err) + } + inviteEvents, err := meter.Int64Counter( + "arrtrix.matrix.invites", + otelmetric.WithDescription("Number of management-room invite flows observed by arrtrix."), + ) + if err != nil { + return fmt.Errorf("create invite counter: %w", err) + } + startupDuration, err := meter.Float64Histogram( + "arrtrix.runtime.phase.duration.seconds", + otelmetric.WithDescription("Duration of arrtrix runtime startup and shutdown phases."), + otelmetric.WithUnit("s"), + ) + if err != nil { + return fmt.Errorf("create runtime duration histogram: %w", err) + } + + mu.Lock() + current = instruments{ + webhookRequests: webhookRequests, + webhookLatency: webhookLatency, + commandInvocations: commandInvocations, + inviteEvents: inviteEvents, + startupDuration: startupDuration, + } + currentReady = true + mu.Unlock() + return nil +} + +func resetInstruments() { + mu.Lock() + current = instruments{} + currentReady = false + mu.Unlock() +} + +func traceOptions(endpoint exporterEndpoint) []otlptracegrpc.Option { + opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpointURL(endpoint.raw)} + if endpoint.insecure { + opts = append(opts, otlptracegrpc.WithInsecure()) + } + return opts +} + +func metricOptions(endpoint exporterEndpoint) []otlpmetricgrpc.Option { + opts := []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpointURL(endpoint.raw)} + if endpoint.insecure { + opts = append(opts, otlpmetricgrpc.WithInsecure()) + } + return opts +} + +func logOptions(endpoint exporterEndpoint) []otlploggrpc.Option { + opts := []otlploggrpc.Option{otlploggrpc.WithEndpointURL(endpoint.raw)} + if endpoint.insecure { + opts = append(opts, otlploggrpc.WithInsecure()) + } + return opts +} + +type otelLogHook struct { + logger otellog.Logger +} + +func newLogHook(logger otellog.Logger) zerolog.Hook { + return otelLogHook{logger: logger} +} + +func (h otelLogHook) Run(e *zerolog.Event, level zerolog.Level, message string) { + if h.logger == nil { + return + } + ctx := e.GetCtx() + if ctx == nil { + ctx = context.Background() + } + + severity := mapSeverity(level) + if !h.logger.Enabled(ctx, otellog.EnabledParameters{Severity: severity}) { + return + } + + now := time.Now() + record := otellog.Record{} + record.SetTimestamp(now) + record.SetObservedTimestamp(now) + record.SetSeverity(severity) + record.SetSeverityText(strings.ToUpper(level.String())) + record.SetBody(otellog.StringValue(message)) + record.AddAttributes(otellog.String("log.scope", logScope)) + + if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { + record.AddAttributes( + otellog.String("trace_id", spanCtx.TraceID().String()), + otellog.String("span_id", spanCtx.SpanID().String()), + ) + } + + h.logger.Emit(ctx, record) +} + +func mapSeverity(level zerolog.Level) otellog.Severity { + switch level { + case zerolog.TraceLevel: + return otellog.SeverityTrace + case zerolog.DebugLevel: + return otellog.SeverityDebug + case zerolog.InfoLevel: + return otellog.SeverityInfo + case zerolog.WarnLevel: + return otellog.SeverityWarn + case zerolog.ErrorLevel: + return otellog.SeverityError + case zerolog.FatalLevel: + return otellog.SeverityFatal + case zerolog.PanicLevel: + return otellog.SeverityFatal4 + default: + return otellog.SeverityUndefined + } +} diff --git a/packages/arrtrix/pkg/observability/otel_test.go b/packages/arrtrix/pkg/observability/otel_test.go new file mode 100644 index 0000000..4dd8e3e --- /dev/null +++ b/packages/arrtrix/pkg/observability/otel_test.go @@ -0,0 +1,54 @@ +package observability + +import "testing" + +func TestConfigDefaults(t *testing.T) { + var cfg Config + cfg.ApplyDefaults() + + if cfg.ServiceName != "arrtrix" { + t.Fatalf("expected default service name arrtrix, got %q", cfg.ServiceName) + } + if cfg.ResourceAttributes == nil { + t.Fatal("expected resource attributes map to be initialized") + } + if cfg.Enabled() { + t.Fatal("expected observability to be disabled by default") + } +} + +func TestParseEndpointSupportsURLAndBareHost(t *testing.T) { + tests := []struct { + name string + input string + wantRaw string + insecure bool + wantError bool + }{ + {name: "https url", input: "https://otel.example:4317", wantRaw: "https://otel.example:4317"}, + {name: "http url", input: "http://127.0.0.1:4317", wantRaw: "http://127.0.0.1:4317", insecure: true}, + {name: "bare host", input: "collector:4317", wantRaw: "http://collector:4317", insecure: true}, + {name: "invalid", input: "://bad", wantError: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseEndpoint(tt.input) + if tt.wantError { + if err == nil { + t.Fatal("expected error") + } + return + } + if err != nil { + t.Fatalf("parseEndpoint returned error: %v", err) + } + if got.raw != tt.wantRaw { + t.Fatalf("expected raw endpoint %q, got %q", tt.wantRaw, got.raw) + } + if got.insecure != tt.insecure { + t.Fatalf("expected insecure=%t, got %t", tt.insecure, got.insecure) + } + }) + } +} diff --git a/systems/x86_64-linux/ulmo/default.nix b/systems/x86_64-linux/ulmo/default.nix index 7c20a11..57f57d3 100644 --- a/systems/x86_64-linux/ulmo/default.nix +++ b/systems/x86_64-linux/ulmo/default.nix @@ -256,10 +256,12 @@ }; observability = { + alloy.enable = true; grafana.enable = true; - prometheus.enable = true; loki.enable = true; + prometheus.enable = true; promtail.enable = true; + tempo.enable = true; # uptime-kuma.enable = true; };