245 lines
5.8 KiB
Go
245 lines
5.8 KiB
Go
package storage
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"git.timovolkmann.de/gyrogpsc/core"
|
|
"github.com/dgraph-io/badger/v2"
|
|
"github.com/google/uuid"
|
|
"github.com/sirupsen/logrus"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
)
|
|
|
|
// Must implement Storer
|
|
type badgerStore struct {
|
|
trackingsDb *badger.DB
|
|
sensordatDb *badger.DB
|
|
}
|
|
|
|
// Returns a badgerDB K/V Store instance. Opens database if exist or creates a new one in working directory
|
|
func NewRepository(c *core.Configuration) *badgerStore {
|
|
dir, _ := os.Getwd()
|
|
logrus.Debug(dir)
|
|
if _, err := os.Stat(filepath.Join(dir, "_db")); os.IsNotExist(err) {
|
|
os.Mkdir(filepath.Join(dir, "_db"), os.ModePerm)
|
|
}
|
|
|
|
bs := &badgerStore{}
|
|
err := bs.openDBs()
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
return bs
|
|
}
|
|
|
|
func (r *badgerStore) openDBs() error {
|
|
var err error
|
|
r.trackingsDb, err = badger.Open(badger.DefaultOptions("_db/trackings"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.sensordatDb, err = badger.Open(badger.DefaultOptions("_db/sensor"))
|
|
return err
|
|
}
|
|
|
|
func (r *badgerStore) isDbClosed() bool {
|
|
return r.trackingsDb.IsClosed() || r.sensordatDb.IsClosed()
|
|
}
|
|
|
|
func (r *badgerStore) Save(tr core.Tracking) error {
|
|
if ok := r.isDbClosed(); ok {
|
|
logrus.Error("unable to write to database. database closed!")
|
|
err := r.openDBs()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
//return badger.ErrDBClosed
|
|
}
|
|
uid, err := tr.UUID.MarshalText()
|
|
if err != nil {
|
|
logrus.Error(err, tr)
|
|
}
|
|
logrus.Infoln("save tracking:", tr.TimeCreated.Format(time.RFC3339Nano))
|
|
meta, err := json.Marshal(tr.TrackingMetadata)
|
|
if err != nil {
|
|
logrus.Error(err, tr)
|
|
return err
|
|
}
|
|
err = r.sensordatDb.Update(func(txn *badger.Txn) error {
|
|
for _, v := range tr.Data {
|
|
k := createRecordKey(tr.UUID, v.Source(), v.Timestamp)
|
|
//logrus.Trace(v, " len key ->", len(k))
|
|
j, err2 := json.Marshal(v)
|
|
logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.Timestamp.Format(time.RFC3339Nano))
|
|
//logrus.Traceln(string(pretty.Pretty(j)))
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
err2 = txn.Set(k, j)
|
|
|
|
if err2 != nil {
|
|
logrus.Warn(err2)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
logrus.Error(err, tr)
|
|
}
|
|
err = r.trackingsDb.Update(func(txn *badger.Txn) error {
|
|
logrus.Debugln("save tracking metadata k/v:\n", string(uid), string(meta))
|
|
err := txn.Set(uid, meta)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
logrus.Error(err, tr)
|
|
return err
|
|
}
|
|
|
|
dr := 0.5
|
|
_ = r.trackingsDb.RunValueLogGC(dr)
|
|
_ = r.sensordatDb.RunValueLogGC(dr)
|
|
logrus.Info("sucessfully saved tracking")
|
|
return nil
|
|
}
|
|
|
|
// Retrieves all existing Trackings. Only Metadata. If you want actual data of tracking, load a specific one with Load(uuid)
|
|
func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) {
|
|
var result []core.TrackingMetadata
|
|
err := r.trackingsDb.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
el := core.TrackingMetadata{}
|
|
err2 := item.Value(func(val []byte) error {
|
|
logrus.Debugln(string(val))
|
|
err3 := json.Unmarshal(val, &el)
|
|
return err3
|
|
})
|
|
if err2 != nil {
|
|
logrus.Warn(err2)
|
|
}
|
|
|
|
result = append(result, el)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// Retrieves all data of a tracking from disk
|
|
func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) {
|
|
logrus.Debugln("try to load from db...", id)
|
|
if ok := r.isDbClosed(); ok {
|
|
logrus.Error("unable to read from database. database closed!")
|
|
err := r.openDBs()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
t := &core.Tracking{
|
|
TrackingMetadata: core.TrackingMetadata{},
|
|
//Records: []core.recordPair{},
|
|
//Data: nil,
|
|
}
|
|
err := r.trackingsDb.View(func(txn *badger.Txn) error {
|
|
item, err2 := txn.Get([]byte(id.String()))
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
err2 = item.Value(func(val []byte) error {
|
|
err3 := json.Unmarshal(val, &t.TrackingMetadata)
|
|
return err3
|
|
})
|
|
return err2
|
|
})
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
err = r.sensordatDb.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.Prefix = []byte(id.String())
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
_, source, recTime := unmarshalDataKey(item.Key())
|
|
el := core.SensorData{}
|
|
el.Timestamp = recTime
|
|
el.SetSource(source)
|
|
err2 := item.Value(func(val []byte) error {
|
|
//logrus.Traceln(string(val))
|
|
err3 := json.Unmarshal(val, &el)
|
|
//logrus.Traceln(err3, el)
|
|
return err3
|
|
})
|
|
if err2 != nil {
|
|
logrus.Warn(err2)
|
|
}
|
|
|
|
t.Data = append(t.Data, el)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
|
|
return t, nil
|
|
}
|
|
|
|
// helper function to create []byte key from sensordata element
|
|
func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) []byte {
|
|
prefix := []byte(uid.String())
|
|
var i string
|
|
switch source {
|
|
case core.SOURCE_TCP:
|
|
i = "1"
|
|
case core.SOURCE_SERIAL:
|
|
i = "2"
|
|
}
|
|
middle := []byte(i)
|
|
suffix := []byte(timestamp.Format(time.RFC3339Nano))
|
|
if timestamp.IsZero() {
|
|
err := errors.New("zero value detected")
|
|
logrus.Errorln("unable to create key", err)
|
|
}
|
|
logrus.Traceln("save as:", string(prefix), string(middle), string(suffix))
|
|
ret := append(prefix, middle...)
|
|
return append(ret, suffix...)
|
|
}
|
|
|
|
// helper function to split []byte key back to actual data
|
|
func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) {
|
|
|
|
prefix := string(key[:36])
|
|
suffix := string(key[37:])
|
|
middle := string(key[36:37])
|
|
|
|
var source core.SourceId
|
|
switch middle {
|
|
case "1":
|
|
source = core.SOURCE_TCP
|
|
case "2":
|
|
source = core.SOURCE_SERIAL
|
|
}
|
|
uid, err := uuid.Parse(prefix)
|
|
if err != nil {
|
|
logrus.Errorln("corrupted key", err)
|
|
}
|
|
timestamp, err := time.Parse(time.RFC3339Nano, suffix)
|
|
if err != nil {
|
|
logrus.Errorln("corrupted key", err)
|
|
}
|
|
|
|
return uid, source, timestamp
|
|
}
|