Files
ai-agent/internal/brain/ingest.go
Christoph K. 3c8d3873dc chuns erhoeht
2026-03-12 19:24:14 +01:00

259 lines
7.3 KiB
Go
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// ingest.go imports Markdown files into Qdrant.
package brain
import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
	"strings"
	"time"
	"unicode/utf8"

	pb "github.com/qdrant/go-client/qdrant"
	openai "github.com/sashabaranov/go-openai"
	"google.golang.org/grpc/metadata"

	"my-brain-importer/internal/config"
)
// maxChunkSize is the upper bound (in bytes) for a single text chunk.
const maxChunkSize = 800

// generateID derives a deterministic point ID from source and text via SHA-256.
// The same chunk always yields the same ID, so re-importing a file does not
// create duplicates. Only the first 16 hash bytes (32 hex chars) are used.
func generateID(text, source string) string {
	sum := sha256.Sum256([]byte(source + ":" + text))
	return hex.EncodeToString(sum[:16])
}
// RunIngest imports every Markdown file under brainRoot into Qdrant.
// It builds the embedding and Qdrant clients from the global config, makes
// sure the target collection exists, then chunks and upserts each file,
// logging progress to stdout. Failures on individual files are logged and
// skipped so the rest of the import continues.
func RunIngest(brainRoot string) {
	ctx := context.Background()
	// Qdrant authenticates via the "api-key" gRPC metadata header.
	ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
	fmt.Printf("📂 Verzeichnis: %s\n", brainRoot)
	fmt.Printf("🗄️ Qdrant: %s:%s, Collection: %s\n", config.Cfg.Qdrant.Host, config.Cfg.Qdrant.Port, config.Cfg.Qdrant.Collection)
	fmt.Printf("🤖 Embedding: %s (%s)\n\n", config.Cfg.Embedding.Model, config.Cfg.Embedding.URL)
	embClient := config.NewEmbeddingClient()
	conn := config.NewQdrantConn()
	defer conn.Close()
	// Creates the collection if missing; exits the process on other errors.
	ensureCollection(ctx, pb.NewCollectionsClient(conn))
	pointsClient := pb.NewPointsClient(conn)
	files := collectMarkdownFiles(brainRoot)
	fmt.Printf("📄 %d Markdown-Dateien gefunden\n\n", len(files))
	totalChunks := 0
	for _, filePath := range files {
		// Paths relative to brainRoot serve as the chunk "source" payload.
		// err ignored: filePath always comes from walking brainRoot itself.
		relPath, _ := filepath.Rel(brainRoot, filePath)
		chunks := readAndChunk(filePath, relPath)
		if len(chunks) == 0 {
			continue
		}
		fmt.Printf(" %-50s %d Chunks\n", relPath, len(chunks))
		if err := ingestChunks(ctx, embClient, pointsClient, chunks); err != nil {
			// Best effort: log the failing file and keep importing the rest.
			log.Printf(" ⚠️ Fehler bei %s: %v", relPath, err)
			continue
		}
		totalChunks += len(chunks)
		// Brief pause between files to avoid hammering the embedding service.
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Printf("\n✅ Import abgeschlossen: %d Chunks aus %d Dateien\n", totalChunks, len(files))
	fmt.Printf("🌐 Dashboard: http://%s:6333/dashboard\n", config.Cfg.Qdrant.Host)
}
// ensureCollection creates the configured Qdrant collection if it is missing.
// An "already exists" response is treated as success; any other creation
// error terminates the process via log.Fatalf.
func ensureCollection(ctx context.Context, client pb.CollectionsClient) {
	_, err := client.Create(ctx, &pb.CreateCollection{
		CollectionName: config.Cfg.Qdrant.Collection,
		VectorsConfig: &pb.VectorsConfig{
			Config: &pb.VectorsConfig_Params{
				Params: &pb.VectorParams{
					Size:     config.Cfg.Embedding.Dimensions,
					Distance: pb.Distance_Cosine,
				},
			},
		},
	})
	switch {
	case err == nil:
		fmt.Printf("✅ Collection \"%s\" erstellt\n", config.Cfg.Qdrant.Collection)
	case strings.Contains(err.Error(), "already exists"):
		// NOTE(review): matching on the error text is fragile — confirm the
		// client exposes no structured status code for this case.
		fmt.Printf("✅ Collection \"%s\" existiert bereits\n", config.Cfg.Qdrant.Collection)
	default:
		log.Fatalf("❌ Collection konnte nicht erstellt werden: %v", err)
	}
}
func collectMarkdownFiles(root string) []string {
var files []string
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
if info.IsDir() && (strings.Contains(path, "05_Agents") || strings.HasSuffix(path, ".git")) {
return filepath.SkipDir
}
if !info.IsDir() && strings.ToLower(filepath.Ext(path)) == ".md" {
files = append(files, path)
}
return nil
})
return files
}
// chunk is a single unit of text to be embedded and stored in Qdrant.
type chunk struct {
	Text   string // chunk content (includes the "[author] " prefix for chat messages)
	Source string // origin of the text: a path relative to the brain root, or a chat source
	Type   string // Qdrant payload category: "text" for markdown imports, "chat" for chat messages
}
// readAndChunk loads a Markdown file and splits it into embeddable chunks.
// Sections are formed at headings; sections shorter than 20 bytes are
// dropped, and long sections are split further by splitLongSection.
// Returns nil when the file cannot be read or contains only whitespace.
func readAndChunk(filePath, relPath string) []chunk {
	raw, err := os.ReadFile(filePath)
	if err != nil {
		log.Printf("⚠️ Datei nicht lesbar: %s", filePath)
		return nil
	}
	body := strings.TrimSpace(string(raw))
	if body == "" {
		return nil
	}
	var result []chunk
	for _, sec := range splitByHeadings(body) {
		sec = strings.TrimSpace(sec)
		// Skip trivially short sections (e.g. a lone heading line).
		if len(sec) < 20 {
			continue
		}
		for _, part := range splitLongSection(sec) {
			result = append(result, chunk{Text: part, Source: relPath, Type: "text"})
		}
	}
	return result
}
// splitByHeadings partitions Markdown text into sections, starting a new
// section at every level-1..3 heading ("# ", "## ", "### ").
// Every line in a returned section is terminated with "\n".
func splitByHeadings(text string) []string {
	isHeading := func(line string) bool {
		return strings.HasPrefix(line, "# ") ||
			strings.HasPrefix(line, "## ") ||
			strings.HasPrefix(line, "### ")
	}
	var sections []string
	var buf strings.Builder
	flush := func() {
		if buf.Len() > 0 {
			sections = append(sections, buf.String())
			buf.Reset()
		}
	}
	for _, line := range strings.Split(text, "\n") {
		// A heading closes the running section before starting the next one.
		if isHeading(line) {
			flush()
		}
		buf.WriteString(line)
		buf.WriteByte('\n')
	}
	flush()
	return sections
}
// splitLongSection splits a section into chunks of at most maxChunkSize
// bytes. Sections that already fit are returned unchanged as a single chunk.
// Longer sections are packed paragraph-by-paragraph (paragraphs are separated
// by blank lines and joined with "\n\n" inside a chunk); empty paragraphs
// are dropped.
//
// Fix: previously a single paragraph longer than maxChunkSize passed through
// unsplit (the length guard only fired when the builder was non-empty), so a
// chunk could be arbitrarily large. Such paragraphs are now hard-wrapped on
// UTF-8 rune boundaries so no chunk exceeds maxChunkSize bytes.
func splitLongSection(section string) []string {
	if len(section) <= maxChunkSize {
		return []string{section}
	}
	var chunks []string
	var current strings.Builder
	flush := func() {
		if current.Len() > 0 {
			chunks = append(chunks, current.String())
			current.Reset()
		}
	}
	for _, para := range strings.Split(section, "\n\n") {
		para = strings.TrimSpace(para)
		if para == "" {
			continue
		}
		// Oversized paragraph: emit the pending chunk, then hard-wrap.
		for len(para) > maxChunkSize {
			flush()
			cut := maxChunkSize
			// Back off to a rune boundary so no chunk holds invalid UTF-8.
			for cut > 0 && !utf8.RuneStart(para[cut]) {
				cut--
			}
			if cut == 0 {
				cut = maxChunkSize // degenerate (non-UTF-8) input: split anyway
			}
			chunks = append(chunks, para[:cut])
			para = para[cut:]
		}
		// Start a fresh chunk when the paragraph would overflow the current one.
		if current.Len()+len(para) > maxChunkSize && current.Len() > 0 {
			flush()
		}
		if current.Len() > 0 {
			current.WriteString("\n\n")
		}
		current.WriteString(para)
	}
	flush()
	return chunks
}
// ingestChunks embeds all chunks (in batches of 10) and upserts the resulting
// points into the configured Qdrant collection in one request. Point IDs are
// deterministic (generateID), so re-ingesting identical content overwrites
// existing points instead of duplicating them. Returns the first embedding
// error or the upsert error, if any.
func ingestChunks(ctx context.Context, embClient *openai.Client, pointsClient pb.PointsClient, chunks []chunk) error {
	texts := make([]string, len(chunks))
	for i, c := range chunks {
		texts[i] = c.Text
	}
	// Embed in small batches to keep individual API requests bounded.
	batchSize := 10
	var points []*pb.PointStruct
	for i := 0; i < len(texts); i += batchSize {
		end := i + batchSize
		if end > len(texts) {
			end = len(texts)
		}
		embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
			Input: texts[i:end],
			Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
		})
		if err != nil {
			return fmt.Errorf("Embedding fehlgeschlagen: %w", err)
		}
		// Pair embeddings with their chunks by position; assumes the API
		// returns one embedding per input, in order — TODO confirm for the
		// configured backend.
		for j, emb := range embResp.Data {
			c := chunks[i+j]
			points = append(points, &pb.PointStruct{
				Id: &pb.PointId{
					// Deterministic 32-hex-char ID derived from source+text.
					PointIdOptions: &pb.PointId_Uuid{Uuid: generateID(c.Text, c.Source)},
				},
				Vectors: &pb.Vectors{
					VectorsOptions: &pb.Vectors_Vector{
						Vector: &pb.Vector{Data: emb.Embedding},
					},
				},
				Payload: map[string]*pb.Value{
					"text":   {Kind: &pb.Value_StringValue{StringValue: c.Text}},
					"source": {Kind: &pb.Value_StringValue{StringValue: c.Source}},
					"type":   {Kind: &pb.Value_StringValue{StringValue: c.Type}},
				},
			})
		}
	}
	// Wait=true blocks until Qdrant has persisted the points.
	_, err := pointsClient.Upsert(ctx, &pb.UpsertPoints{
		CollectionName: config.Cfg.Qdrant.Collection,
		Points:         points,
		Wait:           boolPtr(true),
	})
	return err
}
// IngestChatMessage stores a single chat message in Qdrant.
// Messages longer than maxChunkSize are split automatically, and every chunk
// is prefixed with "[author] " so the speaker survives chunking.
func IngestChatMessage(text, author, source string) error {
	// Qdrant authenticates via the "api-key" gRPC metadata header.
	ctx := metadata.AppendToOutgoingContext(
		context.Background(), "api-key", config.Cfg.Qdrant.APIKey)
	embClient := config.NewEmbeddingClient()
	conn := config.NewQdrantConn()
	defer conn.Close()
	ensureCollection(ctx, pb.NewCollectionsClient(conn))
	tag := fmt.Sprintf("[%s] ", author)
	var pieces []chunk
	for _, part := range splitLongSection(text) {
		pieces = append(pieces, chunk{Text: tag + part, Source: source, Type: "chat"})
	}
	return ingestChunks(ctx, embClient, pb.NewPointsClient(conn), pieces)
}
// boolPtr returns a pointer to a freshly allocated bool holding b.
// Needed because the Qdrant API expresses optional flags as *bool.
func boolPtr(b bool) *bool {
	v := b
	return &v
}