Some checks failed
Deploy to NAS / deploy (push) Failing after 4s
Remove submodule tracking; backend is now a plain directory in the repo. Also update deploy workflow: remove --recurse-submodules. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
164 lines
4.4 KiB
Go
164 lines
4.4 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/jacek/pamietnik/backend/internal/domain"
|
|
)
|
|
|
|
type TrackpointStore struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
func NewTrackpointStore(pool *pgxpool.Pool) *TrackpointStore {
|
|
return &TrackpointStore{pool: pool}
|
|
}
|
|
|
|
// UpsertBatch inserts trackpoints, ignoring duplicates (idempotency via device_id + event_id).
|
|
// Returns accepted event_ids and rejected items with reason.
|
|
func (s *TrackpointStore) UpsertBatch(ctx context.Context, userID string, points []domain.Trackpoint) (accepted []string, rejected []RejectedItem, err error) {
|
|
// First pass: validate all points
|
|
var valid []domain.Trackpoint
|
|
for _, p := range points {
|
|
if vErr := validateTrackpoint(p); vErr != nil {
|
|
rejected = append(rejected, RejectedItem{
|
|
EventID: p.EventID,
|
|
Code: "VALIDATION_ERROR",
|
|
Message: vErr.Error(),
|
|
})
|
|
continue
|
|
}
|
|
valid = append(valid, p)
|
|
}
|
|
|
|
if len(valid) == 0 {
|
|
return accepted, rejected, nil
|
|
}
|
|
|
|
// Ensure devices in a single batch (deduplicated)
|
|
if userID != "" {
|
|
seen := make(map[string]bool)
|
|
batch := &pgx.Batch{}
|
|
for _, p := range valid {
|
|
if !seen[p.DeviceID] {
|
|
seen[p.DeviceID] = true
|
|
batch.Queue(
|
|
`INSERT INTO devices (device_id, user_id) VALUES ($1, $2) ON CONFLICT (device_id) DO NOTHING`,
|
|
p.DeviceID, userID,
|
|
)
|
|
}
|
|
}
|
|
br := s.pool.SendBatch(ctx, batch)
|
|
if closeErr := br.Close(); closeErr != nil {
|
|
return accepted, rejected, fmt.Errorf("ensure devices: %w", closeErr)
|
|
}
|
|
}
|
|
|
|
// Insert trackpoints
|
|
for _, p := range valid {
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO trackpoints (
|
|
event_id, device_id, trip_id, ts,
|
|
lat, lon, source, note,
|
|
accuracy_m, speed_mps, bearing_deg, altitude_m
|
|
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
|
|
ON CONFLICT (device_id, event_id) DO NOTHING`,
|
|
p.EventID, p.DeviceID, p.TripID, p.Timestamp,
|
|
p.Lat, p.Lon, p.Source, p.Note,
|
|
p.AccuracyM, p.SpeedMps, p.BearingDeg, p.AltitudeM,
|
|
)
|
|
if err != nil {
|
|
rejected = append(rejected, RejectedItem{
|
|
EventID: p.EventID,
|
|
Code: "DB_ERROR",
|
|
Message: "database error",
|
|
})
|
|
continue
|
|
}
|
|
accepted = append(accepted, p.EventID)
|
|
}
|
|
return accepted, rejected, nil
|
|
}
|
|
|
|
// RejectedItem describes one trackpoint that UpsertBatch did not store.
type RejectedItem struct {
	// EventID is the event_id of the rejected trackpoint.
	EventID string `json:"event_id"`

	// Code is the rejection category; UpsertBatch sets "VALIDATION_ERROR" or
	// "DB_ERROR".
	Code string `json:"code"`

	// Message is a human-readable explanation of the rejection.
	Message string `json:"message"`
}
|
|
|
|
func validateTrackpoint(p domain.Trackpoint) error {
|
|
if p.EventID == "" {
|
|
return errors.New("event_id is required")
|
|
}
|
|
if p.DeviceID == "" {
|
|
return errors.New("device_id is required")
|
|
}
|
|
if p.Lat < -90 || p.Lat > 90 {
|
|
return errors.New("lat out of range")
|
|
}
|
|
if p.Lon < -180 || p.Lon > 180 {
|
|
return errors.New("lon out of range")
|
|
}
|
|
if p.Source != "" && p.Source != "gps" && p.Source != "manual" {
|
|
return errors.New("source must be 'gps' or 'manual'")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *TrackpointStore) ListByDate(ctx context.Context, userID, date string) ([]domain.Trackpoint, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT tp.event_id, tp.device_id, tp.trip_id, tp.ts,
|
|
tp.lat, tp.lon, tp.source, tp.note,
|
|
tp.accuracy_m, tp.speed_mps, tp.bearing_deg, tp.altitude_m
|
|
FROM trackpoints tp
|
|
JOIN devices d ON d.device_id = tp.device_id
|
|
WHERE d.user_id = $1
|
|
AND DATE(tp.ts AT TIME ZONE 'UTC') = $2::date
|
|
ORDER BY tp.ts`,
|
|
userID, date,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return pgx.CollectRows(rows, func(row pgx.CollectableRow) (domain.Trackpoint, error) {
|
|
var p domain.Trackpoint
|
|
err := row.Scan(
|
|
&p.EventID, &p.DeviceID, &p.TripID, &p.Timestamp,
|
|
&p.Lat, &p.Lon, &p.Source, &p.Note,
|
|
&p.AccuracyM, &p.SpeedMps, &p.BearingDeg, &p.AltitudeM,
|
|
)
|
|
return p, err
|
|
})
|
|
}
|
|
|
|
func (s *TrackpointStore) ListDays(ctx context.Context, userID, from, to string) ([]domain.DaySummary, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT DATE(tp.ts AT TIME ZONE 'UTC')::text AS date,
|
|
COUNT(*) AS cnt,
|
|
MIN(tp.ts),
|
|
MAX(tp.ts)
|
|
FROM trackpoints tp
|
|
JOIN devices d ON d.device_id = tp.device_id
|
|
WHERE d.user_id = $1
|
|
AND DATE(tp.ts AT TIME ZONE 'UTC') BETWEEN $2::date AND $3::date
|
|
GROUP BY DATE(tp.ts AT TIME ZONE 'UTC')
|
|
ORDER BY date`,
|
|
userID, from, to,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return pgx.CollectRows(rows, func(row pgx.CollectableRow) (domain.DaySummary, error) {
|
|
var d domain.DaySummary
|
|
err := row.Scan(&d.Date, &d.Count, &d.FirstTS, &d.LastTS)
|
|
return d, err
|
|
})
|
|
}
|