- Automatisches Triage-Lernen aus Archiv-Ordnern im Nacht-Ingest: retention_days=0 (Archiv) → wichtig, retention_days>0 → unwichtig - Drei neue Discord-Commands: /email triage-history, triage-correct, triage-search - StoreDecision speichert jetzt Datum + Body-Zusammenfassung (max 200 Zeichen) - MIME-Multipart-Parsing mit PDF-Attachment-Extraktion (FetchWithBodyAndAttachments) - Deterministische IDs basierend auf Absender+Betreff (idempotente Upserts) - Rueckwaertskompatibles Parsing fuer alte Triage-Eintraege ohne Datum/Body Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
389 lines
12 KiB
Go
389 lines
12 KiB
Go
// triage/triage.go – Speichert und sucht Email-Triage-Entscheidungen in Qdrant (RAG-Lernen)
|
||
// Eigenes Package um Import-Zyklen zwischen brain und email zu vermeiden.
|
||
package triage
|
||
|
||
import (
|
||
"context"
|
||
"crypto/sha256"
|
||
"encoding/hex"
|
||
"fmt"
|
||
"log/slog"
|
||
"strings"
|
||
|
||
pb "github.com/qdrant/go-client/qdrant"
|
||
openai "github.com/sashabaranov/go-openai"
|
||
"google.golang.org/grpc/metadata"
|
||
|
||
"my-brain-importer/internal/config"
|
||
)
|
||
|
||
// TriageResult is a single past triage decision read back from Qdrant.
type TriageResult struct {
	// Text is the stored decision text: the "Email-Triage | ..." header line,
	// optionally followed by a "\nBody: ..." line.
	Text string
	// Score is the similarity score of a search hit. Results produced by
	// listing (ListRecent) rather than searching leave it at zero.
	Score float32
}
|
||
|
||
// StoreDecision speichert eine Triage-Entscheidung in Qdrant.
|
||
// Bei gleicher Email (Von + Betreff als deterministischer Schlüssel) wird die Entscheidung überschrieben.
|
||
// date und bodySummary sind optional (leerer String = weglassen).
|
||
// bodySummary wird auf max. 200 Zeichen gekürzt.
|
||
func StoreDecision(subject, from, date, bodySummary string, isImportant bool) error {
|
||
label := "wichtig"
|
||
if !isImportant {
|
||
label = "unwichtig"
|
||
}
|
||
|
||
// Kopfzeile immer vorhanden
|
||
header := fmt.Sprintf("Email-Triage | Von: %s | Betreff: %s", from, subject)
|
||
if date != "" {
|
||
header += fmt.Sprintf(" | Datum: %s", date)
|
||
}
|
||
header += fmt.Sprintf(" | Entscheidung: %s", label)
|
||
|
||
// Body-Zusammenfassung anhängen wenn vorhanden
|
||
text := header
|
||
if bodySummary != "" {
|
||
summary := bodySummary
|
||
if len(summary) > 200 {
|
||
summary = summary[:200]
|
||
}
|
||
text = header + "\nBody: " + summary
|
||
}
|
||
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
|
||
Input: []string{text},
|
||
Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("embedding: %w", err)
|
||
}
|
||
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
// ID basiert auf Von+Betreff — nicht auf dem vollständigen Text, damit
|
||
// Re-Processing derselben Email den bestehenden Eintrag überschreibt.
|
||
id := triageID(from + "|" + subject)
|
||
wait := true
|
||
_, err = pb.NewPointsClient(conn).Upsert(ctx, &pb.UpsertPoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
Points: []*pb.PointStruct{{
|
||
Id: &pb.PointId{
|
||
PointIdOptions: &pb.PointId_Uuid{Uuid: id},
|
||
},
|
||
Vectors: &pb.Vectors{
|
||
VectorsOptions: &pb.Vectors_Vector{
|
||
Vector: &pb.Vector{Data: embResp.Data[0].Embedding},
|
||
},
|
||
},
|
||
Payload: map[string]*pb.Value{
|
||
"text": {Kind: &pb.Value_StringValue{StringValue: text}},
|
||
"source": {Kind: &pb.Value_StringValue{StringValue: "email_triage"}},
|
||
"type": {Kind: &pb.Value_StringValue{StringValue: "email_triage"}},
|
||
},
|
||
}},
|
||
Wait: &wait,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("qdrant upsert: %w", err)
|
||
}
|
||
|
||
slog.Debug("[Triage] Entscheidung gespeichert", "betreff", subject, "wichtig", isImportant)
|
||
return nil
|
||
}
|
||
|
||
// SearchSimilar sucht ähnliche vergangene Triage-Entscheidungen in Qdrant.
|
||
// Gibt bis zu 3 Ergebnisse zurück (nur type=email_triage, Score ≥ 0.7).
|
||
func SearchSimilar(query string) []TriageResult {
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
|
||
Input: []string{query},
|
||
Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
|
||
})
|
||
if err != nil {
|
||
slog.Warn("[Triage] Embedding für RAG fehlgeschlagen", "fehler", err)
|
||
return nil
|
||
}
|
||
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
threshold := float32(0.7)
|
||
result, err := pb.NewPointsClient(conn).Search(ctx, &pb.SearchPoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
Vector: embResp.Data[0].Embedding,
|
||
Limit: 3,
|
||
WithPayload: &pb.WithPayloadSelector{
|
||
SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true},
|
||
},
|
||
ScoreThreshold: &threshold,
|
||
Filter: &pb.Filter{
|
||
Must: []*pb.Condition{{
|
||
ConditionOneOf: &pb.Condition_Field{
|
||
Field: &pb.FieldCondition{
|
||
Key: "type",
|
||
Match: &pb.Match{
|
||
MatchValue: &pb.Match_Keyword{Keyword: "email_triage"},
|
||
},
|
||
},
|
||
},
|
||
}},
|
||
},
|
||
})
|
||
if err != nil {
|
||
slog.Warn("[Triage] RAG-Suche fehlgeschlagen", "fehler", err)
|
||
return nil
|
||
}
|
||
|
||
var results []TriageResult
|
||
for _, hit := range result.Result {
|
||
text := hit.Payload["text"].GetStringValue()
|
||
if text == "" {
|
||
continue
|
||
}
|
||
results = append(results, TriageResult{Text: text, Score: hit.Score})
|
||
}
|
||
return results
|
||
}
|
||
|
||
// ListRecent gibt die letzten N Triage-Entscheidungen aus Qdrant zurück.
|
||
func ListRecent(limit uint32) ([]TriageResult, error) {
|
||
if limit == 0 {
|
||
limit = 10
|
||
}
|
||
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
pointsClient := pb.NewPointsClient(conn)
|
||
|
||
var results []TriageResult
|
||
var offset *pb.PointId
|
||
|
||
for {
|
||
req := &pb.ScrollPoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
WithPayload: &pb.WithPayloadSelector{
|
||
SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true},
|
||
},
|
||
Filter: triageFilter(),
|
||
Limit: &limit,
|
||
}
|
||
if offset != nil {
|
||
req.Offset = offset
|
||
}
|
||
|
||
result, err := pointsClient.Scroll(ctx, req)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("scroll fehlgeschlagen: %w", err)
|
||
}
|
||
|
||
for _, pt := range result.Result {
|
||
text := pt.Payload["text"].GetStringValue()
|
||
if text != "" {
|
||
results = append(results, TriageResult{Text: text})
|
||
}
|
||
}
|
||
|
||
if result.NextPageOffset == nil || uint32(len(results)) >= limit {
|
||
break
|
||
}
|
||
offset = result.NextPageOffset
|
||
}
|
||
|
||
if uint32(len(results)) > limit {
|
||
results = results[:limit]
|
||
}
|
||
return results, nil
|
||
}
|
||
|
||
// CorrectDecision sucht eine Triage-Entscheidung per Embedding-Suche und flippt sie (wichtig↔unwichtig).
|
||
// Gibt eine Bestätigungsmeldung zurück.
|
||
func CorrectDecision(query string) (string, error) {
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
|
||
Input: []string{query},
|
||
Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
|
||
})
|
||
if err != nil {
|
||
return "", fmt.Errorf("embedding: %w", err)
|
||
}
|
||
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
threshold := float32(0.8)
|
||
result, err := pb.NewPointsClient(conn).Search(ctx, &pb.SearchPoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
Vector: embResp.Data[0].Embedding,
|
||
Limit: 1,
|
||
WithPayload: &pb.WithPayloadSelector{
|
||
SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true},
|
||
},
|
||
ScoreThreshold: &threshold,
|
||
Filter: triageFilter(),
|
||
})
|
||
if err != nil {
|
||
return "", fmt.Errorf("suche fehlgeschlagen: %w", err)
|
||
}
|
||
if len(result.Result) == 0 {
|
||
return "", fmt.Errorf("keine passende Triage-Entscheidung gefunden (Score < 0.8)")
|
||
}
|
||
|
||
hit := result.Result[0]
|
||
text := hit.Payload["text"].GetStringValue()
|
||
|
||
// Text parsen: "Email-Triage | Von: X | Betreff: Y | Entscheidung: Z"
|
||
from, subject, wasImportant, err := parseTriageText(text)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
// Alten Eintrag löschen — ID basiert jetzt auf from|subject
|
||
oldID := triageID(from + "|" + subject)
|
||
pointsClient := pb.NewPointsClient(conn)
|
||
if err := deleteByUUID(ctx, pointsClient, oldID); err != nil {
|
||
return "", fmt.Errorf("löschen fehlgeschlagen: %w", err)
|
||
}
|
||
|
||
// Neuen Eintrag mit geflipptem Label speichern (Datum/Body aus altem Text nicht übertragen)
|
||
newImportant := !wasImportant
|
||
if err := StoreDecision(subject, from, "", "", newImportant); err != nil {
|
||
return "", fmt.Errorf("speichern fehlgeschlagen: %w", err)
|
||
}
|
||
|
||
oldLabel := "wichtig"
|
||
newLabel := "unwichtig"
|
||
if newImportant {
|
||
oldLabel = "unwichtig"
|
||
newLabel = "wichtig"
|
||
}
|
||
|
||
msg := fmt.Sprintf("Korrigiert: '%s' von %s → %s (vorher: %s)", subject, from, newLabel, oldLabel)
|
||
slog.Info("[Triage] Entscheidung korrigiert", "betreff", subject, "von", from, "neu", newLabel)
|
||
return msg, nil
|
||
}
|
||
|
||
// SearchExtended sucht ähnliche Triage-Entscheidungen mit niedrigerem Threshold und konfigurierbarem Limit.
|
||
func SearchExtended(query string, limit uint64) ([]TriageResult, error) {
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
|
||
Input: []string{query},
|
||
Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("embedding: %w", err)
|
||
}
|
||
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
threshold := float32(0.5)
|
||
result, err := pb.NewPointsClient(conn).Search(ctx, &pb.SearchPoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
Vector: embResp.Data[0].Embedding,
|
||
Limit: limit,
|
||
WithPayload: &pb.WithPayloadSelector{
|
||
SelectorOptions: &pb.WithPayloadSelector_Enable{Enable: true},
|
||
},
|
||
ScoreThreshold: &threshold,
|
||
Filter: triageFilter(),
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("suche fehlgeschlagen: %w", err)
|
||
}
|
||
|
||
var results []TriageResult
|
||
for _, hit := range result.Result {
|
||
text := hit.Payload["text"].GetStringValue()
|
||
if text != "" {
|
||
results = append(results, TriageResult{Text: text, Score: hit.Score})
|
||
}
|
||
}
|
||
return results, nil
|
||
}
|
||
|
||
// triageFilter gibt den Standard-Filter für Triage-Einträge zurück.
|
||
func triageFilter() *pb.Filter {
|
||
return &pb.Filter{
|
||
Must: []*pb.Condition{{
|
||
ConditionOneOf: &pb.Condition_Field{
|
||
Field: &pb.FieldCondition{
|
||
Key: "type",
|
||
Match: &pb.Match{
|
||
MatchValue: &pb.Match_Keyword{Keyword: "email_triage"},
|
||
},
|
||
},
|
||
},
|
||
}},
|
||
}
|
||
}
|
||
|
||
// parseTriageText extracts sender, subject and decision from a stored triage text.
//
// Both stored layouts are supported:
//
//	old: "Email-Triage | Von: X | Betreff: Y | Entscheidung: Z"
//	new: "Email-Triage | Von: X | Betreff: Y | Datum: D | Entscheidung: Z\nBody: ..."
//
// Everything after "\nBody:" is ignored. The subject may itself contain " | ".
// All fields between "Von" and the optional "Datum" field (or the final
// "Entscheidung" field) are joined back into the subject.
//
// The decision counts as important unless its text contains "unwichtig".
// An error is returned when the header has fewer than four " | "-separated fields.
func parseTriageText(text string) (from, subject string, isImportant bool, err error) {
	// Split off the body line; only the header line is parsed.
	headerLine := text
	if idx := strings.Index(text, "\nBody:"); idx >= 0 {
		headerLine = text[:idx]
	}

	parts := strings.Split(headerLine, " | ")
	if len(parts) < 4 {
		return "", "", false, fmt.Errorf("ungültiges Triage-Format: %s", text)
	}

	from = strings.TrimPrefix(parts[1], "Von: ")

	// The last field is always "Entscheidung: X". An optional "Datum: D" field
	// may sit directly before it. It is only considered when at least one
	// field remains for the subject.
	last := len(parts) - 1
	subjectEnd := last
	if subjectEnd-1 > 2 && strings.HasPrefix(parts[subjectEnd-1], "Datum: ") {
		subjectEnd--
	}
	subject = strings.TrimPrefix(strings.Join(parts[2:subjectEnd], " | "), "Betreff: ")

	decision := strings.TrimPrefix(parts[last], "Entscheidung: ")
	isImportant = !strings.Contains(decision, "unwichtig")
	return from, subject, isImportant, nil
}
|
||
|
||
// deleteByUUID löscht einen einzelnen Punkt aus Qdrant anhand seiner UUID.
|
||
func deleteByUUID(ctx context.Context, client pb.PointsClient, uuid string) error {
|
||
wait := true
|
||
_, err := client.Delete(ctx, &pb.DeletePoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
Points: &pb.PointsSelector{
|
||
PointsSelectorOneOf: &pb.PointsSelector_Points{
|
||
Points: &pb.PointsIdsList{
|
||
Ids: []*pb.PointId{{
|
||
PointIdOptions: &pb.PointId_Uuid{Uuid: uuid},
|
||
}},
|
||
},
|
||
},
|
||
},
|
||
Wait: &wait,
|
||
})
|
||
return err
|
||
}
|
||
|
||
// triageID derives a deterministic point ID from key.
//
// The key is hashed with SHA-256 under the "email_triage:" prefix. The first
// 16 bytes of the digest are returned as 32 lowercase hex characters, which
// callers use as the Qdrant UUID point ID.
//
// For email triage the key should be "from|subject". Re-processing the same
// email (possibly with a different body or date) then targets the same point,
// so an upsert overwrites it instead of inserting a duplicate.
func triageID(key string) string {
	hasher := sha256.New()
	// hash.Hash.Write is documented never to return an error.
	hasher.Write([]byte("email_triage:"))
	hasher.Write([]byte(key))
	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest[:16])
}
|