// triage/triage.go – Speichert und sucht Email-Triage-Entscheidungen in Qdrant (RAG-Lernen) // Eigenes Package um Import-Zyklen zwischen brain und email zu vermeiden. package triage import ( "context" "crypto/sha256" "encoding/hex" "fmt" "log/slog" "strings" pb "github.com/qdrant/go-client/qdrant" openai "github.com/sashabaranov/go-openai" "google.golang.org/grpc/metadata" "my-brain-importer/internal/config" ) // TriageResult repräsentiert ein Suchergebnis aus vergangenen Triage-Entscheidungen. type TriageResult struct { Text string Score float32 } // StoreDecision speichert eine Triage-Entscheidung in Qdrant. // Bei gleicher Email (Von + Betreff als deterministischer Schlüssel) wird die Entscheidung überschrieben. // date und bodySummary sind optional (leerer String = weglassen). // bodySummary wird auf max. 200 Zeichen gekürzt. func StoreDecision(subject, from, date, bodySummary string, isImportant bool) error { label := "wichtig" if !isImportant { label = "unwichtig" } // Kopfzeile immer vorhanden header := fmt.Sprintf("Email-Triage | Von: %s | Betreff: %s", from, subject) if date != "" { header += fmt.Sprintf(" | Datum: %s", date) } header += fmt.Sprintf(" | Entscheidung: %s", label) // Body-Zusammenfassung anhängen wenn vorhanden text := header if bodySummary != "" { summary := bodySummary if len(summary) > 200 { summary = summary[:200] } text = header + "\nBody: " + summary } ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey) embClient := config.NewEmbeddingClient() embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{ Input: []string{text}, Model: openai.EmbeddingModel(config.Cfg.Embedding.Model), }) if err != nil { return fmt.Errorf("embedding: %w", err) } conn := config.NewQdrantConn() defer conn.Close() // ID basiert auf Von+Betreff — nicht auf dem vollständigen Text, damit // Re-Processing derselben Email den bestehenden Eintrag überschreibt. id := triageID(from + "|" + subject) wait := true _, err = pb.NewPointsClient(conn).Upsert(ctx, &pb.UpsertPoints{ CollectionName: config.Cfg.Qdrant.Collection, Points: []*pb.PointStruct{{ Id: &pb.PointId{ PointIdOptions: &pb.PointId_Uuid{Uuid: id}, }, Vectors: &pb.Vectors{ VectorsOptions: &pb.Vectors_Vector{ Vector: &pb.Vector{Data: embResp.Data[0].Embedding}, }, }, Payload: map[string]*pb.Value{ "text": {Kind: &pb.Value_StringValue{StringValue: text}}, "source": {Kind: &pb.Value_StringValue{StringValue: "email_triage"}}, "type": {Kind: &pb.Value_StringValue{StringValue: "email_triage"}}, }, }}, Wait: &wait, }) if err != nil { return fmt.Errorf("qdrant upsert: %w", err) } slog.Debug("[Triage] Entscheidung gespeichert", "betreff", subject, "wichtig", isImportant) return nil } // SearchSimilar sucht ähnliche vergangene Triage-Entscheidungen in Qdrant. // Gibt bis zu 3 Ergebnisse zurück (nur type=email_triage, Score ≥ 0.7). func SearchSimilar(query string) []TriageResult { ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey) embClient := config.NewEmbeddingClient() embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{ Input: []string{query}, Model: openai.EmbeddingModel(config.Cfg.Embedding.Model), }) if err != nil { slog.Warn("[Triage] Embedding für RAG fehlgeschlagen", "fehler", err) return nil } conn := config.NewQdrantConn() defer conn.Close() threshold := float32(0.7) result, err := pb.NewPointsClient(conn).Search(ctx, &pb.SearchPoints{ CollectionName: config.Cfg.Qdrant.Collection, Vector: embResp.Data[0].Embedding, Limit: 3, WithPayload: &pb.WithPayloadSelector{ SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true}, }, ScoreThreshold: &threshold, Filter: &pb.Filter{ Must: []*pb.Condition{{ ConditionOneOf: &pb.Condition_Field{ Field: &pb.FieldCondition{ Key: "type", Match: &pb.Match{ MatchValue: &pb.Match_Keyword{Keyword: "email_triage"}, }, }, }, }}, }, }) if err != nil { slog.Warn("[Triage] RAG-Suche fehlgeschlagen", "fehler", err) return nil } var results []TriageResult for _, hit := range result.Result { text := hit.Payload["text"].GetStringValue() if text == "" { continue } results = append(results, TriageResult{Text: text, Score: hit.Score}) } return results } // ListRecent gibt die letzten N Triage-Entscheidungen aus Qdrant zurück. func ListRecent(limit uint32) ([]TriageResult, error) { if limit == 0 { limit = 10 } ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey) conn := config.NewQdrantConn() defer conn.Close() pointsClient := pb.NewPointsClient(conn) var results []TriageResult var offset *pb.PointId for { req := &pb.ScrollPoints{ CollectionName: config.Cfg.Qdrant.Collection, WithPayload: &pb.WithPayloadSelector{ SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true}, }, Filter: triageFilter(), Limit: &limit, } if offset != nil { req.Offset = offset } result, err := pointsClient.Scroll(ctx, req) if err != nil { return nil, fmt.Errorf("scroll fehlgeschlagen: %w", err) } for _, pt := range result.Result { text := pt.Payload["text"].GetStringValue() if text != "" { results = append(results, TriageResult{Text: text}) } } if result.NextPageOffset == nil || uint32(len(results)) >= limit { break } offset = result.NextPageOffset } if uint32(len(results)) > limit { results = results[:limit] } return results, nil } // CorrectDecision sucht eine Triage-Entscheidung per Embedding-Suche und flippt sie (wichtig↔unwichtig). // Gibt eine Bestätigungsmeldung zurück. func CorrectDecision(query string) (string, error) { ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey) embClient := config.NewEmbeddingClient() embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{ Input: []string{query}, Model: openai.EmbeddingModel(config.Cfg.Embedding.Model), }) if err != nil { return "", fmt.Errorf("embedding: %w", err) } conn := config.NewQdrantConn() defer conn.Close() threshold := float32(0.8) result, err := pb.NewPointsClient(conn).Search(ctx, &pb.SearchPoints{ CollectionName: config.Cfg.Qdrant.Collection, Vector: embResp.Data[0].Embedding, Limit: 1, WithPayload: &pb.WithPayloadSelector{ SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true}, }, ScoreThreshold: &threshold, Filter: triageFilter(), }) if err != nil { return "", fmt.Errorf("suche fehlgeschlagen: %w", err) } if len(result.Result) == 0 { return "", fmt.Errorf("keine passende Triage-Entscheidung gefunden (Score < 0.8)") } hit := result.Result[0] text := hit.Payload["text"].GetStringValue() // Text parsen: "Email-Triage | Von: X | Betreff: Y | Entscheidung: Z" from, subject, wasImportant, err := parseTriageText(text) if err != nil { return "", err } // Alten Eintrag löschen — ID basiert jetzt auf from|subject oldID := triageID(from + "|" + subject) pointsClient := pb.NewPointsClient(conn) if err := deleteByUUID(ctx, pointsClient, oldID); err != nil { return "", fmt.Errorf("löschen fehlgeschlagen: %w", err) } // Neuen Eintrag mit geflipptem Label speichern (Datum/Body aus altem Text nicht übertragen) newImportant := !wasImportant if err := StoreDecision(subject, from, "", "", newImportant); err != nil { return "", fmt.Errorf("speichern fehlgeschlagen: %w", err) } oldLabel := "wichtig" newLabel := "unwichtig" if newImportant { oldLabel = "unwichtig" newLabel = "wichtig" } msg := fmt.Sprintf("Korrigiert: '%s' von %s → %s (vorher: %s)", subject, from, newLabel, oldLabel) slog.Info("[Triage] Entscheidung korrigiert", "betreff", subject, "von", from, "neu", newLabel) return msg, nil } // SearchExtended sucht ähnliche Triage-Entscheidungen mit niedrigerem Threshold und konfigurierbarem Limit. func SearchExtended(query string, limit uint64) ([]TriageResult, error) { ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey) embClient := config.NewEmbeddingClient() embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{ Input: []string{query}, Model: openai.EmbeddingModel(config.Cfg.Embedding.Model), }) if err != nil { return nil, fmt.Errorf("embedding: %w", err) } conn := config.NewQdrantConn() defer conn.Close() threshold := float32(0.5) result, err := pb.NewPointsClient(conn).Search(ctx, &pb.SearchPoints{ CollectionName: config.Cfg.Qdrant.Collection, Vector: embResp.Data[0].Embedding, Limit: limit, WithPayload: &pb.WithPayloadSelector{ SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true}, }, ScoreThreshold: &threshold, Filter: triageFilter(), }) if err != nil { return nil, fmt.Errorf("suche fehlgeschlagen: %w", err) } var results []TriageResult for _, hit := range result.Result { text := hit.Payload["text"].GetStringValue() if text != "" { results = append(results, TriageResult{Text: text, Score: hit.Score}) } } return results, nil } // triageFilter gibt den Standard-Filter für Triage-Einträge zurück. func triageFilter() *pb.Filter { return &pb.Filter{ Must: []*pb.Condition{{ ConditionOneOf: &pb.Condition_Field{ Field: &pb.FieldCondition{ Key: "type", Match: &pb.Match{ MatchValue: &pb.Match_Keyword{Keyword: "email_triage"}, }, }, }, }}, } } // parseTriageText extrahiert Von, Betreff und Entscheidung aus dem gespeicherten Text. // Unterstützt sowohl das alte Format (4 Felder) als auch das neue Format mit optionalem Datum und Body. // Altes Format: "Email-Triage | Von: X | Betreff: Y | Entscheidung: Z" // Neues Format: "Email-Triage | Von: X | Betreff: Y | Datum: D | Entscheidung: Z\nBody: ..." func parseTriageText(text string) (from, subject string, isImportant bool, err error) { // Body-Zeile abtrennen – nur die Header-Zeile parsen headerLine := text if idx := strings.Index(text, "\nBody:"); idx >= 0 { headerLine = text[:idx] } parts := strings.Split(headerLine, " | ") if len(parts) < 4 { return "", "", false, fmt.Errorf("ungültiges Triage-Format: %s", text) } from = strings.TrimPrefix(parts[1], "Von: ") subject = strings.TrimPrefix(parts[2], "Betreff: ") // Letztes Feld enthält immer "Entscheidung: X", egal ob Datum dazwischen steht lastPart := parts[len(parts)-1] decision := strings.TrimPrefix(lastPart, "Entscheidung: ") isImportant = !strings.Contains(decision, "unwichtig") return from, subject, isImportant, nil } // deleteByUUID löscht einen einzelnen Punkt aus Qdrant anhand seiner UUID. func deleteByUUID(ctx context.Context, client pb.PointsClient, uuid string) error { wait := true _, err := client.Delete(ctx, &pb.DeletePoints{ CollectionName: config.Cfg.Qdrant.Collection, Points: &pb.PointsSelector{ PointsSelectorOneOf: &pb.PointsSelector_Points{ Points: &pb.PointsIdsList{ Ids: []*pb.PointId{{ PointIdOptions: &pb.PointId_Uuid{Uuid: uuid}, }}, }, }, }, Wait: &wait, }) return err } // triageID erzeugt eine deterministische UUID aus einem Schlüssel. // Für Email-Triage sollte der Schlüssel "from|subject" sein, damit // Re-Processing derselben Email (mit ggf. anderem Body/Datum) upsert statt insert auslöst. func triageID(key string) string { hash := sha256.Sum256([]byte("email_triage:" + key)) return hex.EncodeToString(hash[:16]) }