zwischenstand
This commit is contained in:
98
internal/brain/ingest_email.go
Normal file
98
internal/brain/ingest_email.go
Normal file
@@ -0,0 +1,98 @@
|
||||
// ingest_email.go – Importiert Emails aus einem IMAP-Ordner in Qdrant
|
||||
package brain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pb "github.com/qdrant/go-client/qdrant"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"my-brain-importer/internal/agents/tool/email"
|
||||
"my-brain-importer/internal/config"
|
||||
)
|
||||
|
||||
// IngestEmailFolder importiert alle Emails aus einem IMAP-Ordner in Qdrant.
|
||||
// Gibt Anzahl der importierten Emails zurück.
|
||||
// maxEmails = 0 bedeutet: alle (bis max. 500).
|
||||
func IngestEmailFolder(acc config.EmailAccount, folder string, maxEmails uint32) (int, error) {
|
||||
if maxEmails == 0 {
|
||||
maxEmails = 500
|
||||
}
|
||||
|
||||
cl, err := email.ConnectAccount(acc)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("IMAP-Verbindung: %w", err)
|
||||
}
|
||||
defer cl.Close()
|
||||
|
||||
slog.Info("Email-Ingest: Lade Emails", "account", acc.Name, "folder", folder, "max", maxEmails)
|
||||
msgs, err := cl.FetchWithBody(folder, maxEmails)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("Emails laden: %w", err)
|
||||
}
|
||||
if len(msgs) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||||
|
||||
embClient := config.NewEmbeddingClient()
|
||||
conn := config.NewQdrantConn()
|
||||
defer conn.Close()
|
||||
|
||||
ensureCollection(ctx, pb.NewCollectionsClient(conn))
|
||||
pointsClient := pb.NewPointsClient(conn)
|
||||
|
||||
var chunks []chunk
|
||||
for _, m := range msgs {
|
||||
text := formatEmailForIngest(m)
|
||||
if len(strings.TrimSpace(text)) < 20 {
|
||||
continue
|
||||
}
|
||||
source := fmt.Sprintf("email/%s/%s", folder, m.Date)
|
||||
chunks = append(chunks, chunk{Text: text, Source: source, Type: "email"})
|
||||
}
|
||||
|
||||
if len(chunks) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
slog.Info("Email-Ingest: Starte Embedding", "emails", len(msgs), "chunks", len(chunks))
|
||||
|
||||
// In Batches von 20 ingesten (Embeddings können langsam sein)
|
||||
ingested := 0
|
||||
for i := 0; i < len(chunks); i += 20 {
|
||||
end := i + 20
|
||||
if end > len(chunks) {
|
||||
end = len(chunks)
|
||||
}
|
||||
batch := chunks[i:end]
|
||||
if err := ingestChunks(ctx, embClient, pointsClient, batch); err != nil {
|
||||
slog.Warn("Email-Ingest Batch-Fehler", "batch_start", i, "fehler", err)
|
||||
continue
|
||||
}
|
||||
ingested += len(batch)
|
||||
slog.Info("Email-Ingest Fortschritt", "ingested", ingested, "total", len(chunks))
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
return ingested, nil
|
||||
}
|
||||
|
||||
// formatEmailForIngest formatiert eine Email als durchsuchbaren Text.
|
||||
func formatEmailForIngest(m email.MessageWithBody) string {
|
||||
var sb strings.Builder
|
||||
fmt.Fprintf(&sb, "Betreff: %s\n", m.Subject)
|
||||
fmt.Fprintf(&sb, "Von: %s\n", m.From)
|
||||
fmt.Fprintf(&sb, "Datum: %s\n", m.Date)
|
||||
if m.Body != "" {
|
||||
sb.WriteString("\n")
|
||||
sb.WriteString(m.Body)
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
Reference in New Issue
Block a user