Initial commit: my-brain-importer RAG knowledge management agent
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
237
internal/brain/ingest.go
Executable file
237
internal/brain/ingest.go
Executable file
@@ -0,0 +1,237 @@
|
||||
// ingest.go – Importiert Markdown-Dateien in Qdrant
|
||||
package brain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pb "github.com/qdrant/go-client/qdrant"
|
||||
openai "github.com/sashabaranov/go-openai"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"my-brain-importer/internal/config"
|
||||
)
|
||||
|
||||
const maxChunkSize = 800
|
||||
|
||||
// generateID erstellt eine deterministische ID via SHA256.
|
||||
// Gleicher Chunk → gleiche ID → kein Duplikat bei erneutem Import.
|
||||
func generateID(text, source string) string {
|
||||
hash := sha256.Sum256([]byte(source + ":" + text))
|
||||
return hex.EncodeToString(hash[:16])
|
||||
}
|
||||
|
||||
// RunIngest importiert alle Markdown-Dateien aus brainRoot in Qdrant.
|
||||
func RunIngest(brainRoot string) {
|
||||
ctx := context.Background()
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "api-key", config.Cfg.Qdrant.APIKey)
|
||||
|
||||
fmt.Printf("📂 Verzeichnis: %s\n", brainRoot)
|
||||
fmt.Printf("🗄️ Qdrant: %s:%s, Collection: %s\n", config.Cfg.Qdrant.Host, config.Cfg.Qdrant.Port, config.Cfg.Qdrant.Collection)
|
||||
fmt.Printf("🤖 Embedding: %s (%s)\n\n", config.Cfg.Embedding.Model, config.Cfg.Embedding.URL)
|
||||
|
||||
embClient := config.NewEmbeddingClient()
|
||||
conn := config.NewQdrantConn()
|
||||
defer conn.Close()
|
||||
|
||||
ensureCollection(ctx, pb.NewCollectionsClient(conn))
|
||||
pointsClient := pb.NewPointsClient(conn)
|
||||
|
||||
files := collectMarkdownFiles(brainRoot)
|
||||
fmt.Printf("📄 %d Markdown-Dateien gefunden\n\n", len(files))
|
||||
|
||||
totalChunks := 0
|
||||
for _, filePath := range files {
|
||||
relPath, _ := filepath.Rel(brainRoot, filePath)
|
||||
chunks := readAndChunk(filePath, relPath)
|
||||
if len(chunks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf(" %-50s %d Chunks\n", relPath, len(chunks))
|
||||
|
||||
if err := ingestChunks(ctx, embClient, pointsClient, chunks); err != nil {
|
||||
log.Printf(" ⚠️ Fehler bei %s: %v", relPath, err)
|
||||
continue
|
||||
}
|
||||
totalChunks += len(chunks)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
fmt.Printf("\n✅ Import abgeschlossen: %d Chunks aus %d Dateien\n", totalChunks, len(files))
|
||||
fmt.Printf("🌐 Dashboard: http://%s:6333/dashboard\n", config.Cfg.Qdrant.Host)
|
||||
}
|
||||
|
||||
func ensureCollection(ctx context.Context, client pb.CollectionsClient) {
|
||||
_, err := client.Create(ctx, &pb.CreateCollection{
|
||||
CollectionName: config.Cfg.Qdrant.Collection,
|
||||
VectorsConfig: &pb.VectorsConfig{
|
||||
Config: &pb.VectorsConfig_Params{
|
||||
Params: &pb.VectorParams{
|
||||
Size: config.Cfg.Embedding.Dimensions,
|
||||
Distance: pb.Distance_Cosine,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "already exists") {
|
||||
fmt.Printf("✅ Collection \"%s\" existiert bereits\n", config.Cfg.Qdrant.Collection)
|
||||
} else {
|
||||
log.Fatalf("❌ Collection konnte nicht erstellt werden: %v", err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("✅ Collection \"%s\" erstellt\n", config.Cfg.Qdrant.Collection)
|
||||
}
|
||||
}
|
||||
|
||||
func collectMarkdownFiles(root string) []string {
|
||||
var files []string
|
||||
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if info.IsDir() && (strings.Contains(path, "05_Agents") || strings.HasSuffix(path, ".git")) {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
if !info.IsDir() && strings.ToLower(filepath.Ext(path)) == ".md" {
|
||||
files = append(files, path)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return files
|
||||
}
|
||||
|
||||
type chunk struct {
|
||||
Text string
|
||||
Source string
|
||||
Type string
|
||||
}
|
||||
|
||||
func readAndChunk(filePath, relPath string) []chunk {
|
||||
data, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
log.Printf("⚠️ Datei nicht lesbar: %s", filePath)
|
||||
return nil
|
||||
}
|
||||
content := strings.TrimSpace(string(data))
|
||||
if content == "" {
|
||||
return nil
|
||||
}
|
||||
var chunks []chunk
|
||||
for _, section := range splitByHeadings(content) {
|
||||
section = strings.TrimSpace(section)
|
||||
if len(section) < 20 {
|
||||
continue
|
||||
}
|
||||
for _, text := range splitLongSection(section) {
|
||||
chunks = append(chunks, chunk{Text: text, Source: relPath, Type: "text"})
|
||||
}
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
func splitByHeadings(text string) []string {
|
||||
lines := strings.Split(text, "\n")
|
||||
var sections []string
|
||||
var current strings.Builder
|
||||
for _, line := range lines {
|
||||
if strings.HasPrefix(line, "# ") || strings.HasPrefix(line, "## ") {
|
||||
if current.Len() > 0 {
|
||||
sections = append(sections, current.String())
|
||||
current.Reset()
|
||||
}
|
||||
}
|
||||
current.WriteString(line)
|
||||
current.WriteString("\n")
|
||||
}
|
||||
if current.Len() > 0 {
|
||||
sections = append(sections, current.String())
|
||||
}
|
||||
return sections
|
||||
}
|
||||
|
||||
func splitLongSection(section string) []string {
|
||||
if len(section) <= maxChunkSize {
|
||||
return []string{section}
|
||||
}
|
||||
paragraphs := strings.Split(section, "\n\n")
|
||||
var chunks []string
|
||||
var current strings.Builder
|
||||
for _, para := range paragraphs {
|
||||
para = strings.TrimSpace(para)
|
||||
if para == "" {
|
||||
continue
|
||||
}
|
||||
if current.Len()+len(para) > maxChunkSize && current.Len() > 0 {
|
||||
chunks = append(chunks, current.String())
|
||||
current.Reset()
|
||||
}
|
||||
if current.Len() > 0 {
|
||||
current.WriteString("\n\n")
|
||||
}
|
||||
current.WriteString(para)
|
||||
}
|
||||
if current.Len() > 0 {
|
||||
chunks = append(chunks, current.String())
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
func ingestChunks(ctx context.Context, embClient *openai.Client, pointsClient pb.PointsClient, chunks []chunk) error {
|
||||
texts := make([]string, len(chunks))
|
||||
for i, c := range chunks {
|
||||
texts[i] = c.Text
|
||||
}
|
||||
|
||||
batchSize := 10
|
||||
var points []*pb.PointStruct
|
||||
|
||||
for i := 0; i < len(texts); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(texts) {
|
||||
end = len(texts)
|
||||
}
|
||||
embResp, err := embClient.CreateEmbeddings(ctx, openai.EmbeddingRequest{
|
||||
Input: texts[i:end],
|
||||
Model: openai.EmbeddingModel(config.Cfg.Embedding.Model),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Embedding fehlgeschlagen: %w", err)
|
||||
}
|
||||
for j, emb := range embResp.Data {
|
||||
c := chunks[i+j]
|
||||
points = append(points, &pb.PointStruct{
|
||||
Id: &pb.PointId{
|
||||
PointIdOptions: &pb.PointId_Uuid{Uuid: generateID(c.Text, c.Source)},
|
||||
},
|
||||
Vectors: &pb.Vectors{
|
||||
VectorsOptions: &pb.Vectors_Vector{
|
||||
Vector: &pb.Vector{Data: emb.Embedding},
|
||||
},
|
||||
},
|
||||
Payload: map[string]*pb.Value{
|
||||
"text": {Kind: &pb.Value_StringValue{StringValue: c.Text}},
|
||||
"source": {Kind: &pb.Value_StringValue{StringValue: c.Source}},
|
||||
"type": {Kind: &pb.Value_StringValue{StringValue: c.Type}},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
_, err := pointsClient.Upsert(ctx, &pb.UpsertPoints{
|
||||
CollectionName: config.Cfg.Qdrant.Collection,
|
||||
Points: points,
|
||||
Wait: boolPtr(true),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func boolPtr(b bool) *bool { return &b }
|
||||
Reference in New Issue
Block a user