// ingest.go – Importiert Markdown-Dateien in Qdrant
package brain

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
	"strings"
	"time"

	pb "github.com/qdrant/go-client/qdrant"
	openai "github.com/sashabaranov/go-openai"
	"google.golang.org/grpc/metadata"

	"my-brain-importer/internal/config"
)
|
||
|
||
const maxChunkSize = 800
|
||
|
||
// generateID erstellt eine deterministische ID via SHA256.
|
||
// Gleicher Chunk → gleiche ID → kein Duplikat bei erneutem Import.
|
||
func generateID(text, source string) string {
|
||
hash := sha256.Sum256([]byte(source + ":" + text))
|
||
return hex.EncodeToString(hash[:16])
|
||
}
|
||
|
||
// RunIngest importiert alle Markdown-Dateien aus brainRoot in Qdrant.
|
||
func RunIngest(brainRoot string) {
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
fmt.Printf("📂 Verzeichnis: %s\n", brainRoot)
|
||
fmt.Printf("🗄️ Qdrant: %s:%s, Collection: %s\n", config.Cfg.Qdrant.Host, config.Cfg.Qdrant.Port, config.Cfg.Qdrant.Collection)
|
||
fmt.Printf("🤖 Embedding: %s (%s)\n\n", config.Cfg.Embedding.Model, config.Cfg.Embedding.URL)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
ensureCollection(ctx, pb.NewCollectionsClient(conn))
|
||
pointsClient := pb.NewPointsClient(conn)
|
||
|
||
files := collectMarkdownFiles(brainRoot)
|
||
fmt.Printf("📄 %d Markdown-Dateien gefunden\n\n", len(files))
|
||
|
||
totalChunks := 0
|
||
for _, filePath := range files {
|
||
relPath, _ := filepath.Rel(brainRoot, filePath)
|
||
chunks := readAndChunk(filePath, relPath)
|
||
if len(chunks) == 0 {
|
||
continue
|
||
}
|
||
|
||
fmt.Printf(" %-50s %d Chunks\n", relPath, len(chunks))
|
||
|
||
if err := ingestChunks(ctx, embClient, pointsClient, chunks); err != nil {
|
||
log.Printf(" ⚠️ Fehler bei %s: %v", relPath, err)
|
||
continue
|
||
}
|
||
totalChunks += len(chunks)
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
fmt.Printf("\n✅ Import abgeschlossen: %d Chunks aus %d Dateien\n", totalChunks, len(files))
|
||
fmt.Printf("🌐 Dashboard: http://%s:6333/dashboard\n", config.Cfg.Qdrant.Host)
|
||
}
|
||
|
||
func ensureCollection(ctx context.Context, client pb.CollectionsClient) {
|
||
_, err := client.Create(ctx, &pb.CreateCollection{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
VectorsConfig: &pb.VectorsConfig{
|
||
Config: &pb.VectorsConfig_Params{
|
||
Params: &pb.VectorParams{
|
||
Size: config.Cfg.Embedding.Dimensions,
|
||
Distance: pb.Distance_Cosine,
|
||
},
|
||
},
|
||
},
|
||
})
|
||
if err != nil {
|
||
if strings.Contains(err.Error(), "already exists") {
|
||
fmt.Printf("✅ Collection \"%s\" existiert bereits\n", config.Cfg.Qdrant.Collection)
|
||
} else {
|
||
log.Fatalf("❌ Collection konnte nicht erstellt werden: %v", err)
|
||
}
|
||
} else {
|
||
fmt.Printf("✅ Collection \"%s\" erstellt\n", config.Cfg.Qdrant.Collection)
|
||
}
|
||
}
|
||
|
||
func collectMarkdownFiles(root string) []string {
|
||
var files []string
|
||
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
if info.IsDir() && (strings.Contains(path, "05_Agents") || strings.HasSuffix(path, ".git")) {
|
||
return filepath.SkipDir
|
||
}
|
||
if !info.IsDir() && strings.ToLower(filepath.Ext(path)) == ".md" {
|
||
files = append(files, path)
|
||
}
|
||
return nil
|
||
})
|
||
return files
|
||
}
|
||
|
||
type chunk struct {
|
||
Text string
|
||
Source string
|
||
Type string
|
||
}
|
||
|
||
func readAndChunk(filePath, relPath string) []chunk {
|
||
data, err := os.ReadFile(filePath)
|
||
if err != nil {
|
||
log.Printf("⚠️ Datei nicht lesbar: %s", filePath)
|
||
return nil
|
||
}
|
||
content := strings.TrimSpace(string(data))
|
||
if content == "" {
|
||
return nil
|
||
}
|
||
var chunks []chunk
|
||
for _, section := range splitByHeadings(content) {
|
||
section = strings.TrimSpace(section)
|
||
if len(section) < 20 {
|
||
continue
|
||
}
|
||
for _, text := range splitLongSection(section) {
|
||
chunks = append(chunks, chunk{Text: text, Source: relPath, Type: "text"})
|
||
}
|
||
}
|
||
return chunks
|
||
}
|
||
|
||
func splitByHeadings(text string) []string {
|
||
lines := strings.Split(text, "\n")
|
||
var sections []string
|
||
var current strings.Builder
|
||
for _, line := range lines {
|
||
if strings.HasPrefix(line, "# ") || strings.HasPrefix(line, "## ") || strings.HasPrefix(line, "### ") {
|
||
if current.Len() > 0 {
|
||
sections = append(sections, current.String())
|
||
current.Reset()
|
||
}
|
||
}
|
||
current.WriteString(line)
|
||
current.WriteString("\n")
|
||
}
|
||
if current.Len() > 0 {
|
||
sections = append(sections, current.String())
|
||
}
|
||
return sections
|
||
}
|
||
|
||
func splitLongSection(section string) []string {
|
||
if len(section) <= maxChunkSize {
|
||
return []string{section}
|
||
}
|
||
paragraphs := strings.Split(section, "\n\n")
|
||
var chunks []string
|
||
var current strings.Builder
|
||
for _, para := range paragraphs {
|
||
para = strings.TrimSpace(para)
|
||
if para == "" {
|
||
continue
|
||
}
|
||
if current.Len()+len(para) > maxChunkSize && current.Len() > 0 {
|
||
chunks = append(chunks, current.String())
|
||
current.Reset()
|
||
}
|
||
if current.Len() > 0 {
|
||
current.WriteString("\n\n")
|
||
}
|
||
current.WriteString(para)
|
||
}
|
||
if current.Len() > 0 {
|
||
chunks = append(chunks, current.String())
|
||
}
|
||
return chunks
|
||
}
|
||
|
||
func ingestChunks(ctx context.Context, embClient *openai.Client, pointsClient pb.PointsClient, chunks []chunk) error {
|
||
texts := make([]string, len(chunks))
|
||
for i, c := range chunks {
|
||
texts[i] = c.Text
|
||
}
|
||
|
||
batchSize := 10
|
||
var points []*pb.PointStruct
|
||
|
||
for i := 0; i < len(texts); i += batchSize {
|
||
end := i + batchSize
|
||
if end > len(texts) {
|
||
end = len(texts)
|
||
}
|
||
embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
|
||
Input: texts[i:end],
|
||
Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("Embedding fehlgeschlagen: %w", err)
|
||
}
|
||
for j, emb := range embResp.Data {
|
||
c := chunks[i+j]
|
||
points = append(points, &pb.PointStruct{
|
||
Id: &pb.PointId{
|
||
PointIdOptions: &pb.PointId_Uuid{Uuid: generateID(c.Text, c.Source)},
|
||
},
|
||
Vectors: &pb.Vectors{
|
||
VectorsOptions: &pb.Vectors_Vector{
|
||
Vector: &pb.Vector{Data: emb.Embedding},
|
||
},
|
||
},
|
||
Payload: map[string]*pb.Value{
|
||
"text": {Kind: &pb.Value_StringValue{StringValue: c.Text}},
|
||
"source": {Kind: &pb.Value_StringValue{StringValue: c.Source}},
|
||
"type": {Kind: &pb.Value_StringValue{StringValue: c.Type}},
|
||
},
|
||
})
|
||
}
|
||
}
|
||
|
||
_, err := pointsClient.Upsert(ctx, &pb.UpsertPoints{
|
||
CollectionName: config.Cfg.Qdrant.Collection,
|
||
Points: points,
|
||
Wait: boolPtr(true),
|
||
})
|
||
return err
|
||
}
|
||
|
||
// IngestChatMessage speichert eine einzelne Chat-Nachricht in Qdrant.
|
||
func IngestChatMessage(text, author, source string) error {
|
||
ctx := context.Background()
|
||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||
|
||
embClient := config.NewEmbeddingClient()
|
||
conn := config.NewQdrantConn()
|
||
defer conn.Close()
|
||
|
||
ensureCollection(ctx, pb.NewCollectionsClient(conn))
|
||
pointsClient := pb.NewPointsClient(conn)
|
||
|
||
fullText := fmt.Sprintf("[%s] %s", author, text)
|
||
c := chunk{Text: fullText, Source: source, Type: "chat"}
|
||
return ingestChunks(ctx, embClient, pointsClient, []chunk{c})
|
||
}
|
||
|
||
func boolPtr(b bool) *bool { return &b }
|