// ingest_email.go – imports emails from an IMAP folder into Qdrant.
package brain
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"strings"
|
||
"time"
|
||
|
||
pb "github.com/qdrant/go-client/qdrant"
|
||
"google.golang.org/grpc/metadata"
|
||
|
||
"my-brain-importer/internal/agents/tool/email"
|
||
"my-brain-importer/internal/config"
|
||
)
|
||
|
||
// IngestEmailFolder importiert alle Emails aus einem IMAP-Ordner in Qdrant.
|
||
// Gibt Anzahl der importierten Emails zurück.
|
||
// maxEmails = 0 bedeutet: alle (bis max. 500).
|
||
func IngestEmailFolder(acc config.EmailAccount, folder string, maxEmails uint32) (int, error) {
|
||
if maxEmails == 0 {
|
||
maxEmails = 500
|
||
}
|
||
|
||
cl, err := email.ConnectAccount(acc)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("IMAP-Verbindung: %w", err)
|
||
}
|
||
defer cl.Close()
|
||
|
||
slog.Info("Email-Ingest: Lade Emails", "account", acc.Name, "folder", folder, "max", maxEmails)
|
||
msgs, err := cl.FetchWithBody(folder, maxEmails)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("Emails laden: %w", err)
|
||
}
|
||
if len(msgs) == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
ensureCollection(ctx, pb.NewCollectionsClient(conn))
|
||
pointsClient := pb.NewPointsClient(conn)
|
||
|
||
var chunks []chunk
|
||
for _, m := range msgs {
|
||
text := formatEmailForIngest(m)
|
||
if len(strings.TrimSpace(text)) < 20 {
|
||
continue
|
||
}
|
||
source := fmt.Sprintf("email/%s/%s", folder, m.Date)
|
||
chunks = append(chunks, chunk{Text: text, Source: source, Type: "email"})
|
||
}
|
||
|
||
if len(chunks) == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
slog.Info("Email-Ingest: Starte Embedding", "emails", len(msgs), "chunks", len(chunks))
|
||
|
||
// In Batches von 20 ingesten (Embeddings können langsam sein)
|
||
ingested := 0
|
||
for i := 0; i < len(chunks); i += 20 {
|
||
end := i + 20
|
||
if end > len(chunks) {
|
||
end = len(chunks)
|
||
}
|
||
batch := chunks[i:end]
|
||
if err := ingestChunks(ctx, embClient, pointsClient, batch); err != nil {
|
||
slog.Warn("Email-Ingest Batch-Fehler", "batch_start", i, "fehler", err)
|
||
continue
|
||
}
|
||
ingested += len(batch)
|
||
slog.Info("Email-Ingest Fortschritt", "ingested", ingested, "total", len(chunks))
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
|
||
return ingested, nil
|
||
}
|
||
|
||
// formatEmailForIngest formatiert eine Email als durchsuchbaren Text.
|
||
func formatEmailForIngest(m email.MessageWithBody) string {
|
||
var sb strings.Builder
|
||
fmt.Fprintf(&sb, "Betreff: %s\n", m.Subject)
|
||
fmt.Fprintf(&sb, "Von: %s\n", m.From)
|
||
fmt.Fprintf(&sb, "Datum: %s\n", m.Date)
|
||
if m.Body != "" {
|
||
sb.WriteString("\n")
|
||
sb.WriteString(m.Body)
|
||
}
|
||
return sb.String()
|
||
}
|