Files
ai-agent/internal/agents/tool/rss/watcher.go
Christoph K. 905981cd1e zwischenstand
2026-03-20 23:24:56 +01:00

169 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// rss/watcher.go Überwacht RSS-Feeds und importiert neue Artikel in Qdrant
package rss
import (
"context"
"fmt"
"log/slog"
"strings"
"time"
"github.com/mmcdole/gofeed"
"my-brain-importer/internal/brain"
"my-brain-importer/internal/config"
)
// FeedResult fasst das Ergebnis eines Feed-Imports zusammen.
type FeedResult struct {
URL string
Title string
Imported int
Err error
}
// IngestFeed fetcht einen RSS-Feed und importiert neue Artikel in Qdrant.
// Gibt Anzahl der importierten Artikel zurück.
func IngestFeed(feedURL string) (int, string, error) {
fp := gofeed.NewParser()
fp.Client = gofeed.NewParser().Client // default HTTP client with timeout
feed, err := fp.ParseURL(feedURL)
if err != nil {
return 0, "", fmt.Errorf("Feed-Parsing fehlgeschlagen: %w", err)
}
feedTitle := feed.Title
if feedTitle == "" {
feedTitle = feedURL
}
imported := 0
for _, item := range feed.Items {
text := buildArticleText(item)
if len(strings.TrimSpace(text)) < 20 {
continue
}
source := fmt.Sprintf("rss/%s", feedURL)
if item.Link != "" {
source = item.Link
}
if err := brain.IngestText(text, source, "rss"); err != nil {
slog.Warn("RSS: Artikel konnte nicht importiert werden", "url", item.Link, "fehler", err)
continue
}
imported++
}
return imported, feedTitle, nil
}
// buildArticleText formatiert einen RSS-Artikel als importierbaren Text.
func buildArticleText(item *gofeed.Item) string {
var sb strings.Builder
if item.Title != "" {
fmt.Fprintf(&sb, "# %s\n\n", item.Title)
}
if item.Published != "" {
fmt.Fprintf(&sb, "Veröffentlicht: %s\n", item.Published)
}
if item.Link != "" {
fmt.Fprintf(&sb, "URL: %s\n\n", item.Link)
}
if item.Description != "" {
sb.WriteString(strings.TrimSpace(item.Description))
}
return sb.String()
}
// IngestAllFeeds importiert alle konfigurierten RSS-Feeds.
// Gibt eine Zusammenfassung der Ergebnisse zurück.
func IngestAllFeeds() []FeedResult {
feeds := config.Cfg.RSSFeeds
if len(feeds) == 0 {
return nil
}
results := make([]FeedResult, 0, len(feeds))
for _, f := range feeds {
n, title, err := IngestFeed(f.URL)
results = append(results, FeedResult{
URL: f.URL,
Title: title,
Imported: n,
Err: err,
})
}
return results
}
// FormatResults gibt eine Discord-formatierte Zusammenfassung zurück.
func FormatResults(results []FeedResult) string {
if len(results) == 0 {
return "📭 Keine RSS-Feeds konfiguriert."
}
var sb strings.Builder
for _, r := range results {
if r.Err != nil {
fmt.Fprintf(&sb, "❌ **%s**: %v\n", r.URL, r.Err)
} else {
name := r.Title
if name == "" {
name = r.URL
}
fmt.Fprintf(&sb, "✅ **%s**: %d Artikel importiert\n", name, r.Imported)
}
}
return strings.TrimSpace(sb.String())
}
// Watcher überwacht alle konfigurierten RSS-Feeds in regelmäßigen Abständen.
type Watcher struct {
OnResults func(summary string)
}
// Run startet die RSS-Überwachungsschleife. Blockiert bis ctx abgebrochen wird.
func (w *Watcher) Run(ctx context.Context) {
feeds := config.Cfg.RSSFeeds
if len(feeds) == 0 {
slog.Info("RSS-Watcher: Keine Feeds konfiguriert, beende")
return
}
// Ersten Durchlauf sofort starten
w.runOnce()
// Dann Timer basierend auf minimalem Intervall
minInterval := 24 * time.Hour
for _, f := range feeds {
h := f.IntervalHours
if h <= 0 {
h = 24
}
d := time.Duration(h) * time.Hour
if d < minInterval {
minInterval = d
}
}
ticker := time.NewTicker(minInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
w.runOnce()
}
}
}
func (w *Watcher) runOnce() {
results := IngestAllFeeds()
if w.OnResults != nil && len(results) > 0 {
summary := FormatResults(results)
if summary != "" {
w.OnResults(summary)
}
}
}