From e26e25b566a5d48865d413f7f87302054affcdfa Mon Sep 17 00:00:00 2001 From: Chris Kruining Date: Thu, 16 Apr 2026 10:41:16 +0200 Subject: [PATCH] Change observability service ports and add Arrtrix content management - Update ports for Alloy, Grafana, Loki, Prometheus, Promtail, Tempo, and Uptime Kuma to new ranges - Add Arrtrix content management commands and subscriptions - Implement Radarr and Sonarr client logic for movie and series management - Add matrix commands for download and subscription management - Add subscription repository with database schema and logic - Update Arrtrix config and example config for content section - Update help text and command processor to include new commands - Update vendor hash for Arrtrix package --- .../services/observability/alloy/default.nix | 8 +- .../observability/grafana/default.nix | 8 +- .../services/observability/loki/default.nix | 4 +- .../observability/prometheus/default.nix | 12 +- .../observability/promtail/default.nix | 6 +- .../services/observability/tempo/default.nix | 8 +- .../observability/uptime-kuma/default.nix | 4 +- packages/arrtrix/default.nix | 2 +- packages/arrtrix/pkg/arr/catalog.go | 76 ++++++ packages/arrtrix/pkg/arrclient/client.go | 211 +++++++++++++++++ packages/arrtrix/pkg/arrclient/radarr.go | 164 +++++++++++++ packages/arrtrix/pkg/arrclient/sonarr.go | 149 ++++++++++++ packages/arrtrix/pkg/connector/config.go | 42 +++- packages/arrtrix/pkg/connector/connector.go | 30 ++- .../arrtrix/pkg/connector/example-config.yaml | 23 +- packages/arrtrix/pkg/matrixcmd/download.go | 222 ++++++++++++++++++ packages/arrtrix/pkg/matrixcmd/help_test.go | 2 + packages/arrtrix/pkg/matrixcmd/processor.go | 2 + .../arrtrix/pkg/matrixcmd/subscriptions.go | 107 +++++++++ packages/arrtrix/pkg/runtime/main.go | 5 + packages/arrtrix/pkg/subscriptions/repo.go | 141 +++++++++++ packages/arrtrix/pkg/webhook/arr.go | 180 ++++++++++---- packages/arrtrix/pkg/webhook/arr_test.go | 14 +- systems/x86_64-linux/ulmo/default.nix | 2 +- 24 files changed, 1340 insertions(+), 82 deletions(-) create mode 100644 packages/arrtrix/pkg/arr/catalog.go create mode 100644 packages/arrtrix/pkg/arrclient/client.go create mode 100644 packages/arrtrix/pkg/arrclient/radarr.go create mode 100644 packages/arrtrix/pkg/arrclient/sonarr.go create mode 100644 packages/arrtrix/pkg/matrixcmd/download.go create mode 100644 packages/arrtrix/pkg/matrixcmd/subscriptions.go create mode 100644 packages/arrtrix/pkg/subscriptions/repo.go diff --git a/modules/nixos/services/observability/alloy/default.nix b/modules/nixos/services/observability/alloy/default.nix index 8385f8f..4b6d787 100644 --- a/modules/nixos/services/observability/alloy/default.nix +++ b/modules/nixos/services/observability/alloy/default.nix @@ -5,10 +5,10 @@ let cfg = config.${namespace}.services.observability.alloy; - httpPort = 9007; - otlpGrpcPort = 9010; - otlpHttpPort = 9011; - tempoOtlpGrpcPort = 9009; + httpPort = 9700; + otlpGrpcPort = 9701; + otlpHttpPort = 9702; + tempoOtlpGrpcPort = 9602; in { options.${namespace}.services.observability.alloy = { diff --git a/modules/nixos/services/observability/grafana/default.nix b/modules/nixos/services/observability/grafana/default.nix index d2ed0e7..05fb1da 100644 --- a/modules/nixos/services/observability/grafana/default.nix +++ b/modules/nixos/services/observability/grafana/default.nix @@ -25,7 +25,7 @@ in { settings = { server = { - http_port = 9001; + http_port = 9100; http_addr = "0.0.0.0"; domain = "ulmo"; }; @@ -106,7 +106,7 @@ in { name = "Prometheus"; uid = "prometheus"; type = "prometheus"; - url = "http://localhost:9002"; + url = "http://localhost:9200"; isDefault = true; editable = false; } @@ -115,7 +115,7 @@ in { name = "Loki"; uid = "loki"; type = "loki"; - url = "http://localhost:9003"; + url = "http://localhost:9300"; editable = false; } @@ -123,7 +123,7 @@ in { name = "Tempo"; uid = "tempo"; type = "tempo"; - url = "http://localhost:9006"; + url = "http://localhost:9600"; editable = false; jsonData = { nodeGraph.enabled = true; diff --git a/modules/nixos/services/observability/loki/default.nix b/modules/nixos/services/observability/loki/default.nix index d4774ac..e99448e 100644 --- a/modules/nixos/services/observability/loki/default.nix +++ b/modules/nixos/services/observability/loki/default.nix @@ -17,7 +17,7 @@ in auth_enabled = false; server = { - http_listen_port = 9003; + http_listen_port = 9300; }; common = { @@ -44,6 +44,6 @@ in }; }; - networking.firewall.allowedTCPPorts = [ 9003 ]; + networking.firewall.allowedTCPPorts = [ 9300 ]; }; } diff --git a/modules/nixos/services/observability/prometheus/default.nix b/modules/nixos/services/observability/prometheus/default.nix index 3faa278..fc09e01 100644 --- a/modules/nixos/services/observability/prometheus/default.nix +++ b/modules/nixos/services/observability/prometheus/default.nix @@ -13,7 +13,7 @@ in config = mkIf cfg.enable { services.prometheus = { enable = true; - port = 9002; + port = 9200; extraFlags = optionals config.${namespace}.services.observability.alloy.enable [ "--web.enable-remote-write-receiver" ]; @@ -24,7 +24,7 @@ in { job_name = "prometheus"; static_configs = [ - { targets = [ "localhost:9002" ]; } + { targets = [ "localhost:9200" ]; } ]; } @@ -39,7 +39,7 @@ in { job_name = "alloy"; static_configs = [ - { targets = [ "localhost:9007" ]; } + { targets = [ "localhost:9700" ]; } ]; } ] @@ -47,7 +47,7 @@ in { job_name = "tempo"; static_configs = [ - { targets = [ "localhost:9006" ]; } + { targets = [ "localhost:9600" ]; } ]; } ]; @@ -55,13 +55,13 @@ in exporters = { node = { enable = true; - port = 9005; + port = 9201; enabledCollectors = [ "systemd" ]; openFirewall = true; }; }; }; - networking.firewall.allowedTCPPorts = [ 9002 ]; + networking.firewall.allowedTCPPorts = [ 9200 ]; }; } diff --git a/modules/nixos/services/observability/promtail/default.nix b/modules/nixos/services/observability/promtail/default.nix index 38dbbab..40a1b87 100644 --- a/modules/nixos/services/observability/promtail/default.nix +++ b/modules/nixos/services/observability/promtail/default.nix @@ -25,7 +25,7 @@ in { configuration = { server = { - http_listen_port = 9004; + http_listen_port = 9400; grpc_listen_port = 0; }; @@ -35,7 +35,7 @@ in { clients = [ { - url = "http://[::1]:9003/loki/api/v1/push"; + url = "http://[::1]:9300/loki/api/v1/push"; } ]; @@ -60,6 +60,6 @@ in { }; }; - networking.firewall.allowedTCPPorts = [9004]; + networking.firewall.allowedTCPPorts = [9400]; }; } diff --git a/modules/nixos/services/observability/tempo/default.nix b/modules/nixos/services/observability/tempo/default.nix index 10b07d7..9a6bd89 100644 --- a/modules/nixos/services/observability/tempo/default.nix +++ b/modules/nixos/services/observability/tempo/default.nix @@ -4,10 +4,10 @@ let cfg = config.${namespace}.services.observability.tempo; - httpPort = 9006; - grpcPort = 9008; - otlpGrpcPort = 9009; - otlpHttpPort = 9012; + httpPort = 9600; + grpcPort = 9601; + otlpGrpcPort = 9602; + otlpHttpPort = 9603; in { options.${namespace}.services.observability.tempo = { diff --git a/modules/nixos/services/observability/uptime-kuma/default.nix b/modules/nixos/services/observability/uptime-kuma/default.nix index c23977b..f4dcde4 100644 --- a/modules/nixos/services/observability/uptime-kuma/default.nix +++ b/modules/nixos/services/observability/uptime-kuma/default.nix @@ -15,11 +15,11 @@ in enable = true; settings = { - PORT = toString 9006; + PORT = toString 9500; HOST = "0.0.0.0"; }; }; - networking.firewall.allowedTCPPorts = [ 9006 ]; + networking.firewall.allowedTCPPorts = [ 9500 ]; }; } diff --git a/packages/arrtrix/default.nix b/packages/arrtrix/default.nix index 81950f9..0113edb 100644 --- a/packages/arrtrix/default.nix +++ b/packages/arrtrix/default.nix @@ -11,7 +11,7 @@ buildGoModule rec { src = lib.cleanSource ./.; - vendorHash = "sha256-FbatoXcxZcnqVUmoj/jeSMFO/iTmD8uga47MoTdGcRw="; + vendorHash = "sha256-UYRit+v41djnCx+GFdEl/8WQsp2DzF4ywT9iv3m1pSc="; subPackages = ["cmd/arrtrix"]; buildInputs = [olm]; diff --git a/packages/arrtrix/pkg/arr/catalog.go b/packages/arrtrix/pkg/arr/catalog.go new file mode 100644 index 0000000..eb2f833 --- /dev/null +++ b/packages/arrtrix/pkg/arr/catalog.go @@ -0,0 +1,76 @@ +package arr + +import ( + "fmt" + "slices" + "strings" +) + +type ContentType string + +const ( + ContentTypeMovies ContentType = "movies" + ContentTypeSeries ContentType = "series" +) + +var supportedContentTypes = []ContentType{ + ContentTypeMovies, + ContentTypeSeries, +} + +var supportedEvents = map[ContentType][]string{ + ContentTypeMovies: {"Test", "Grab", "Download", "Rename", "MovieFileDelete", "MovieDelete"}, + ContentTypeSeries: {"Test", "Grab", "Download", "Rename", "EpisodeFileDelete", "SeriesDelete"}, +} + +func SupportedContentTypes() []ContentType { + return append([]ContentType(nil), supportedContentTypes...) +} + +func SupportedEventTypes(contentType ContentType) []string { + return append([]string(nil), supportedEvents[contentType]...) +} + +func ParseContentType(value string) (ContentType, error) { + contentType := ContentType(strings.ToLower(strings.TrimSpace(value))) + if slices.Contains(supportedContentTypes, contentType) { + return contentType, nil + } + return "", fmt.Errorf("unsupported content type %q (expected one of: %s)", value, Strings()) +} + +func ParseEventType(contentType ContentType, value string) (string, error) { + value = strings.TrimSpace(value) + if strings.EqualFold(value, "all") { + return "all", nil + } + for _, eventType := range supportedEvents[contentType] { + if strings.EqualFold(eventType, value) { + return eventType, nil + } + } + return "", fmt.Errorf("unsupported event type %q for %s", value, contentType) +} + +func SupportsEventType(contentType ContentType, eventType string) bool { + return slices.Contains(supportedEvents[contentType], strings.TrimSpace(eventType)) +} + +func (c ContentType) Label() string { + switch c { + case ContentTypeMovies: + return "movies" + case ContentTypeSeries: + return "series" + default: + return string(c) + } +} + +func Strings() string { + values := make([]string, 0, len(supportedContentTypes)) + for _, contentType := range supportedContentTypes { + values = append(values, string(contentType)) + } + return strings.Join(values, ", ") +} diff --git a/packages/arrtrix/pkg/arrclient/client.go b/packages/arrtrix/pkg/arrclient/client.go new file mode 100644 index 0000000..558dc52 --- /dev/null +++ b/packages/arrtrix/pkg/arrclient/client.go @@ -0,0 +1,211 @@ +package arrclient + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strings" + + "sneeuwvlok/packages/arrtrix/pkg/arr" +) + +type Client interface { + ContentType() arr.ContentType + Search(context.Context, string) ([]SearchResult, error) + List(context.Context, string) ([]ManagedItem, error) + Add(context.Context, SearchResult) (*ManagedItem, error) + SetMonitored(context.Context, int64, bool) (*ManagedItem, error) + Delete(context.Context, int64) error +} + +type SearchResult struct { + LookupID int64 + Title string + Year int + Overview string +} + +type ManagedItem struct { + ID int64 + LookupID int64 + Title string + Year int + Monitored bool + Path string +} + +type RadarrConfig struct { + URL string `yaml:"url"` + APIKey string `yaml:"api_key"` + RootFolderPath string `yaml:"root_folder_path"` + QualityProfileID int64 `yaml:"quality_profile_id"` + MinimumAvailability string `yaml:"minimum_availability"` + SearchOnAdd *bool `yaml:"search_on_add"` +} + +type SonarrConfig struct { + URL string `yaml:"url"` + APIKey string `yaml:"api_key"` + RootFolderPath string `yaml:"root_folder_path"` + QualityProfileID int64 `yaml:"quality_profile_id"` + LanguageProfileID int64 `yaml:"language_profile_id"` + SeasonFolder *bool `yaml:"season_folder"` + SeriesType string `yaml:"series_type"` + SearchOnAdd *bool `yaml:"search_on_add"` +} + +type httpClient struct { + baseURL *url.URL + apiKey string + httpClient *http.Client +} + +func (c *RadarrConfig) ApplyDefaults() { + if c.MinimumAvailability == "" { + c.MinimumAvailability = "released" + } +} + +func (c RadarrConfig) Enabled() bool { + return strings.TrimSpace(c.URL) != "" || strings.TrimSpace(c.APIKey) != "" +} + +func (c RadarrConfig) Validate() error { + if !c.Enabled() { + return nil + } + switch { + case strings.TrimSpace(c.URL) == "": + return fmt.Errorf("network.content.movies.url must be set when movies content is configured") + case strings.TrimSpace(c.APIKey) == "": + return fmt.Errorf("network.content.movies.api_key must be set when movies content is configured") + case strings.TrimSpace(c.RootFolderPath) == "": + return fmt.Errorf("network.content.movies.root_folder_path must be set when movies content is configured") + case c.QualityProfileID <= 0: + return fmt.Errorf("network.content.movies.quality_profile_id must be set when movies content is configured") + case strings.TrimSpace(c.MinimumAvailability) == "": + return fmt.Errorf("network.content.movies.minimum_availability must not be empty") + default: + return nil + } +} + +func (c RadarrConfig) SearchOnAddValue() bool { + return boolValue(c.SearchOnAdd, true) +} + +func (c *SonarrConfig) ApplyDefaults() { + if c.SeriesType == "" { + c.SeriesType = "standard" + } +} + +func (c SonarrConfig) Enabled() bool { + return strings.TrimSpace(c.URL) != "" || strings.TrimSpace(c.APIKey) != "" +} + +func (c SonarrConfig) Validate() error { + if !c.Enabled() { + return nil + } + switch { + case strings.TrimSpace(c.URL) == "": + return fmt.Errorf("network.content.series.url must be set when series content is configured") + case strings.TrimSpace(c.APIKey) == "": + return fmt.Errorf("network.content.series.api_key must be set when series content is configured") + case strings.TrimSpace(c.RootFolderPath) == "": + return fmt.Errorf("network.content.series.root_folder_path must be set when series content is configured") + case c.QualityProfileID <= 0: + return fmt.Errorf("network.content.series.quality_profile_id must be set when series content is configured") + case c.LanguageProfileID <= 0: + return fmt.Errorf("network.content.series.language_profile_id must be set when series content is configured") + case strings.TrimSpace(c.SeriesType) == "": + return fmt.Errorf("network.content.series.series_type must not be empty") + default: + return nil + } +} + +func (c SonarrConfig) SeasonFolderValue() bool { + return boolValue(c.SeasonFolder, true) +} + +func (c SonarrConfig) SearchOnAddValue() bool { + return boolValue(c.SearchOnAdd, true) +} + +func newHTTPClient(rawURL, apiKey string) (*httpClient, error) { + parsedURL, err := url.Parse(strings.TrimRight(strings.TrimSpace(rawURL), "/")) + if err != nil { + return nil, err + } + return &httpClient{ + baseURL: parsedURL, + apiKey: apiKey, + httpClient: http.DefaultClient, + }, nil +} + +func (c *httpClient) do(ctx context.Context, method, requestPath string, query url.Values, body any, dest any) error { + endpoint := *c.baseURL + endpoint.Path = path.Join(endpoint.Path, requestPath) + if len(query) > 0 { + endpoint.RawQuery = query.Encode() + } + + var payload io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return err + } + payload = bytes.NewReader(data) + } + + req, err := http.NewRequestWithContext(ctx, method, endpoint.String(), payload) + if err != nil { + return err + } + req.Header.Set("X-Api-Key", c.apiKey) + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("%s %s returned %d: %s", method, endpoint.String(), resp.StatusCode, strings.TrimSpace(string(data))) + } + if dest == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(dest) +} + +func boolValue(value *bool, fallback bool) bool { + if value == nil { + return fallback + } + return *value +} + +func containsFold(haystack, needle string) bool { + return strings.Contains(strings.ToLower(haystack), strings.ToLower(strings.TrimSpace(needle))) +} + +func FormatSearchResult(result SearchResult) string { + if result.Year != 0 { + return fmt.Sprintf("%s (%d)", result.Title, result.Year) + } + return result.Title +} diff --git a/packages/arrtrix/pkg/arrclient/radarr.go b/packages/arrtrix/pkg/arrclient/radarr.go new file mode 100644 index 0000000..21ac1fd --- /dev/null +++ b/packages/arrtrix/pkg/arrclient/radarr.go @@ -0,0 +1,164 @@ +package arrclient + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + + "sneeuwvlok/packages/arrtrix/pkg/arr" +) + +type RadarrClient struct { + http *httpClient + config RadarrConfig +} + +type radarrMovie struct { + ID int64 `json:"id"` + Title string `json:"title"` + Year int `json:"year"` + TMDBID int64 `json:"tmdbId"` + Overview string `json:"overview"` + Monitored bool `json:"monitored"` + Path string `json:"path"` +} + +func NewRadarrClient(config RadarrConfig) (*RadarrClient, error) { + config.ApplyDefaults() + if err := config.Validate(); err != nil { + return nil, err + } + httpClient, err := newHTTPClient(config.URL, config.APIKey) + if err != nil { + return nil, err + } + return &RadarrClient{http: httpClient, config: config}, nil +} + +func (c *RadarrClient) ContentType() arr.ContentType { + return arr.ContentTypeMovies +} + +func (c *RadarrClient) Search(ctx context.Context, query string) ([]SearchResult, error) { + var response []radarrMovie + if err := c.http.do(ctx, http.MethodGet, "/api/v3/movie/lookup", url.Values{"term": {strings.TrimSpace(query)}}, nil, &response); err != nil { + return nil, err + } + + results := make([]SearchResult, 0, len(response)) + for _, movie := range response { + if movie.TMDBID == 0 { + continue + } + results = append(results, SearchResult{ + LookupID: movie.TMDBID, + Title: movie.Title, + Year: movie.Year, + Overview: movie.Overview, + }) + } + return results, nil +} + +func (c *RadarrClient) List(ctx context.Context, query string) ([]ManagedItem, error) { + var response []radarrMovie + if err := c.http.do(ctx, http.MethodGet, "/api/v3/movie", nil, nil, &response); err != nil { + return nil, err + } + + items := make([]ManagedItem, 0, len(response)) + for _, movie := range response { + if query != "" && !containsFold(movie.Title, query) && !containsFold(strconv.Itoa(movie.Year), query) { + continue + } + items = append(items, ManagedItem{ + ID: movie.ID, + LookupID: movie.TMDBID, + Title: movie.Title, + Year: movie.Year, + Monitored: movie.Monitored, + Path: movie.Path, + }) + } + return items, nil +} + +func (c *RadarrClient) Add(ctx context.Context, result SearchResult) (*ManagedItem, error) { + payload := map[string]any{ + "title": result.Title, + "tmdbId": result.LookupID, + "year": result.Year, + "qualityProfileId": c.config.QualityProfileID, + "rootFolderPath": c.config.RootFolderPath, + "minimumAvailability": c.config.MinimumAvailability, + "monitored": true, + "addOptions": map[string]any{ + "searchForMovie": c.config.SearchOnAddValue(), + }, + } + + var response radarrMovie + if err := c.http.do(ctx, http.MethodPost, "/api/v3/movie", nil, payload, &response); err != nil { + return nil, err + } + item := ManagedItem{ + ID: response.ID, + LookupID: response.TMDBID, + Title: response.Title, + Year: response.Year, + Monitored: response.Monitored, + Path: response.Path, + } + return &item, nil +} + +func (c *RadarrClient) SetMonitored(ctx context.Context, id int64, monitored bool) (*ManagedItem, error) { + var movie map[string]any + endpoint := "/api/v3/movie/" + strconv.FormatInt(id, 10) + if err := c.http.do(ctx, http.MethodGet, endpoint, nil, nil, &movie); err != nil { + return nil, err + } + movie["monitored"] = monitored + + var response radarrMovie + if err := c.http.do(ctx, http.MethodPut, endpoint, nil, movie, &response); err != nil { + return nil, err + } + item := ManagedItem{ + ID: response.ID, + LookupID: response.TMDBID, + Title: response.Title, + Year: response.Year, + Monitored: response.Monitored, + Path: response.Path, + } + return &item, nil +} + +func (c *RadarrClient) Delete(ctx context.Context, id int64) error { + return c.http.do(ctx, http.MethodDelete, "/api/v3/movie/"+strconv.FormatInt(id, 10), url.Values{ + "deleteFiles": {"false"}, + "addImportExclusion": {"false"}, + }, nil, nil) +} + +func PickSingleResult(results []SearchResult, query string) (SearchResult, error) { + switch len(results) { + case 0: + return SearchResult{}, fmt.Errorf("no matching result found for %q", query) + case 1: + return results[0], nil + default: + normalized := strings.TrimSpace(strings.ToLower(query)) + for _, result := range results { + title := strings.ToLower(FormatSearchResult(result)) + if title == normalized { + return result, nil + } + } + return SearchResult{}, fmt.Errorf("multiple results matched %q", query) + } +} diff --git a/packages/arrtrix/pkg/arrclient/sonarr.go b/packages/arrtrix/pkg/arrclient/sonarr.go new file mode 100644 index 0000000..9b0691b --- /dev/null +++ b/packages/arrtrix/pkg/arrclient/sonarr.go @@ -0,0 +1,149 @@ +package arrclient + +import ( + "context" + "net/http" + "net/url" + "strconv" + "strings" + + "sneeuwvlok/packages/arrtrix/pkg/arr" +) + +type SonarrClient struct { + http *httpClient + config SonarrConfig +} + +type sonarrSeries struct { + ID int64 `json:"id"` + Title string `json:"title"` + Year int `json:"year"` + TVDBID int64 `json:"tvdbId"` + Overview string `json:"overview"` + Monitored bool `json:"monitored"` + Path string `json:"path"` +} + +func NewSonarrClient(config SonarrConfig) (*SonarrClient, error) { + config.ApplyDefaults() + if err := config.Validate(); err != nil { + return nil, err + } + httpClient, err := newHTTPClient(config.URL, config.APIKey) + if err != nil { + return nil, err + } + return &SonarrClient{http: httpClient, config: config}, nil +} + +func (c *SonarrClient) ContentType() arr.ContentType { + return arr.ContentTypeSeries +} + +func (c *SonarrClient) Search(ctx context.Context, query string) ([]SearchResult, error) { + var response []sonarrSeries + if err := c.http.do(ctx, http.MethodGet, "/api/v3/series/lookup", url.Values{"term": {strings.TrimSpace(query)}}, nil, &response); err != nil { + return nil, err + } + + results := make([]SearchResult, 0, len(response)) + for _, series := range response { + if series.TVDBID == 0 { + continue + } + results = append(results, SearchResult{ + LookupID: series.TVDBID, + Title: series.Title, + Year: series.Year, + Overview: series.Overview, + }) + } + return results, nil +} + +func (c *SonarrClient) List(ctx context.Context, query string) ([]ManagedItem, error) { + var response []sonarrSeries + if err := c.http.do(ctx, http.MethodGet, "/api/v3/series", nil, nil, &response); err != nil { + return nil, err + } + + items := make([]ManagedItem, 0, len(response)) + for _, series := range response { + if query != "" && !containsFold(series.Title, query) && !containsFold(strconv.Itoa(series.Year), query) { + continue + } + items = append(items, ManagedItem{ + ID: series.ID, + LookupID: series.TVDBID, + Title: series.Title, + Year: series.Year, + Monitored: series.Monitored, + Path: series.Path, + }) + } + return items, nil +} + +func (c *SonarrClient) Add(ctx context.Context, result SearchResult) (*ManagedItem, error) { + payload := map[string]any{ + "title": result.Title, + "tvdbId": result.LookupID, + "qualityProfileId": c.config.QualityProfileID, + "languageProfileId": c.config.LanguageProfileID, + "rootFolderPath": c.config.RootFolderPath, + "seasonFolder": c.config.SeasonFolderValue(), + "monitored": true, + "seriesType": c.config.SeriesType, + "addOptions": map[string]any{ + "searchForMissingEpisodes": c.config.SearchOnAddValue(), + }, + } + if result.Year != 0 { + payload["year"] = result.Year + } + + var response sonarrSeries + if err := c.http.do(ctx, http.MethodPost, "/api/v3/series", nil, payload, &response); err != nil { + return nil, err + } + item := ManagedItem{ + ID: response.ID, + LookupID: response.TVDBID, + Title: response.Title, + Year: response.Year, + Monitored: response.Monitored, + Path: response.Path, + } + return &item, nil +} + +func (c *SonarrClient) SetMonitored(ctx context.Context, id int64, monitored bool) (*ManagedItem, error) { + var series map[string]any + endpoint := "/api/v3/series/" + strconv.FormatInt(id, 10) + if err := c.http.do(ctx, http.MethodGet, endpoint, nil, nil, &series); err != nil { + return nil, err + } + series["monitored"] = monitored + + var response sonarrSeries + if err := c.http.do(ctx, http.MethodPut, endpoint, nil, series, &response); err != nil { + return nil, err + } + item := ManagedItem{ + ID: response.ID, + LookupID: response.TVDBID, + Title: response.Title, + Year: response.Year, + Monitored: response.Monitored, + Path: response.Path, + } + return &item, nil +} + +func (c *SonarrClient) Delete(ctx context.Context, id int64) error { + return c.http.do(ctx, http.MethodDelete, "/api/v3/series/"+strconv.FormatInt(id, 10), url.Values{ + "deleteFiles": {"false"}, + "addImportListExclusion": {"false"}, + }, nil, nil) +} diff --git a/packages/arrtrix/pkg/connector/config.go b/packages/arrtrix/pkg/connector/config.go index 2cdec34..149fd32 100644 --- a/packages/arrtrix/pkg/connector/config.go +++ b/packages/arrtrix/pkg/connector/config.go @@ -8,13 +8,23 @@ import ( up "go.mau.fi/util/configupgrade" "maunium.net/go/mautrix/bridgev2" + "sneeuwvlok/packages/arrtrix/pkg/arr" + "sneeuwvlok/packages/arrtrix/pkg/arrclient" + "sneeuwvlok/packages/arrtrix/pkg/subscriptions" "sneeuwvlok/packages/arrtrix/pkg/webhook" ) //go:embed example-config.yaml var ExampleConfig string -type Config struct{} +type Config struct { + Content ContentConfig `yaml:"content"` +} + +type ContentConfig struct { + Movies arrclient.RadarrConfig `yaml:"movies"` + Series arrclient.SonarrConfig `yaml:"series"` +} func upgradeConfig(helper up.Helper) {} @@ -23,6 +33,14 @@ func (s *ArrtrixConnector) GetConfig() (string, any, up.Upgrader) { } func (s *ArrtrixConnector) ValidateConfig() error { + s.Config.Content.Movies.ApplyDefaults() + s.Config.Content.Series.ApplyDefaults() + if err := s.Config.Content.Movies.Validate(); err != nil { + return err + } + if err := s.Config.Content.Series.Validate(); err != nil { + return err + } return nil } @@ -30,7 +48,27 @@ func (s *ArrtrixConnector) MountRoutes(router *http.ServeMux) error { if s.Bridge == nil { return fmt.Errorf("bridge is not initialized") } - return webhook.MountArr(router, s.Bridge) + return webhook.MountArr(router, s.Bridge, s.Subscriptions()) } var _ bridgev2.ConfigValidatingNetwork = (*ArrtrixConnector)(nil) +var _ webhook.SubscriptionFilter = (*subscriptions.Repository)(nil) + +func (c ContentConfig) Client(contentType arr.ContentType) (arrclient.Client, bool, error) { + switch contentType { + case arr.ContentTypeMovies: + if !c.Movies.Enabled() { + return nil, false, nil + } + client, err := arrclient.NewRadarrClient(c.Movies) + return client, true, err + case arr.ContentTypeSeries: + if !c.Series.Enabled() { + return nil, false, nil + } + client, err := arrclient.NewSonarrClient(c.Series) + return client, true, err + default: + return nil, false, fmt.Errorf("unsupported content type %q", contentType) + } +} diff --git a/packages/arrtrix/pkg/connector/connector.go b/packages/arrtrix/pkg/connector/connector.go index 121e94c..4be007a 100644 --- a/packages/arrtrix/pkg/connector/connector.go +++ b/packages/arrtrix/pkg/connector/connector.go @@ -10,11 +10,17 @@ import ( "maunium.net/go/mautrix/bridgev2/networkid" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" + + "sneeuwvlok/packages/arrtrix/pkg/arr" + "sneeuwvlok/packages/arrtrix/pkg/arrclient" + "sneeuwvlok/packages/arrtrix/pkg/subscriptions" ) type ArrtrixConnector struct { - Bridge *bridgev2.Bridge - Config Config + Bridge *bridgev2.Bridge + Config Config + clients map[arr.ContentType]arrclient.Client + subscriptions *subscriptions.Repository } var _ bridgev2.NetworkConnector = (*ArrtrixConnector)(nil) @@ -33,6 +39,17 @@ func (s *ArrtrixConnector) GetName() bridgev2.BridgeName { func (s *ArrtrixConnector) Init(bridge *bridgev2.Bridge) { s.Bridge = bridge + s.subscriptions = subscriptions.NewRepository(bridge.DB.Database, string(bridge.ID)) + s.clients = make(map[arr.ContentType]arrclient.Client) + for _, contentType := range arr.SupportedContentTypes() { + client, ok, err := s.Config.Content.Client(contentType) + if err != nil { + panic(err) + } + if ok { + s.clients[contentType] = client + } + } } func (s *ArrtrixConnector) Start(context.Context) error { @@ -107,3 +124,12 @@ func (c *ArrtrixClient) HandleMatrixMessage(context.Context, *bridgev2.MatrixMes func (c *ArrtrixClient) GenerateTransactionID(userID id.UserID, roomID id.RoomID, eventType event.Type) networkid.RawTransactionID { return networkid.RawTransactionID("") } + +func (s *ArrtrixConnector) ContentClient(contentType arr.ContentType) (arrclient.Client, bool) { + client, ok := s.clients[contentType] + return client, ok +} + +func (s *ArrtrixConnector) Subscriptions() *subscriptions.Repository { + return s.subscriptions +} diff --git a/packages/arrtrix/pkg/connector/example-config.yaml b/packages/arrtrix/pkg/connector/example-config.yaml index 9c11ddf..a917e23 100644 --- a/packages/arrtrix/pkg/connector/example-config.yaml +++ b/packages/arrtrix/pkg/connector/example-config.yaml @@ -1,4 +1,23 @@ -# No network-specific config is required yet. -# +content: + movies: + # Radarr connection for movie management commands. + url: "" + api_key: "" + root_folder_path: "" + quality_profile_id: 0 + minimum_availability: released + search_on_add: true + + series: + # Sonarr connection for series management commands. + url: "" + api_key: "" + root_folder_path: "" + quality_profile_id: 0 + language_profile_id: 0 + season_folder: true + series_type: standard + search_on_add: true + # Arr-stack webhooks are exposed automatically on the fixed built-in path: # POST /_arrtrix/webhook diff --git a/packages/arrtrix/pkg/matrixcmd/download.go b/packages/arrtrix/pkg/matrixcmd/download.go new file mode 100644 index 0000000..6d27a1a --- /dev/null +++ b/packages/arrtrix/pkg/matrixcmd/download.go @@ -0,0 +1,222 @@ +package matrixcmd + +import ( + "fmt" + "strconv" + "strings" + + "maunium.net/go/mautrix/id" + + "sneeuwvlok/packages/arrtrix/pkg/arr" + "sneeuwvlok/packages/arrtrix/pkg/arrclient" + "sneeuwvlok/packages/arrtrix/pkg/subscriptions" +) + +type commandServiceProvider interface { + ContentClient(arr.ContentType) (arrclient.Client, bool) + Subscriptions() *subscriptions.Repository +} + +func NewDownloadHandler() Handler { + return NewHandler(Meta{ + Name: "download", + Description: "Manage monitored movies and series in Arr.", + Usage: " [...]", + }, func(ctx *Context) { + if len(ctx.Args) < 2 { + ctx.Reply("Usage: `download [...]`") + return + } + + contentType, err := arr.ParseContentType(ctx.Args[1]) + if err != nil { + ctx.Reply(err.Error()) + return + } + + client, ok := contentClient(ctx, contentType) + if !ok { + ctx.Reply("No %s client is configured yet.", contentType.Label()) + return + } + + switch strings.ToLower(ctx.Args[0]) { + case "list": + handleDownloadList(ctx, client, contentType) + case "search": + handleDownloadSearch(ctx, client, contentType) + case "add": + handleDownloadAdd(ctx, client, contentType) + case "monitor": + handleDownloadMonitor(ctx, client, contentType) + case "remove": + handleDownloadRemove(ctx, client, contentType) + default: + ctx.Reply("Unknown download subcommand `%s`.", ctx.Args[0]) + } + }) +} + +func handleDownloadList(ctx *Context, client arrclient.Client, contentType arr.ContentType) { + query := strings.TrimSpace(strings.Join(ctx.Args[2:], " ")) + items, err := client.List(ctx.Ctx, query) + if err != nil { + ctx.Reply("Failed to list %s: %v", contentType.Label(), err) + return + } + if len(items) == 0 { + if query == "" { + ctx.Reply("No monitored %s are currently tracked.", contentType.Label()) + } else { + ctx.Reply("No %s matched `%s`.", contentType.Label(), query) + } + return + } + + var builder strings.Builder + builder.WriteString(fmt.Sprintf("Tracked %s:\n", contentType.Label())) + for i, item := range items { + if i == 10 { + builder.WriteString("…\n") + break + } + builder.WriteString(fmt.Sprintf("- `%d` %s — monitored=%t\n", item.ID, formatManagedItem(item), item.Monitored)) + } + ctx.Reply(builder.String()) +} + +func handleDownloadSearch(ctx *Context, client arrclient.Client, contentType arr.ContentType) { + query := strings.TrimSpace(strings.Join(ctx.Args[2:], " ")) + if query == "" { + ctx.Reply("Usage: `download search %s `", contentType.Label()) + return + } + results, err := client.Search(ctx.Ctx, query) + if err != nil { + ctx.Reply("Failed to search %s: %v", contentType.Label(), err) + return + } + replyWithSearchResults(ctx, contentType, query, results) +} + +func handleDownloadAdd(ctx *Context, client arrclient.Client, contentType arr.ContentType) { + query := strings.TrimSpace(strings.Join(ctx.Args[2:], " ")) + if query == "" { + ctx.Reply("Usage: `download add %s `", contentType.Label()) + return + } + results, err := client.Search(ctx.Ctx, query) + if err != nil { + ctx.Reply("Failed to search %s: %v", contentType.Label(), err) + return + } + result, err := arrclient.PickSingleResult(results, query) + if err != nil { + replyWithSearchResults(ctx, contentType, query, results) + return + } + item, err := client.Add(ctx.Ctx, result) + if err != nil { + ctx.Reply("Failed to add %s: %v", contentType.Label(), err) + return + } + ctx.Reply("Added %s to %s with id `%d`.", formatManagedItem(*item), contentType.Label(), item.ID) +} + +func handleDownloadMonitor(ctx *Context, client arrclient.Client, contentType arr.ContentType) { + if len(ctx.Args) < 4 { + ctx.Reply("Usage: `download monitor %s `", contentType.Label()) + return + } + itemID, err := strconv.ParseInt(ctx.Args[2], 10, 64) + if err != nil { + ctx.Reply("Invalid %s id `%s`.", contentType.Label(), ctx.Args[2]) + return + } + + state, err := parseEnabled(ctx.Args[3]) + if err != nil { + ctx.Reply(err.Error()) + return + } + item, err := client.SetMonitored(ctx.Ctx, itemID, state) + if err != nil { + ctx.Reply("Failed to update %s monitoring: %v", contentType.Label(), err) + return + } + ctx.Reply("%s is now monitored=%t.", formatManagedItem(*item), item.Monitored) +} + +func handleDownloadRemove(ctx *Context, client arrclient.Client, contentType arr.ContentType) { + if len(ctx.Args) < 3 { + ctx.Reply("Usage: `download remove %s `", contentType.Label()) + return + } + itemID, err := strconv.ParseInt(ctx.Args[2], 10, 64) + if err != nil { + ctx.Reply("Invalid %s id `%s`.", contentType.Label(), ctx.Args[2]) + return + } + if err = client.Delete(ctx.Ctx, itemID); err != nil { + ctx.Reply("Failed to remove %s: %v", contentType.Label(), err) + return + } + ctx.Reply("Removed `%d` from %s.", itemID, contentType.Label()) +} + +func contentClient(ctx *Context, contentType arr.ContentType) (arrclient.Client, bool) { + provider, ok := ctx.Bridge.Network.(commandServiceProvider) + if !ok { + return nil, false + } + return provider.ContentClient(contentType) +} + +func contentSubscriptions(ctx *Context) *subscriptions.Repository { + provider, ok := ctx.Bridge.Network.(commandServiceProvider) + if !ok { + return nil + } + return provider.Subscriptions() +} + +func replyWithSearchResults(ctx *Context, contentType arr.ContentType, query string, results []arrclient.SearchResult) { + if len(results) == 0 { + ctx.Reply("No %s matched `%s`.", contentType.Label(), query) + return + } + + var builder strings.Builder + builder.WriteString(fmt.Sprintf("Search results for `%s` in %s:\n", query, contentType.Label())) + for i, result := range results { + if i == 8 { + builder.WriteString("…\n") + break + } + builder.WriteString(fmt.Sprintf("- `%d` %s\n", result.LookupID, arrclient.FormatSearchResult(result))) + } + builder.WriteString(fmt.Sprintf("\nRefine the query and rerun `download add %s ` until only one match remains.", contentType.Label())) + ctx.Reply(builder.String()) +} + +func formatManagedItem(item arrclient.ManagedItem) string { + if item.Year != 0 { + return fmt.Sprintf("%s (%d)", item.Title, item.Year) + } + return item.Title +} + +func parseEnabled(value string) (bool, error) { + switch strings.ToLower(strings.TrimSpace(value)) { + case "on", "true", "yes", "enabled": + return true, nil + case "off", "false", "no", "disabled": + return false, nil + default: + return false, fmt.Errorf("expected `on` or `off`, got `%s`", value) + } +} + +func userIDString(userID id.UserID) string { + return userID.String() +} diff --git a/packages/arrtrix/pkg/matrixcmd/help_test.go b/packages/arrtrix/pkg/matrixcmd/help_test.go index b5b325b..73fed6d 100644 --- a/packages/arrtrix/pkg/matrixcmd/help_test.go +++ b/packages/arrtrix/pkg/matrixcmd/help_test.go @@ -32,7 +32,9 @@ func TestFormatHelpManagementRoom(t *testing.T) { for _, fragment := range []string{ "prefixing commands with `!arr` is not required", + "**download** [...] - Manage monitored movies and series in Arr.", "**help** - Show this help message.", + "**subscriptions** [movies|series] [event-type|all] - Manage notification subscriptions by content type and event type.", "Extra help text.", } { if !strings.Contains(out, fragment) { diff --git a/packages/arrtrix/pkg/matrixcmd/processor.go b/packages/arrtrix/pkg/matrixcmd/processor.go index a4f15df..78915ea 100644 --- a/packages/arrtrix/pkg/matrixcmd/processor.go +++ b/packages/arrtrix/pkg/matrixcmd/processor.go @@ -87,6 +87,8 @@ func NewProcessor(bridge *bridgev2.Bridge, texts bridgeconfig.ManagementRoomText alias: make(map[string]string), } proc.Add(NewHelpHandler(proc)) + proc.Add(NewDownloadHandler()) + proc.Add(NewSubscriptionsHandler()) return proc } diff --git a/packages/arrtrix/pkg/matrixcmd/subscriptions.go b/packages/arrtrix/pkg/matrixcmd/subscriptions.go new file mode 100644 index 0000000..ed1a11f --- /dev/null +++ b/packages/arrtrix/pkg/matrixcmd/subscriptions.go @@ -0,0 +1,107 @@ +package matrixcmd + +import ( + "context" + "fmt" + "strings" + + "maunium.net/go/mautrix/id" + + "sneeuwvlok/packages/arrtrix/pkg/arr" + "sneeuwvlok/packages/arrtrix/pkg/subscriptions" +) + +func NewSubscriptionsHandler() Handler { + return NewHandler(Meta{ + Name: "subscriptions", + Aliases: []string{"subscription", "notify"}, + Description: "Manage notification subscriptions by content type and event type.", + Usage: " [movies|series] [event-type|all]", + }, func(ctx *Context) { + repo := contentSubscriptions(ctx) + if repo == nil { + ctx.Reply("Subscription storage is not available.") + return + } + if len(ctx.Args) == 0 || strings.EqualFold(ctx.Args[0], "list") { + handleSubscriptionList(ctx, repo) + return + } + if len(ctx.Args) < 3 { + ctx.Reply("Usage: `subscriptions `") + return + } + + contentType, err := arr.ParseContentType(ctx.Args[1]) + if err != nil { + ctx.Reply(err.Error()) + return + } + eventType, err := arr.ParseEventType(contentType, ctx.Args[2]) + if err != nil { + ctx.Reply(err.Error()) + return + } + + switch strings.ToLower(ctx.Args[0]) { + case "enable": + handleSubscriptionSet(ctx, repo, contentType, eventType, true) + case "disable": + handleSubscriptionSet(ctx, repo, contentType, eventType, false) + default: + ctx.Reply("Unknown subscriptions subcommand `%s`.", ctx.Args[0]) + } + }) +} + +func handleSubscriptionList(ctx *Context, repo subscriptionRepo) { + preferences, err := repo.List(ctx.Ctx, ctx.User.MXID) + if err != nil { + ctx.Reply("Failed to load subscriptions: %v", err) + return + } + + var builder strings.Builder + builder.WriteString("Current notification subscriptions:\n") + for _, contentType := range arr.SupportedContentTypes() { + builder.WriteString(fmt.Sprintf("\n**%s**\n", strings.Title(contentType.Label()))) + for _, eventType := range arr.SupportedEventTypes(contentType) { + enabled := findPreference(preferences, contentType, eventType) + builder.WriteString(fmt.Sprintf("- `%s`: %t\n", eventType, enabled)) + } + } + ctx.Reply(builder.String()) +} + +func handleSubscriptionSet(ctx *Context, repo subscriptionRepo, contentType arr.ContentType, eventType string, enabled bool) { + var err error + if eventType == "all" { + err = repo.SetAll(ctx.Ctx, ctx.User.MXID, contentType, enabled) + } else { + err = repo.Set(ctx.Ctx, ctx.User.MXID, contentType, eventType, enabled) + } + if err != nil { + ctx.Reply("Failed to update subscriptions: %v", err) + return + } + if eventType == "all" { + ctx.Reply("Set all `%s` notifications for %s to %t.", contentType.Label(), userIDString(ctx.User.MXID), enabled) + return + } + ctx.Reply("Set `%s/%s` notifications to %t.", contentType.Label(), eventType, enabled) +} + +type subscriptionRepo interface { + List(ctx context.Context, userID id.UserID) ([]subscriptions.Preference, error) + Set(ctx context.Context, userID id.UserID, contentType arr.ContentType, eventType string, enabled bool) error + SetAll(ctx context.Context, userID id.UserID, contentType arr.ContentType, enabled bool) error +} + +func findPreference(preferences []subscriptions.Preference, contentType arr.ContentType, eventType string) bool { + for _, preference := range preferences { + if preference.ContentType == contentType && preference.EventType == eventType { + return preference.Enabled + } + } + return true +} diff --git a/packages/arrtrix/pkg/runtime/main.go b/packages/arrtrix/pkg/runtime/main.go index 5352c54..c685706 100644 --- a/packages/arrtrix/pkg/runtime/main.go +++ b/packages/arrtrix/pkg/runtime/main.go @@ -34,6 +34,7 @@ import ( "sneeuwvlok/packages/arrtrix/pkg/matrixcmd" "sneeuwvlok/packages/arrtrix/pkg/observability" "sneeuwvlok/packages/arrtrix/pkg/onboarding" + "sneeuwvlok/packages/arrtrix/pkg/subscriptions" ) var configPath = flag.MakeFull("c", "config", "The path to your config file.", "config.yaml").String() @@ -305,6 +306,10 @@ func (m *Main) Init() { Msg("Initializing bridge") m.initDB() + if err = subscriptions.EnsureSchema(ctx, m.DB); err != nil { + m.Log.WithLevel(zerolog.FatalLevel).Err(err).Msg("Failed to initialize subscription schema") + os.Exit(14) + } m.Matrix = matrix.NewConnector(m.Config) m.Matrix.OnWebsocketReplaced = func() { m.TriggerStop(0) diff --git a/packages/arrtrix/pkg/subscriptions/repo.go b/packages/arrtrix/pkg/subscriptions/repo.go new file mode 100644 index 0000000..85c6b57 --- /dev/null +++ b/packages/arrtrix/pkg/subscriptions/repo.go @@ -0,0 +1,141 @@ +package subscriptions + +import ( + "context" + "fmt" + + "go.mau.fi/util/dbutil" + "maunium.net/go/mautrix/id" + + "sneeuwvlok/packages/arrtrix/pkg/arr" +) + +type Preference struct { + ContentType arr.ContentType + EventType string + Enabled bool +} + +type Repository struct { + db *dbutil.Database + bridgeID string +} + +func EnsureSchema(ctx context.Context, db *dbutil.Database) error { + _, err := db.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS arrtrix_subscription ( + bridge_id TEXT NOT NULL, + user_mxid TEXT NOT NULL, + content_type TEXT NOT NULL, + event_type TEXT NOT NULL, + enabled BOOLEAN NOT NULL, + PRIMARY KEY (bridge_id, user_mxid, content_type, event_type) + ) + `) + return err +} + +func NewRepository(db *dbutil.Database, bridgeID string) *Repository { + return &Repository{db: db, bridgeID: bridgeID} +} + +func (r *Repository) EnsureDefaults(ctx context.Context, userID id.UserID) error { + var existing int + if err := r.db.QueryRow(ctx, `SELECT COUNT(*) FROM arrtrix_subscription WHERE bridge_id=$1 AND user_mxid=$2`, r.bridgeID, userID.String()).Scan(&existing); err != nil { + return err + } + if existing > 0 { + return nil + } + + for _, contentType := range arr.SupportedContentTypes() { + for _, eventType := range arr.SupportedEventTypes(contentType) { + if _, err := r.db.Exec(ctx, ` + INSERT INTO arrtrix_subscription (bridge_id, user_mxid, content_type, event_type, enabled) + VALUES ($1, $2, $3, $4, TRUE) + `, r.bridgeID, userID.String(), string(contentType), eventType); err != nil { + return err + } + } + } + return nil +} + +func (r *Repository) List(ctx context.Context, userID id.UserID) ([]Preference, error) { + if err := r.EnsureDefaults(ctx, userID); err != nil { + return nil, err + } + + rows, err := r.db.Query(ctx, ` + SELECT content_type, event_type, enabled + FROM arrtrix_subscription + WHERE bridge_id=$1 AND user_mxid=$2 + ORDER BY content_type, event_type + `, r.bridgeID, userID.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var preferences []Preference + for rows.Next() { + var contentType string + var preference Preference + if err = rows.Scan(&contentType, &preference.EventType, &preference.Enabled); err != nil { + return nil, err + } + preference.ContentType = arr.ContentType(contentType) + preferences = append(preferences, preference) + } + if err = rows.Err(); err != nil { + return nil, err + } + return preferences, nil +} + +func (r *Repository) Set(ctx context.Context, userID id.UserID, contentType arr.ContentType, eventType string, enabled bool) error { + if err := r.EnsureDefaults(ctx, userID); err != nil { + return err + } + if _, err := r.db.Exec(ctx, ` + INSERT INTO arrtrix_subscription (bridge_id, user_mxid, content_type, event_type, enabled) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (bridge_id, user_mxid, content_type, event_type) + DO UPDATE SET enabled=excluded.enabled + `, r.bridgeID, userID.String(), string(contentType), eventType, enabled); err != nil { + return err + } + return nil +} + +func (r *Repository) SetAll(ctx context.Context, userID id.UserID, contentType arr.ContentType, enabled bool) error { + if err := r.EnsureDefaults(ctx, userID); err != nil { + return err + } + for _, eventType := range arr.SupportedEventTypes(contentType) { + if err := r.Set(ctx, userID, contentType, eventType, enabled); err != nil { + return err + } + } + return nil +} + +func (r *Repository) Allows(ctx context.Context, userID id.UserID, contentType arr.ContentType, eventType string) (bool, error) { + if !arr.SupportsEventType(contentType, eventType) { + return true, nil + } + if err := r.EnsureDefaults(ctx, userID); err != nil { + return false, err + } + + var enabled bool + err := r.db.QueryRow(ctx, ` + SELECT enabled + FROM arrtrix_subscription + WHERE bridge_id=$1 AND user_mxid=$2 AND content_type=$3 AND event_type=$4 + `, r.bridgeID, userID.String(), string(contentType), eventType).Scan(&enabled) + if err != nil { + return false, fmt.Errorf("query subscription: %w", err) + } + return enabled, nil +} diff --git a/packages/arrtrix/pkg/webhook/arr.go b/packages/arrtrix/pkg/webhook/arr.go index eb7540c..5446825 100644 --- a/packages/arrtrix/pkg/webhook/arr.go +++ b/packages/arrtrix/pkg/webhook/arr.go @@ -17,6 +17,7 @@ import ( "maunium.net/go/mautrix/format" "maunium.net/go/mautrix/id" + "sneeuwvlok/packages/arrtrix/pkg/arr" "sneeuwvlok/packages/arrtrix/pkg/observability" ) @@ -28,10 +29,13 @@ var ( ) type payload struct { - EventType string `json:"eventType"` - Movie *movie `json:"movie"` - MovieFile *movieFile `json:"movieFile"` - IsUpgrade bool `json:"isUpgrade"` + EventType string `json:"eventType"` + Movie *movie `json:"movie"` + MovieFile *movieFile `json:"movieFile"` + Series *series `json:"series"` + Episodes []episode `json:"episodes"` + EpisodeFile *episodeFile `json:"episodeFile"` + IsUpgrade bool `json:"isUpgrade"` } type movie struct { @@ -49,26 +53,55 @@ type movieFile struct { ReleaseGroup string `json:"releaseGroup"` } +type series struct { + Title string `json:"title"` + Year int `json:"year"` + Path string `json:"path"` +} + +type episode struct { + SeasonNumber int `json:"seasonNumber"` + EpisodeNumber int `json:"episodeNumber"` + Title string `json:"title"` +} + +type episodeFile struct { + Quality string `json:"quality"` + RelativePath string `json:"relativePath"` + SceneName string `json:"sceneName"` +} + +type managementTarget struct { + UserID id.UserID + RoomID id.RoomID +} + type roomResolver interface { - ResolveManagementRoom(context.Context) (id.RoomID, error) + ResolveManagementRoom(context.Context) (managementTarget, error) } type noticeSender interface { SendNotice(context.Context, id.RoomID, string) error } -type ArrHandler struct { - resolver roomResolver - sender noticeSender +type SubscriptionFilter interface { + Allows(context.Context, id.UserID, arr.ContentType, string) (bool, error) } -func MountArr(router *http.ServeMux, bridge *bridgev2.Bridge) error { +type ArrHandler struct { + resolver roomResolver + sender noticeSender + subscriptions SubscriptionFilter +} + +func MountArr(router *http.ServeMux, bridge *bridgev2.Bridge, subscriptions SubscriptionFilter) error { if bridge == nil { return fmt.Errorf("bridge is not initialized") } handler := &ArrHandler{ - resolver: bridgeRoomResolver{bridge: bridge}, - sender: bridgeNoticeSender{bridge: bridge}, + resolver: bridgeRoomResolver{bridge: bridge}, + sender: bridgeNoticeSender{bridge: bridge}, + subscriptions: subscriptions, } router.Handle(fmt.Sprintf("POST %s", ArrWebhookPath), handler) return nil @@ -109,7 +142,7 @@ func (h *ArrHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { attribute.String("http.route", ArrWebhookPath), ) - roomID, err := h.resolver.ResolveManagementRoom(ctx) + target, err := h.resolver.ResolveManagementRoom(ctx) if err != nil { statusCode = http.StatusInternalServerError outcome = "resolve_failed" @@ -123,7 +156,26 @@ func (h *ArrHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if err = h.sender.SendNotice(ctx, roomID, renderNotice(body)); err != nil { + contentType, ok := body.ContentType() + if ok && h.subscriptions != nil { + allowed, filterErr := h.subscriptions.Allows(ctx, target.UserID, contentType, body.EventType) + if filterErr != nil { + statusCode = http.StatusInternalServerError + outcome = "subscription_check_failed" + span.RecordError(filterErr) + span.SetStatus(codes.Error, filterErr.Error()) + http.Error(w, "failed to evaluate subscriptions", statusCode) + return + } + if !allowed { + outcome = "filtered" + span.SetStatus(codes.Ok, "filtered") + w.WriteHeader(statusCode) + return + } + } + + if err = h.sender.SendNotice(ctx, target.RoomID, renderNotice(body)); err != nil { statusCode = http.StatusBadGateway outcome = "delivery_failed" span.RecordError(err) @@ -140,7 +192,7 @@ type bridgeRoomResolver struct { bridge *bridgev2.Bridge } -func (r bridgeRoomResolver) ResolveManagementRoom(ctx context.Context) (id.RoomID, error) { +func (r bridgeRoomResolver) ResolveManagementRoom(ctx context.Context) (managementTarget, error) { ctx, span := observability.StartSpan(ctx, "arrtrix.webhook.resolve_management_room") defer span.End() @@ -148,42 +200,45 @@ func (r bridgeRoomResolver) ResolveManagementRoom(ctx context.Context) (id.RoomI if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return "", fmt.Errorf("failed to query management rooms: %w", err) + return managementTarget{}, fmt.Errorf("failed to query management rooms: %w", err) } defer rows.Close() - var roomID id.RoomID + var target managementTarget var owners []id.UserID for rows.Next() { var mxid, managementRoom string if err = rows.Scan(&mxid, &managementRoom); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return "", fmt.Errorf("failed to scan management room: %w", err) + return managementTarget{}, fmt.Errorf("failed to scan management room: %w", err) } owners = append(owners, id.UserID(mxid)) - if roomID == "" { - roomID = id.RoomID(managementRoom) + if target.RoomID == "" { + target = managementTarget{ + UserID: id.UserID(mxid), + RoomID: id.RoomID(managementRoom), + } } } if err = rows.Err(); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return "", fmt.Errorf("failed to iterate management rooms: %w", err) + return managementTarget{}, fmt.Errorf("failed to iterate management rooms: %w", err) } switch len(owners) { case 0: span.SetStatus(codes.Error, ErrNoManagementRoom.Error()) - return "", ErrNoManagementRoom + return managementTarget{}, ErrNoManagementRoom case 1: span.SetAttributes(attribute.Int("arrtrix.management_room.count", 1)) span.SetStatus(codes.Ok, "") - return roomID, nil + return target, nil default: span.SetAttributes(attribute.Int("arrtrix.management_room.count", len(owners))) span.SetStatus(codes.Error, ErrAmbiguousManagementRoom.Error()) - return "", fmt.Errorf("%w: %s", ErrAmbiguousManagementRoom, strings.Join(convertUserIDs(owners), ", ")) + return managementTarget{}, fmt.Errorf("%w: %s", ErrAmbiguousManagementRoom, strings.Join(convertUserIDs(owners), ", ")) } } @@ -213,30 +268,48 @@ func (s bridgeNoticeSender) SendNotice(ctx context.Context, roomID id.RoomID, ma } func renderNotice(body payload) string { - title := "Arr" - if body.Movie != nil { - title = body.Movie.Title + lines := []string{fmt.Sprintf("**Arr %s**", body.EventType)} + + switch contentType, ok := body.ContentType(); { + case ok && contentType == arr.ContentTypeMovies: + title := body.Movie.Title if body.Movie.Year != 0 { title = fmt.Sprintf("%s (%d)", title, body.Movie.Year) } + lines = append(lines, fmt.Sprintf("Movie: %s", title)) + if body.MovieFile != nil && body.MovieFile.Quality != "" { + lines = append(lines, fmt.Sprintf("Quality: %s", body.MovieFile.Quality)) + } + if body.MovieFile != nil && body.MovieFile.RelativePath != "" { + lines = append(lines, fmt.Sprintf("File: `%s`", body.MovieFile.RelativePath)) + } + if body.EventType == "Download" { + lines = append(lines, fmt.Sprintf("Upgrade: %t", body.IsUpgrade)) + } + if body.Movie.ImdbID != "" { + lines = append(lines, fmt.Sprintf("IMDb: `%s`", body.Movie.ImdbID)) + } + case ok && contentType == arr.ContentTypeSeries: + title := body.Series.Title + if body.Series.Year != 0 { + title = fmt.Sprintf("%s (%d)", title, body.Series.Year) + } + lines = append(lines, fmt.Sprintf("Series: %s", title)) + if len(body.Episodes) > 0 { + lines = append(lines, fmt.Sprintf("Episodes: %s", renderEpisodes(body.Episodes))) + } + if body.EpisodeFile != nil && body.EpisodeFile.Quality != "" { + lines = append(lines, fmt.Sprintf("Quality: %s", body.EpisodeFile.Quality)) + } + if body.EpisodeFile != nil && body.EpisodeFile.RelativePath != "" { + lines = append(lines, fmt.Sprintf("File: `%s`", body.EpisodeFile.RelativePath)) + } + default: + if body.EventType != "Test" { + lines = append(lines, "Payload received.") + } } - lines := []string{fmt.Sprintf("**Arr %s**", body.EventType)} - if title != "Arr" { - lines = append(lines, fmt.Sprintf("Movie: %s", title)) - } - if body.MovieFile != nil && body.MovieFile.Quality != "" { - lines = append(lines, fmt.Sprintf("Quality: %s", body.MovieFile.Quality)) - } - if body.MovieFile != nil && body.MovieFile.RelativePath != "" { - lines = append(lines, fmt.Sprintf("File: `%s`", body.MovieFile.RelativePath)) - } - if body.EventType == "Download" { - lines = append(lines, fmt.Sprintf("Upgrade: %t", body.IsUpgrade)) - } - if body.Movie != nil && body.Movie.ImdbID != "" { - lines = append(lines, fmt.Sprintf("IMDb: `%s`", body.Movie.ImdbID)) - } return strings.Join(lines, "\n") } @@ -251,3 +324,26 @@ func convertUserIDs(users []id.UserID) []string { var _ roomResolver = bridgeRoomResolver{} var _ noticeSender = bridgeNoticeSender{} var _ http.Handler = (*ArrHandler)(nil) + +func (p payload) ContentType() (arr.ContentType, bool) { + switch { + case p.Movie != nil: + return arr.ContentTypeMovies, true + case p.Series != nil: + return arr.ContentTypeSeries, true + default: + return "", false + } +} + +func renderEpisodes(episodes []episode) string { + parts := make([]string, 0, len(episodes)) + for _, item := range episodes { + if item.Title != "" { + parts = append(parts, fmt.Sprintf("S%02dE%02d %s", item.SeasonNumber, item.EpisodeNumber, item.Title)) + continue + } + parts = append(parts, fmt.Sprintf("S%02dE%02d", item.SeasonNumber, item.EpisodeNumber)) + } + return strings.Join(parts, ", ") +} diff --git a/packages/arrtrix/pkg/webhook/arr_test.go b/packages/arrtrix/pkg/webhook/arr_test.go index b7ac511..246df72 100644 --- a/packages/arrtrix/pkg/webhook/arr_test.go +++ b/packages/arrtrix/pkg/webhook/arr_test.go @@ -12,12 +12,12 @@ import ( ) type stubRoomResolver struct { - roomID id.RoomID + target managementTarget err error } -func (s stubRoomResolver) ResolveManagementRoom(context.Context) (id.RoomID, error) { - return s.roomID, s.err +func (s stubRoomResolver) ResolveManagementRoom(context.Context) (managementTarget, error) { + return s.target, s.err } type stubNoticeSender struct { @@ -34,7 +34,7 @@ func (s *stubNoticeSender) SendNotice(_ context.Context, roomID id.RoomID, messa func TestMountArrRequiresBridge(t *testing.T) { router := http.NewServeMux() - if err := MountArr(router, nil); err == nil { + if err := MountArr(router, nil, nil); err == nil { t.Fatal("expected nil bridge to fail") } } @@ -42,7 +42,7 @@ func TestMountArrRequiresBridge(t *testing.T) { func TestArrHandlerDeliversNotice(t *testing.T) { sender := &stubNoticeSender{} handler := &ArrHandler{ - resolver: stubRoomResolver{roomID: "!room:test"}, + resolver: stubRoomResolver{target: managementTarget{UserID: "@user:test", RoomID: "!room:test"}}, sender: sender, } @@ -85,7 +85,7 @@ func TestRenderNoticeForTestEvent(t *testing.T) { func TestArrHandlerReturnsBadGatewayOnSendFailure(t *testing.T) { handler := &ArrHandler{ - resolver: stubRoomResolver{roomID: "!room:test"}, + resolver: stubRoomResolver{target: managementTarget{UserID: "@user:test", RoomID: "!room:test"}}, sender: &stubNoticeSender{err: errors.New("send failed")}, } @@ -100,7 +100,7 @@ func TestArrHandlerReturnsBadGatewayOnSendFailure(t *testing.T) { func TestArrHandlerRejectsMissingEventType(t *testing.T) { handler := &ArrHandler{ - resolver: stubRoomResolver{roomID: "!room:test"}, + resolver: stubRoomResolver{target: managementTarget{UserID: "@user:test", RoomID: "!room:test"}}, sender: &stubNoticeSender{}, } diff --git a/systems/x86_64-linux/ulmo/default.nix b/systems/x86_64-linux/ulmo/default.nix index 57f57d3..18c5751 100644 --- a/systems/x86_64-linux/ulmo/default.nix +++ b/systems/x86_64-linux/ulmo/default.nix @@ -146,7 +146,7 @@ }; grafana = { - redirectUris = ["http://localhost:9001/login/generic_oauth"]; + redirectUris = ["http://localhost:9100/login/generic_oauth"]; grantTypes = ["authorizationCode"]; responseTypes = ["code"]; };