gyrogpsc/storage/kvstore.go

package storage

import (
    "encoding/json"
    "errors"
    "os"
    "path/filepath"
    "time"

    "git.timovolkmann.de/gyrogpsc/core"
    "github.com/dgraph-io/badger/v2"
    "github.com/google/uuid"
    "github.com/sirupsen/logrus"
)
// badgerStore implements the Storer interface, backed by two Badger databases:
// one for tracking metadata and one for the raw sensor data records.
type badgerStore struct {
    trackingsDb *badger.DB
    sensordatDb *badger.DB
}
// NewRepository returns a BadgerDB-backed K/V store. It opens the databases
// under ./_db in the working directory, creating the directory first if it
// does not exist yet.
func NewRepository(c *core.Configuration) *badgerStore {
    dir, err := os.Getwd()
    if err != nil {
        logrus.Error(err)
    }
    logrus.Debug(dir)
    if _, err := os.Stat(filepath.Join(dir, "_db")); os.IsNotExist(err) {
        if err := os.Mkdir(filepath.Join(dir, "_db"), os.ModePerm); err != nil {
            logrus.Error(err)
        }
    }
    bs := &badgerStore{}
    if err := bs.openDBs(); err != nil {
        logrus.Error(err)
    }
    return bs
}
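
// Construction sketch (hypothetical caller code, not part of this package;
// passing a zero-value core.Configuration is an assumption for illustration,
// the parameter is currently unused by NewRepository):
//
//    store := storage.NewRepository(&core.Configuration{})
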
// openDBs opens (or creates) the two Badger databases below the _db directory.
func (r *badgerStore) openDBs() error {
    var err error
    r.trackingsDb, err = badger.Open(badger.DefaultOptions("_db/trackings"))
    if err != nil {
        return err
    }
    r.sensordatDb, err = badger.Open(badger.DefaultOptions("_db/sensor"))
    return err
}
// isDbClosed reports whether either of the two databases is closed.
func (r *badgerStore) isDbClosed() bool {
    return r.trackingsDb.IsClosed() || r.sensordatDb.IsClosed()
}
// Save persists a tracking: each sensor data record is written to the sensor
// database (keyed by UUID, source and timestamp, see createRecordKey), the
// tracking metadata is written to the trackings database (keyed by UUID).
func (r *badgerStore) Save(tr core.Tracking) error {
    if r.isDbClosed() {
        logrus.Error("database closed while writing, reopening")
        err := r.openDBs()
        if err != nil {
            return err
        }
        //return badger.ErrDBClosed
    }
    uid, err := tr.UUID.MarshalText()
    if err != nil {
        logrus.Error(err, tr)
    }
    logrus.Infoln("save tracking:", tr.TimeCreated.Format(time.RFC3339Nano))
    meta, err := json.Marshal(tr.TrackingMetadata)
    if err != nil {
        logrus.Error(err, tr)
        return err
    }
    err = r.sensordatDb.Update(func(txn *badger.Txn) error {
        for _, v := range tr.Data {
            k := createRecordKey(tr.UUID, v.Source(), v.Timestamp)
            //logrus.Trace(v, " len key ->", len(k))
            j, err2 := json.Marshal(v)
            logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.Timestamp.Format(time.RFC3339Nano))
            //logrus.Traceln(string(pretty.Pretty(j)))
            if err2 != nil {
                return err2
            }
            err2 = txn.Set(k, j)
            if err2 != nil {
                logrus.Warn(err2)
            }
        }
        return nil
    })
    if err != nil {
        logrus.Error(err, tr)
    }
    err = r.trackingsDb.Update(func(txn *badger.Txn) error {
        logrus.Debugln("save tracking metadata k/v:\n", string(uid), string(meta))
        return txn.Set(uid, meta)
    })
    if err != nil {
        logrus.Error(err, tr)
        return err
    }
    // Best-effort value log garbage collection; errors are intentionally ignored.
    dr := 0.5
    _ = r.trackingsDb.RunValueLogGC(dr)
    _ = r.sensordatDb.RunValueLogGC(dr)
    logrus.Info("successfully saved tracking")
    return nil
}
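
// Save usage sketch (hypothetical caller code; only UUID, TimeCreated and Data
// are known from this file, how the rest of core.Tracking is populated is an
// assumption):
//
//    tr := core.Tracking{}
//    tr.UUID = uuid.New()
//    tr.TimeCreated = time.Now()
//    // tr.Data is assumed to be filled with sensor records elsewhere.
//    if err := store.Save(tr); err != nil {
//        logrus.Error(err)
//    }
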
// LoadAll retrieves the metadata of all existing trackings. It does not load
// the sensor data itself; to get the full data of a tracking, load a specific
// one with Load(uuid).
func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) {
    var result []core.TrackingMetadata
    err := r.trackingsDb.View(func(txn *badger.Txn) error {
        opts := badger.DefaultIteratorOptions
        it := txn.NewIterator(opts)
        defer it.Close()
        for it.Rewind(); it.Valid(); it.Next() {
            item := it.Item()
            el := core.TrackingMetadata{}
            err2 := item.Value(func(val []byte) error {
                logrus.Debugln(string(val))
                return json.Unmarshal(val, &el)
            })
            if err2 != nil {
                logrus.Warn(err2)
            }
            result = append(result, el)
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    return result, nil
}
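
// LoadAll usage sketch (hypothetical caller code):
//
//    metas, err := store.LoadAll()
//    if err != nil {
//        logrus.Error(err)
//    }
//    for _, m := range metas {
//        logrus.Infoln("tracking metadata:", m)
//    }
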
// Load retrieves a complete tracking (metadata plus all sensor data records)
// from disk.
func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) {
    logrus.Debugln("try to load from db...", id)
    if r.isDbClosed() {
        logrus.Error("database closed while reading, reopening")
        err := r.openDBs()
        if err != nil {
            return nil, err
        }
    }
    t := &core.Tracking{
        TrackingMetadata: core.TrackingMetadata{},
        //Records: []core.recordPair{},
        //Data: nil,
    }
    err := r.trackingsDb.View(func(txn *badger.Txn) error {
        item, err2 := txn.Get([]byte(id.String()))
        if err2 != nil {
            return err2
        }
        return item.Value(func(val []byte) error {
            return json.Unmarshal(val, &t.TrackingMetadata)
        })
    })
    if err != nil {
        logrus.Error(err)
        return nil, err
    }
    // All sensor records of a tracking share the tracking UUID string as key
    // prefix, so a prefix scan collects exactly the records of this tracking.
    err = r.sensordatDb.View(func(txn *badger.Txn) error {
        opts := badger.DefaultIteratorOptions
        opts.Prefix = []byte(id.String())
        it := txn.NewIterator(opts)
        defer it.Close()
        for it.Rewind(); it.Valid(); it.Next() {
            item := it.Item()
            _, source, recTime := unmarshalDataKey(item.Key())
            el := core.SensorData{}
            el.Timestamp = recTime
            el.SetSource(source)
            err2 := item.Value(func(val []byte) error {
                //logrus.Traceln(string(val))
                err3 := json.Unmarshal(val, &el)
                //logrus.Traceln(err3, el)
                return err3
            })
            if err2 != nil {
                logrus.Warn(err2)
            }
            t.Data = append(t.Data, el)
        }
        return nil
    })
    if err != nil {
        logrus.Error(err)
    }
    return t, nil
}
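
// Load usage sketch (hypothetical; the UUID would normally be taken from the
// metadata returned by LoadAll, assuming it carries the tracking UUID):
//
//    tr, err := store.Load(uuid.MustParse("6ba7b810-9dad-11d1-80b4-00c04fd430c8"))
//    if err == nil {
//        logrus.Infof("loaded %d sensor records", len(tr.Data))
//    }
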
// createRecordKey builds the []byte key of a sensor data element from the
// tracking UUID, the data source and the record timestamp.
func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) []byte {
    prefix := []byte(uid.String())
    var i string
    switch source {
    case core.SOURCE_TCP:
        i = "1"
    case core.SOURCE_SERIAL:
        i = "2"
    default:
        // unknown source: keep the fixed-width key layout intact
        i = "0"
    }
    middle := []byte(i)
    suffix := []byte(timestamp.Format(time.RFC3339Nano))
    if timestamp.IsZero() {
        err := errors.New("zero value detected")
        logrus.Errorln("unable to create key", err)
    }
    logrus.Traceln("save as:", string(prefix), string(middle), string(suffix))
    ret := append(prefix, middle...)
    return append(ret, suffix...)
}
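
// Key layout produced by createRecordKey and consumed by unmarshalDataKey:
//
//    bytes [0:36]  tracking UUID in canonical string form (36 characters)
//    byte  [36]    source marker: "1" = SOURCE_TCP, "2" = SOURCE_SERIAL ("0" for unknown sources)
//    bytes [37:]   record timestamp in RFC3339Nano format
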
// unmarshalDataKey splits a []byte key back into the tracking UUID, the data
// source and the record timestamp.
func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) {
    var (
        uid       uuid.UUID
        source    core.SourceId
        timestamp time.Time
    )
    if len(key) < 37 {
        logrus.Errorln("corrupted key, too short:", string(key))
        return uid, source, timestamp
    }
    prefix := string(key[:36])
    middle := string(key[36:37])
    suffix := string(key[37:])
    switch middle {
    case "1":
        source = core.SOURCE_TCP
    case "2":
        source = core.SOURCE_SERIAL
    }
    uid, err := uuid.Parse(prefix)
    if err != nil {
        logrus.Errorln("corrupted key", err)
    }
    timestamp, err = time.Parse(time.RFC3339Nano, suffix)
    if err != nil {
        logrus.Errorln("corrupted key", err)
    }
    return uid, source, timestamp
}
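
// Round-trip sketch for the two helpers above (hypothetical values):
//
//    k := createRecordKey(uuid.New(), core.SOURCE_TCP, time.Now())
//    id, src, ts := unmarshalDataKey(k)
//    // id and src equal the inputs; ts represents the same instant,
//    // re-parsed from its RFC3339Nano representation.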