169 lines
3.8 KiB
Go
169 lines
3.8 KiB
Go
// rss/watcher.go – Überwacht RSS-Feeds und importiert neue Artikel in Qdrant
|
||
package rss
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/mmcdole/gofeed"
|
||
|
||
"my-brain-importer/internal/brain"
|
||
"my-brain-importer/internal/config"
|
||
)
|
||
|
||
// FeedResult fasst das Ergebnis eines Feed-Imports zusammen.
|
||
type FeedResult struct {
|
||
URL string
|
||
Title string
|
||
Imported int
|
||
Err error
|
||
}
|
||
|
||
// IngestFeed fetcht einen RSS-Feed und importiert neue Artikel in Qdrant.
|
||
// Gibt Anzahl der importierten Artikel zurück.
|
||
func IngestFeed(feedURL string) (int, string, error) {
|
||
fp := gofeed.NewParser()
|
||
fp.Client = gofeed.NewParser().Client // default HTTP client with timeout
|
||
feed, err := fp.ParseURL(feedURL)
|
||
if err != nil {
|
||
return 0, "", fmt.Errorf("Feed-Parsing fehlgeschlagen: %w", err)
|
||
}
|
||
|
||
feedTitle := feed.Title
|
||
if feedTitle == "" {
|
||
feedTitle = feedURL
|
||
}
|
||
|
||
imported := 0
|
||
for _, item := range feed.Items {
|
||
text := buildArticleText(item)
|
||
if len(strings.TrimSpace(text)) < 20 {
|
||
continue
|
||
}
|
||
source := fmt.Sprintf("rss/%s", feedURL)
|
||
if item.Link != "" {
|
||
source = item.Link
|
||
}
|
||
if err := brain.IngestText(text, source, "rss"); err != nil {
|
||
slog.Warn("RSS: Artikel konnte nicht importiert werden", "url", item.Link, "fehler", err)
|
||
continue
|
||
}
|
||
imported++
|
||
}
|
||
return imported, feedTitle, nil
|
||
}
|
||
|
||
// buildArticleText formatiert einen RSS-Artikel als importierbaren Text.
|
||
func buildArticleText(item *gofeed.Item) string {
|
||
var sb strings.Builder
|
||
if item.Title != "" {
|
||
fmt.Fprintf(&sb, "# %s\n\n", item.Title)
|
||
}
|
||
if item.Published != "" {
|
||
fmt.Fprintf(&sb, "Veröffentlicht: %s\n", item.Published)
|
||
}
|
||
if item.Link != "" {
|
||
fmt.Fprintf(&sb, "URL: %s\n\n", item.Link)
|
||
}
|
||
if item.Description != "" {
|
||
sb.WriteString(strings.TrimSpace(item.Description))
|
||
}
|
||
return sb.String()
|
||
}
|
||
|
||
// IngestAllFeeds importiert alle konfigurierten RSS-Feeds.
|
||
// Gibt eine Zusammenfassung der Ergebnisse zurück.
|
||
func IngestAllFeeds() []FeedResult {
|
||
feeds := config.Cfg.RSSFeeds
|
||
if len(feeds) == 0 {
|
||
return nil
|
||
}
|
||
|
||
results := make([]FeedResult, 0, len(feeds))
|
||
for _, f := range feeds {
|
||
n, title, err := IngestFeed(f.URL)
|
||
results = append(results, FeedResult{
|
||
URL: f.URL,
|
||
Title: title,
|
||
Imported: n,
|
||
Err: err,
|
||
})
|
||
}
|
||
return results
|
||
}
|
||
|
||
// FormatResults gibt eine Discord-formatierte Zusammenfassung zurück.
|
||
func FormatResults(results []FeedResult) string {
|
||
if len(results) == 0 {
|
||
return "📭 Keine RSS-Feeds konfiguriert."
|
||
}
|
||
var sb strings.Builder
|
||
for _, r := range results {
|
||
if r.Err != nil {
|
||
fmt.Fprintf(&sb, "❌ **%s**: %v\n", r.URL, r.Err)
|
||
} else {
|
||
name := r.Title
|
||
if name == "" {
|
||
name = r.URL
|
||
}
|
||
fmt.Fprintf(&sb, "✅ **%s**: %d Artikel importiert\n", name, r.Imported)
|
||
}
|
||
}
|
||
return strings.TrimSpace(sb.String())
|
||
}
|
||
|
||
// Watcher überwacht alle konfigurierten RSS-Feeds in regelmäßigen Abständen.
|
||
type Watcher struct {
|
||
OnResults func(summary string)
|
||
}
|
||
|
||
// Run startet die RSS-Überwachungsschleife. Blockiert bis ctx abgebrochen wird.
|
||
func (w *Watcher) Run(ctx context.Context) {
|
||
feeds := config.Cfg.RSSFeeds
|
||
if len(feeds) == 0 {
|
||
slog.Info("RSS-Watcher: Keine Feeds konfiguriert, beende")
|
||
return
|
||
}
|
||
|
||
// Ersten Durchlauf sofort starten
|
||
w.runOnce()
|
||
|
||
// Dann Timer basierend auf minimalem Intervall
|
||
minInterval := 24 * time.Hour
|
||
for _, f := range feeds {
|
||
h := f.IntervalHours
|
||
if h <= 0 {
|
||
h = 24
|
||
}
|
||
d := time.Duration(h) * time.Hour
|
||
if d < minInterval {
|
||
minInterval = d
|
||
}
|
||
}
|
||
|
||
ticker := time.NewTicker(minInterval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
w.runOnce()
|
||
}
|
||
}
|
||
}
|
||
|
||
func (w *Watcher) runOnce() {
|
||
results := IngestAllFeeds()
|
||
if w.OnResults != nil && len(results) > 0 {
|
||
summary := FormatResults(results)
|
||
if summary != "" {
|
||
w.OnResults(summary)
|
||
}
|
||
}
|
||
}
|