diff --git a/cmd/server/server.go b/cmd/server/server.go index f69458e..862184f 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -11,6 +11,8 @@ import ( func main() { conf := configurationFromFile() + logrus.Debug(conf) + repo := storage.NewRepository(conf) disp := core.NewDispatcher() diff --git a/core/collectors.go b/core/collectors.go index 0cac1d9..16a4bfe 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -25,7 +25,7 @@ const ( var tcpSingleton *tcpCollector -func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector { +func NewCollector(typ CollectorType, proc Processor, config *Configuration) Collector { var coll Collector switch typ { case SERIAL: @@ -45,7 +45,7 @@ func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collect type serialCollector struct { active bool - proc Pusher + proc Processor config *Configuration mu sync.RWMutex } @@ -121,7 +121,7 @@ func (s *serialCollector) Close() { s.mu.Unlock() } -func newSerial(proc Pusher, config *Configuration) *serialCollector { +func newSerial(proc Processor, config *Configuration) *serialCollector { return &serialCollector{ active: false, proc: proc, @@ -131,7 +131,7 @@ func newSerial(proc Pusher, config *Configuration) *serialCollector { type tcpCollector struct { active bool - processor Pusher + processor Processor //config *Configuration } @@ -143,11 +143,11 @@ func (t *tcpCollector) Close() { t.active = false } -func (t *tcpCollector) SetProcessor(p Pusher) { +func (t *tcpCollector) SetProcessor(p Processor) { t.processor = p } -func newTcp(proc Pusher, config *Configuration) *tcpCollector { +func newTcp(proc Processor, config *Configuration) *tcpCollector { logrus.Println("start tcp collector") listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort) diff --git a/core/config.go b/core/config.go index adabf79..076f9a8 100644 --- a/core/config.go +++ b/core/config.go @@ -1,16 +1,16 @@ package core type Configuration struct { - Collectors struct { - TcpCollectorPort string `mapstructure:"porttcp"` - SerialCollectorPort string `mapstructure:"portserial"` - } `mapstructure:"Collectors"` - Webserver struct { - Port string `mapstructure:"port"` - } `mapstructure:"webserver"` - Pipeline struct { - PublishIntervalMs int `mapstructure:"publishintervalms"` - SyncUpdateIntervalMs int `mapstructure:"syncupdateintervalms"` - } `mapstructure:"pipeline"` - Debuglevel string `mapstructure:"debuglevel"` + Collectors struct { + TcpCollectorPort string `mapstructure:"porttcp"` + SerialCollectorPort string `mapstructure:"portserial"` + } `mapstructure:"Collectors"` + Webserver struct { + Port string `mapstructure:"port"` + } `mapstructure:"webserver"` + Pipeline struct { + PublishIntervalMs int `mapstructure:"publishintervalms"` + SyncUpdateIntervalMs int `mapstructure:"syncupdateintervalms"` + } `mapstructure:"pipeline"` + Debuglevel string `mapstructure:"debuglevel"` } diff --git a/core/dispatcher.go b/core/dispatcher.go index a705d67..00ffdca 100644 --- a/core/dispatcher.go +++ b/core/dispatcher.go @@ -18,8 +18,8 @@ func NewDispatcher() *dispatcher { } func (d *dispatcher) Publish(message string) { - logrus.Debugf("publish to %v listeners\n", len(d.listeners)) - logrus.Debug(message) + logrus.Tracef("publishing to %v listeners\n", len(d.listeners)) + logrus.Trace(message) for _, ch := range d.listeners { ch <- message } diff --git a/core/interfaces.go b/core/interfaces.go index 4beafe9..e59aff9 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -11,17 
+11,17 @@ type Publisher interface { Publish(message string) } -type Pusher interface { +type Processor interface { Push(data *sensorData) error } type Storer interface { - EnqueuePair(tcp sensorData, ser sensorData) - EnqueueRaw(data sensorData) + AddDataPair(tcp sensorData, ser sensorData) + AddRaw(data sensorData) } type Repo interface { Save(tracking Tracking) error LoadAll() ([]TrackingMetadata, error) - Load(id uuid.UUID) (Tracking, error) + Load(id uuid.UUID) (*Tracking, error) } diff --git a/core/pipeline.go b/core/pipeline.go index 36cfc58..66d1084 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -24,7 +24,7 @@ type pipeline struct { sema *semaphore.Weighted } -// pipe implements Runner & Pusher +// pipe implements Runner & Processor func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline { return &pipeline{ false, @@ -51,30 +51,35 @@ func (p *pipeline) isPipeActive() bool { defer p.mu.RUnlock() return p.active } +func (p *pipeline) isPipeRecording() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.record +} func (p *pipeline) Run() { - p.sema.Acquire(context.Background(), 2) + p.sema.Acquire(context.Background(), 1) // !!! n=2 when the synchronizer goroutine is running as well p.mu.Lock() p.active = true p.mu.Unlock() logrus.Println("pipe: processing service started") - go func() { - for p.isPipeActive() { - <-p.synchroniz.updateTicker.C - err := p.refreshDelay() - if err != nil { - logrus.Debugln(err) - } - } - p.sema.Release(1) - logrus.Println("pipe: updater stopped") - }() + //go func() { + // for p.isPipeActive() { + // <-p.synchroniz.updateTicker.C + // err := p.refreshDelay() + // if err != nil { + // logrus.Debugln(err) + // } + // } + // p.sema.Release(1) + // logrus.Println("pipe: updater stopped") + //}() go func() { for p.isPipeActive() { <-p.publishTicker.C err := p.publish() if err != nil && err.Error() != "no data available" { - logrus.Debug(err) + logrus.Trace(err) } } p.sema.Release(1) @@ -89,9 +94,25 @@ func (p *pipeline) StopRecord() { p.record = false } +func (p *pipeline) Push(data *sensorData) error { + if (data == nil || *data == sensorData{}) { + return errors.New("no data") + } + //logrus.Println("push data to pipe:", string(data.Source)) + switch data.Source { + case SOURCE_TCP: + go p.pushTcpDataToBuffer(*data) + case SOURCE_SERIAL: + go p.pushSerialDataToBuffer(*data) + default: + panic("pipe: invalid data Source") + } + return nil +} + func (p *pipeline) publish() error { - p.buffer.tcpMutex.Lock() p.buffer.serialMutex.Lock() + p.buffer.tcpMutex.Lock() if (p.buffer.MeasTcp == sensorData{} && p.buffer.MeasSerial == sensorData{}) { p.buffer.tcpMutex.Unlock() @@ -104,21 +125,25 @@ func (p *pipeline) publish() error { p.buffer.serialMutex.Unlock() return errors.New("same data") } - logrus.Debug("––––––––––––––––––––––––––––––––––––") - logrus.Debugf("SER old: %v", p.buffer.LastMeasSerial) - logrus.Debugf("SER new: %v", p.buffer.MeasSerial) - logrus.Debugf("TCP old: %v", p.buffer.LastMeasTcp) - logrus.Debugf("TCP new: %v", p.buffer.MeasTcp) - logrus.Debug("––––––––––––––––––––––––––––––––––––") + logrus.Debugf("–––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––") + logrus.Tracef("SER old: %-40s %-40s %v %v", p.buffer.LastMeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.ServerTime.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Position, p.buffer.LastMeasSerial.Orientation) + logrus.Debugf("SER new: %-40s %-40s %v %v",
p.buffer.MeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasSerial.ServerTime.Format(time.RFC3339Nano), p.buffer.MeasSerial.Position, p.buffer.MeasSerial.Orientation) + logrus.Tracef("TCP old: %-40s %-40s %v %v", p.buffer.LastMeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.ServerTime.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Position, p.buffer.LastMeasTcp.Orientation) + logrus.Debugf("TCP new: %-40s %-40s %v %v", p.buffer.MeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasTcp.ServerTime.Format(time.RFC3339Nano), p.buffer.MeasTcp.Position, p.buffer.MeasTcp.Orientation) p.buffer.LastMeasTcp = p.buffer.MeasTcp p.buffer.LastMeasSerial = p.buffer.MeasSerial - p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial) - data := map[string]sensorData{ - string(SOURCE_TCP): p.buffer.MeasTcp, - string(SOURCE_SERIAL): p.buffer.MeasSerial, + if p.isPipeRecording() { + p.storer.AddDataPair(p.buffer.MeasTcp, p.buffer.MeasSerial) } + data := map[string]interface{}{} + if p.buffer.MeasTcp.Source == SOURCE_TCP { + data[string(SOURCE_TCP)] = p.buffer.MeasTcp + } + if p.buffer.MeasSerial.Source == SOURCE_SERIAL { + data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial + } p.buffer.tcpMutex.Unlock() p.buffer.serialMutex.Unlock() @@ -149,74 +174,65 @@ type synchronizer struct { } func (p *pipeline) refreshDelay() error { - p.synchroniz.mutex.RLock() - if p.synchroniz.tcpSerialDelayMs != 0 { - logrus.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs) - } - p.synchroniz.mutex.RUnlock() + logrus.Debugf("refreshing delay...") p.buffer.serialMutex.Lock() p.buffer.tcpMutex.Lock() - tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp) - serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp) + tcpTime := p.buffer.MeasTcp.Timestamp + serTime := p.buffer.MeasSerial.Timestamp p.buffer.tcpMutex.Unlock() p.buffer.serialMutex.Unlock() - if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { - return errors.New("no sync possible. check if both Collectors running. otherwise check GPS fix") + + if tcpTime.IsZero() || serTime.IsZero() || tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { + return errors.New("sync not possible. zero time value detected") } + logrus.Debug("TCP", tcpTime.Format(time.RFC3339Nano)) + logrus.Debug("SER", serTime.Format(time.RFC3339Nano)) + currentDelay := tcpTime.Sub(serTime).Milliseconds() + p.synchroniz.mutex.Lock() + defer p.synchroniz.mutex.Unlock() + logrus.Debugf("old delay-> %vms...", p.synchroniz.tcpSerialDelayMs) if currentDelay > 5000 || currentDelay < -5000 { - p.synchroniz.mutex.Lock() p.synchroniz.tcpSerialDelayMs = 0 - p.synchroniz.mutex.Unlock() return errors.New("skipping synchronisation! 
time not properly configured or facing network problems.") } - logrus.Debug("TCP", tcpTime.String()) - logrus.Debug("SER", serTime.String()) - logrus.Debug("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms") - delay := tcpTime.Sub(serTime).Milliseconds() - p.synchroniz.mutex.Lock() - p.synchroniz.tcpSerialDelayMs += delay - p.synchroniz.mutex.Unlock() - return nil -} - -func (p *pipeline) Push(data *sensorData) error { - if data == nil { - return errors.New("nil processing not allowed") - } - //logrus.Println("push data to pipe:", string(data.source)) - if p.isPipeActive() { - p.storer.EnqueueRaw(*data) - } - switch data.source { - case SOURCE_TCP: - go p.pushTcpDataToBuffer(*data) - case SOURCE_SERIAL: - go p.pushSerialDataToBuffer(*data) - default: - panic("pipe: invalid data source") - } + p.synchroniz.tcpSerialDelayMs += currentDelay + logrus.Infof("new delay-> %vms", p.synchroniz.tcpSerialDelayMs) return nil } func (p *pipeline) pushTcpDataToBuffer(data sensorData) { + data.ServerTime = time.Now().UTC() + + if p.isPipeRecording() { + p.storer.AddRaw(data) + } + p.synchroniz.mutex.RLock() if p.synchroniz.tcpSerialDelayMs > 0 { time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond) } - p.synchroniz.mutex.RLock() + p.synchroniz.mutex.RUnlock() p.buffer.tcpMutex.Lock() - p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data) + p.buffer.MeasTcp = data + //p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data) p.buffer.tcpMutex.Unlock() } func (p *pipeline) pushSerialDataToBuffer(data sensorData) { + data.ServerTime = time.Now().UTC() + + if p.isPipeRecording() { + p.storer.AddRaw(data) + } + p.synchroniz.mutex.RLock() if p.synchroniz.tcpSerialDelayMs < 0 { time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond) } p.synchroniz.mutex.RUnlock() p.buffer.serialMutex.Lock() - p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data) + p.buffer.MeasSerial = data + //p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data) p.buffer.serialMutex.Unlock() } diff --git a/core/sensordata.go b/core/sensordata.go index 1114f37..7e5cba5 100644 --- a/core/sensordata.go +++ b/core/sensordata.go @@ -16,11 +16,13 @@ const ( SOURCE_SERIAL sourceId = "SOURCE_SERIAL" ) +var timeex int64 + type sensorData struct { itow uint32 - source sourceId + Source sourceId ServerTime time.Time - Timestamp int64 + Timestamp time.Time Position [3]float64 Orientation [3]float64 } @@ -66,7 +68,7 @@ func (s sensorData) ConsolidateExTime(n sensorData) sensorData { } func (s *sensorData) checkSources(n *sensorData) { - if (s.source != n.source && *s != sensorData{}) { + if (s.Source != n.Source && *s != sensorData{}) { logrus.Println(s) logrus.Println(n) logrus.Fatalln("Do not consolidate sensorData from different Sources") @@ -80,21 +82,21 @@ var ( func ConvertUbxToSensorData(msg interface{}) (*sensorData, error) { sd := &sensorData{ - ServerTime: time.Now(), - source: SOURCE_SERIAL, + //ServerTime: time.Now().UTC(), + Source: SOURCE_SERIAL, } switch v := msg.(type) { case *ublox.NavPvt: //logrus.Println("NAV-PVT") sd.itow = v.ITOW_ms - sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() + sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC) sd.Position[0] = float64(v.Lat_dege7) / 1e+7 sd.Position[1] = float64(v.Lon_dege7) / 1e+7 
sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m case *ublox.HnrPvt: //logrus.Println("HNR-PVT") sd.itow = v.ITOW_ms - sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() + sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC) sd.Position[0] = float64(v.Lat_dege7) / 1e+7 sd.Position[1] = float64(v.Lon_dege7) / 1e+7 sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m @@ -130,16 +132,24 @@ func convertIPhoneSensorLog(jsonData []byte) (*sensorData, error) { pitch := gjson.Get(string(jsonData), "motionPitch").Float() * 180 / math.Pi roll := gjson.Get(string(jsonData), "motionRoll").Float() * 180 / math.Pi yaw := gjson.Get(string(jsonData), "motionYaw").Float() * 180 / math.Pi + var ts time.Time + if timestamp != 0 { + ts = time.Unix(0, int64(timestamp*float64(time.Second))).UTC() + timeex = time.Now().UnixNano() - ts.UnixNano() + } else if timeex != 0 { + ts = time.Now().Add(time.Duration(timeex)).UTC() + } + //if ts == time.Date() sd := &sensorData{ - ServerTime: time.Now(), - source: SOURCE_TCP, - Timestamp: int64(timestamp * float64(time.Second)), + //ServerTime: time.Now().UTC(), + Source: SOURCE_TCP, + Timestamp: ts, Position: [3]float64{lat, lon, alt}, Orientation: [3]float64{pitch, roll, yaw}, - //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), } - //logrus.Println(string(pretty.Pretty(jsonData))) - //logrus.Println(sd) + if (*sd == sensorData{}) { + return nil, errors.New("iphone sensorlog: convert empty") + } return sd, nil } @@ -153,12 +163,14 @@ func convertAndroidHyperImu(jsonData []byte) (*sensorData, error) { yaw := gjson.Get(string(jsonData), "orientation.2").Float() sd := &sensorData{ - ServerTime: time.Now(), - source: SOURCE_TCP, - Timestamp: timestamp * int64(time.Millisecond), + //ServerTime: time.Now().UTC(), + Source: SOURCE_TCP, + Timestamp: time.Unix(0, timestamp*int64(time.Millisecond)).UTC(), Position: [3]float64{lat, lon, alt}, Orientation: [3]float64{pitch, roll, yaw}, - //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), + } + if (*sd == sensorData{}) { + return nil, errors.New("android hyperimu: convert empty") + } return sd, nil } diff --git a/core/service.go b/core/service.go index b05db75..ba27348 100644 --- a/core/service.go +++ b/core/service.go @@ -23,7 +23,7 @@ type Service interface { StopRecord() (*TrackingMetadata, error) StopAll() (*TrackingMetadata, error) - LoadTracking(trackingId uuid.UUID) + LoadTracking(trackingId uuid.UUID) (*Tracking, error) DeleteTracking(trackingId uuid.UUID) StartReplay() @@ -50,7 +50,8 @@ func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService { pipe: NewPipeline(d, t, c), collectors: nil, } - //ts.pipe.Run() + // first initialization of the tcp collector to open the tcp port + NewCollector(TCP, nil, c) return ts } @@ -140,15 +141,21 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) { e.Close() } if t.opMode == RECORDING { - logrus.Warn("trackingservice: stop recording gracefully") + logrus.Info("trackingservice: gracefully stopping recording") tm, err = t.StopRecord() } t.opMode = STOPPED return tm, err } -func (t *trackingService) LoadTracking(trackingId uuid.UUID) { - panic("implement me") +func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error) { + logrus.Info("LOAD TRACKING from database") + tracking, err :=
t.repo.Load(trackingId) + if err != nil { + return nil, err + } + t.safelyReplaceTracking(*tracking) + return t.current, nil } func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { diff --git a/core/trackings.go b/core/trackings.go index 6df3672..baddbde 100644 --- a/core/trackings.go +++ b/core/trackings.go @@ -9,8 +9,8 @@ import ( type Tracking struct { TrackingMetadata - Records []recordPair - Rawdata []rawRecord + Records []SensorPair + Rawdata []sensorData } var mRec sync.RWMutex @@ -22,41 +22,38 @@ type TrackingMetadata struct { Collectors []CollectorType } -// persistence wrapper for sensordata -type recordPair struct { - RecordTimeKey time.Time // uniqueness ensured through mutex - DataPair map[sourceId]sensorData +type SensorPair struct { + RecordTime time.Time // uniqueness ensured through mutex + Data map[sourceId]sensorData } -type rawRecord struct { - RecordTimeKey time.Time // uniqueness ensured through mutex - Data sensorData -} +//func (r *SensorPair) restoreDataPair(data []byte) error { +// err := json.Unmarshal(data, &r.Data) +// return err +//} -// END persistence wrapper for sensordata - -func (s *Tracking) EnqueuePair(tcp sensorData, ser sensorData) { - rp := recordPair{ - RecordTimeKey: time.Now(), - DataPair: map[sourceId]sensorData{ - tcp.source: tcp, - ser.source: ser, - }, +func (s *Tracking) AddDataPair(tcp sensorData, ser sensorData) { + rp := SensorPair{ + RecordTime: time.Now().UTC(), + Data: make(map[sourceId]sensorData, 2), } + if tcp.Source == SOURCE_TCP { + rp.Data[tcp.Source] = tcp + } + if ser.Source == SOURCE_SERIAL { + rp.Data[ser.Source] = ser + } + mRec.Lock() s.Records = append(s.Records, rp) - logrus.Debugln("tracking Records: len->", len(s.Records)) + logrus.Traceln("tracking Records: len->", len(s.Records)) mRec.Unlock() } -func (s *Tracking) EnqueueRaw(data sensorData) { - sr := rawRecord{ - time.Now(), - data, - } +func (s *Tracking) AddRaw(data sensorData) { mRec.Lock() - s.Rawdata = append(s.Rawdata, sr) - logrus.Debugln("raw data points: len->", len(s.Rawdata)) + s.Rawdata = append(s.Rawdata, data) + logrus.Traceln("raw data points: len->", len(s.Rawdata)) mRec.Unlock() } @@ -65,8 +62,8 @@ func emptyTracking() Tracking { TrackingMetadata: TrackingMetadata{ UUID: uuid.New(), }, - Records: []recordPair{}, - Rawdata: []rawRecord{}, + Records: []SensorPair{}, + Rawdata: []sensorData{}, } } diff --git a/storage/kvstore.go b/storage/kvstore.go index 3997b9b..1dba933 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -1,8 +1,8 @@ package storage import ( - "encoding/binary" "encoding/json" + "errors" "git.timovolkmann.de/gyrogpsc/core" "github.com/dgraph-io/badger/v2" "github.com/google/uuid" @@ -10,7 +10,7 @@ import ( "github.com/tidwall/pretty" "os" "path/filepath" - "strconv" + "time" ) // Must implement Repo @@ -27,30 +27,46 @@ func NewRepository(c *core.Configuration) *badgerStore { os.Mkdir(filepath.Join(dir, "_db"), os.ModePerm) } - tr, err := badger.Open(badger.DefaultOptions("_db/trackings")) - dp, err := badger.Open(badger.DefaultOptions("_db/records")) - rd, err := badger.Open(badger.DefaultOptions("_db/raw")) - + bs := &badgerStore{} + err := bs.openDBs() if err != nil { logrus.Error(err) } - return &badgerStore{trackingsDb: tr, recordsDb: dp, rawdataDb: rd} + return bs } -func (r *badgerStore) isDbAvailable() bool { +func (r *badgerStore) openDBs() error { + var err error + r.trackingsDb, err = badger.Open(badger.DefaultOptions("_db/trackings")) + if err != nil { + return err + } + r.recordsDb, err = 
badger.Open(badger.DefaultOptions("_db/records")) + if err != nil { + return err + } + r.rawdataDb, err = badger.Open(badger.DefaultOptions("_db/raw")) + return err +} + +func (r *badgerStore) isDbClosed() bool { return r.trackingsDb.IsClosed() || r.recordsDb.IsClosed() || r.rawdataDb.IsClosed() } func (r *badgerStore) Save(tr core.Tracking) error { - if ok := r.isDbAvailable(); ok { + if ok := r.isDbClosed(); ok { logrus.Error("unable to write to database. database closed!") - return badger.ErrDBClosed + err := r.openDBs() + if err != nil { + return err + } + //return badger.ErrDBClosed } - ts, err := tr.TimeCreated.MarshalText() + uid, err := tr.UUID.MarshalText() if err != nil { logrus.Error(err, tr) } - logrus.Info("save tracking:", tr.TimeCreated) + logrus.Infoln("save tracking:", tr.TimeCreated.Format(time.RFC3339Nano)) meta, err := json.Marshal(tr.TrackingMetadata) if err != nil { logrus.Error(err, tr) @@ -58,41 +74,50 @@ func (r *badgerStore) Save(tr core.Tracking) error { } err = r.recordsDb.Update(func(txn *badger.Txn) error { for _, v := range tr.Records { - k := createRecordKey(tr.UUID, v.RecordTimeKey.UnixNano()) - j, err := json.Marshal(v.DataPair) - logrus.Debugln("save record k/v:\n", tr.UUID.String(), strconv.FormatInt(v.RecordTimeKey.UnixNano(), 10)) - logrus.Debugln(string(pretty.Pretty(j))) - if err != nil { - return err + k := createRecordKey(tr.UUID, v.RecordTime) + logrus.Trace(v.Data, " len key ->", len(k)) + j, err2 := json.Marshal(v.Data) + logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.RecordTime.Format(time.RFC3339Nano)) + logrus.Traceln(string(pretty.Pretty(j))) + if err2 != nil { + return err2 + } + err2 = txn.Set(k, j) + + if err2 != nil { + logrus.Warn(err2) } - txn.Set(k, j) } return nil }) if err != nil { logrus.Error(err, tr) - return err - } - err = r.rawdataDb.Update(func(txn *badger.Txn) error { - for _, v := range tr.Rawdata { - k := createRecordKey(tr.UUID, v.RecordTimeKey.UnixNano()) - j, err := json.Marshal(v) - logrus.Debugln("save raw k/v:\n", tr.UUID.String(), strconv.FormatInt(v.RecordTimeKey.UnixNano(), 10)) - logrus.Debugln(string(pretty.Pretty(j))) - if err != nil { - return err - } - txn.Set(k, j) - } - return nil - }) - if err != nil { - logrus.Error(err, tr) - return err + //return err } + //err = r.rawdataDb.Update(func(txn *badger.Txn) error { + // for _, v := range tr.Rawdata { + // k := createRecordKey(tr.UUID, v.ServerTime) + // j, err2 := json.Marshal(v) + // logrus.Traceln("save raw k/v:\n", tr.UUID.String(), strconv.FormatInt(v.ServerTime.UnixNano(), 10)) + // logrus.Traceln(string(pretty.Pretty(j))) + // if err2 != nil { + // return err2 + // } + // err2 = txn.Set(k, j) + // + // if err2 != nil { + // logrus.Warn(err2) + // } + // } + // return nil + //}) + //if err != nil { + // logrus.Error(err, tr) + // //return err + //} err = r.trackingsDb.Update(func(txn *badger.Txn) error { - logrus.Debug("save tracking meta k/v:\n", string(ts), string(meta)) - err := txn.Set(ts, meta) + logrus.Debugln("save tracking metadata k/v:\n", string(uid), string(meta)) + err := txn.Set(uid, meta) return err }) if err != nil { @@ -101,12 +126,9 @@ func (r *badgerStore) Save(tr core.Tracking) error { } dr := 0.5 - err = r.trackingsDb.RunValueLogGC(dr) - logrus.Debug("DB GC:", err) - err = r.recordsDb.RunValueLogGC(dr) - logrus.Debug("DB GC:", err) - err = r.rawdataDb.RunValueLogGC(dr) - logrus.Debug("DB GC:", err) + _ = r.trackingsDb.RunValueLogGC(dr) + _ = r.recordsDb.RunValueLogGC(dr) + _ = r.rawdataDb.RunValueLogGC(dr) 
logrus.Info("sucessfully saved tracking") return nil } @@ -120,11 +142,15 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) { for it.Rewind(); it.Valid(); it.Next() { item := it.Item() el := core.TrackingMetadata{} - item.Value(func(val []byte) error { + err2 := item.Value(func(val []byte) error { logrus.Debugln(string(val)) - err := json.Unmarshal(val, &el) - return err + err3 := json.Unmarshal(val, &el) + return err3 }) + if err2 != nil { + logrus.Warn(err2) + } + result = append(result, el) } return nil @@ -135,31 +161,95 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) { return result, nil } -func (r *badgerStore) Load(id uuid.UUID) (core.Tracking, error) { - panic("implement me") +func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) { + logrus.Debugln("try to load from db...", id) + if ok := r.isDbClosed(); ok { + logrus.Error("unable to read from database. database closed!") + err := r.openDBs() + if err != nil { + return nil, err + } + } + t := &core.Tracking{ + TrackingMetadata: core.TrackingMetadata{}, + //Records: []core.recordPair{}, + //Rawdata: nil, + } + err := r.trackingsDb.View(func(txn *badger.Txn) error { + item, err2 := txn.Get([]byte(id.String())) + if err2 != nil { + return err2 + } + err2 = item.Value(func(val []byte) error { + err3 := json.Unmarshal(val, &t.TrackingMetadata) + return err3 + }) + return err2 + }) + if err != nil { + logrus.Error(err) + } + err = r.recordsDb.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = []byte(id.String()) + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + _, recTime := unmarshalDataKey(item.Key()) + el := core.SensorPair{} + el.RecordTime = recTime + err2 := item.Value(func(val []byte) error { + logrus.Traceln(string(val)) + err3 := json.Unmarshal(val, &el.Data) + logrus.Traceln(err3, el) + return err3 + }) + if err2 != nil { + logrus.Warn(err2) + } + + t.Records = append(t.Records, el) + } + return nil + }) + if err != nil { + logrus.Error(err) + } + + // implement retrieval of raw data only if needed + + return t, nil } -func createRecordKey(uid uuid.UUID, timestamp int64) []byte { - prefix, err := uid.MarshalText() - if err != nil || timestamp < 0 { - logrus.Error("unable to create key", err) +func createRecordKey(uid uuid.UUID, timestamp time.Time) []byte { + prefix := []byte(uid.String()) + suffix := []byte(timestamp.Format(time.RFC3339Nano)) + if timestamp.IsZero() { + err := errors.New("zero value detected") + logrus.Errorln("unable to create key", err) } - suffix := make([]byte, 8) - binary.BigEndian.PutUint64(suffix, uint64(timestamp)) - + logrus.Traceln("save as:", string(prefix), string(suffix)) + //binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano())) return append(prefix, suffix...) 
} -func unmarshalDataKey(key []byte) (uuid.UUID, int64) { - if len(key) != 24 { - panic("corrupted key") - } - prefix := key[0:15] - suffix := key[15:24] - uid, err := uuid.FromBytes(prefix) +func unmarshalDataKey(key []byte) (uuid.UUID, time.Time) { + logrus.Trace("key len ->", len(key)) + prefix := key[:36] + suffix := key[36:] + logrus.Traceln("load as:", string(prefix), string(suffix)) + uid, err := uuid.Parse(string(prefix)) if err != nil { - panic("corrupted key") + logrus.Errorln("corrupted key", err) } - timestamp := int64(binary.BigEndian.Uint64(suffix)) + timestamp, err := time.Parse(time.RFC3339Nano, string(suffix)) + if err != nil { + logrus.Errorln("corrupted key", err) + } + logrus.Traceln(uid, timestamp) + + //timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix))) return uid, timestamp } diff --git a/ublox/decode.go b/ublox/decode.go index 5e98eba..3e0ccab 100644 --- a/ublox/decode.go +++ b/ublox/decode.go @@ -6,152 +6,152 @@ package ublox import ( - "bufio" - "bytes" - "encoding/binary" - "errors" - "fmt" - "io" + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" ) // A decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames. // If you have an unmixed stream of NMEA-only data you can use nmea.Decode() on bufio.Scanner.Bytes() directly. type decoder struct { - s *bufio.Scanner + s *bufio.Scanner } // NewDecoder creates a new bufio Scanner with a splitfunc that can handle both UBX and NMEA frames. func NewDecoder(r io.Reader) *decoder { - d := bufio.NewScanner(r) - d.Split(splitFunc) - return &decoder{s: d} + d := bufio.NewScanner(r) + d.Split(splitFunc) + return &decoder{s: d} } // Assume we're either at the start of an NMEA sentence or at the start of a UBX message // if not, skip to the first $ or UBX SOM. func splitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) { - if len(data) == 0 { - return 0, nil, nil - } + if len(data) == 0 { + return 0, nil, nil + } - switch data[0] { - case '$': - return bufio.ScanLines(data, atEOF) + switch data[0] { + case '$': + return bufio.ScanLines(data, atEOF) - case 0xB5: - if len(data) < 8 { - if atEOF { - return len(data), nil, io.ErrUnexpectedEOF - } - return 0, nil, nil - } + case 0xB5: + if len(data) < 8 { + if atEOF { + return len(data), nil, io.ErrUnexpectedEOF + } + return 0, nil, nil + } - sz := 8 + int(data[4]) + int(data[5])*256 - if data[1] == 0x62 { - if sz <= len(data) { - return sz, data[:sz], nil - } - if sz <= bufio.MaxScanTokenSize { - return 0, nil, nil - } - } - } + sz := 8 + int(data[4]) + int(data[5])*256 + if data[1] == 0x62 { + if sz <= len(data) { + return sz, data[:sz], nil + } + if sz <= bufio.MaxScanTokenSize { + return 0, nil, nil + } + } + } - // resync to SOM or $ - data = data[1:] - i1 := bytes.IndexByte(data, '$') - if i1 < 0 { - i1 = len(data) - } + // resync to SOM or $ + data = data[1:] + i1 := bytes.IndexByte(data, '$') + if i1 < 0 { + i1 = len(data) + } - i2 := bytes.IndexByte(data, 0xB5) - if i2 < 0 { - i2 = len(data) - } - if i1 > i2 { - i1 = i2 - } - return 1 + i1, nil, nil + i2 := bytes.IndexByte(data, 0xB5) + if i2 < 0 { + i2 = len(data) + } + if i1 > i2 { + i1 = i2 + } + return 1 + i1, nil, nil } // Decode reads on NMEA or UBX frame and calls decodeUbx accordingly to parse the message, while skipping NMEA. 
func (d *decoder) Decode() (msg interface{}, err error) { - if !d.s.Scan() { - if err = d.s.Err(); err == nil { - err = io.EOF - } - return nil, err - } + if !d.s.Scan() { + if err = d.s.Err(); err == nil { + err = io.EOF + } + return nil, err + } - switch d.s.Bytes()[0] { - case '$': - return nil, errors.New("NMEA not implemented") - //return nmea.Decode(d.s.Bytes()) - case 0xB5: - return decodeUbx(d.s.Bytes()) - } - panic("impossible frame") + switch d.s.Bytes()[0] { + case '$': + return nil, errors.New("NMEA not implemented") + //return nmea.Decode(d.s.Bytes()) + case 0xB5: + return decodeUbx(d.s.Bytes()) + } + panic("impossible frame") } var ( - errInvalidFrame = errors.New("invalid UBX frame") - errInvalidChkSum = errors.New("invalid UBX checksum") + errInvalidFrame = errors.New("invalid UBX frame") + errInvalidChkSum = errors.New("invalid UBX checksum") ) func decodeUbx(frame []byte) (msg Message, err error) { - buf := bytes.NewReader(frame) + buf := bytes.NewReader(frame) - var header struct { - Preamble uint16 - ClassID uint16 - Length uint16 - } + var header struct { + Preamble uint16 + ClassID uint16 + Length uint16 + } - if err := binary.Read(buf, binary.LittleEndian, &header); err != nil { - return nil, err - } + if err := binary.Read(buf, binary.LittleEndian, &header); err != nil { + return nil, err + } - if header.Preamble != 0x62B5 { - return nil, errInvalidFrame - } + if header.Preamble != 0x62B5 { + return nil, errInvalidFrame + } - if buf.Len()+2 < int(header.Length) { - return nil, io.ErrShortBuffer - } + if buf.Len()+2 < int(header.Length) { + return nil, io.ErrShortBuffer + } - var a, b byte - for _, v := range frame[2 : header.Length+6] { - a += byte(v) - b += a - } + var a, b byte + for _, v := range frame[2 : header.Length+6] { + a += byte(v) + b += a + } - if frame[header.Length+6] != a || frame[header.Length+7] != b { - return nil, errInvalidChkSum - } + if frame[header.Length+6] != a || frame[header.Length+7] != b { + return nil, errInvalidChkSum + } - switch header.ClassID { - case 0x0105: // ACK-ACK - fmt.Println("ACK-ACK not implemented") - //msg = &AckAck{} - case 0x0005: // ACK-NAK - fmt.Println("ACK-NAK not implemented") - //msg = &AckNak{} - case 0x0701: // NAV-PVT - msg = &NavPvt{} - case 0x0028: // HNR-PVT - msg = &HnrPvt{} - case 0x0501: // NAV-ATT - msg = &NavAtt{} - default: - } - if msg != nil { - err = binary.Read(buf, binary.LittleEndian, msg) - } else { - msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)} - } - //fmt.Println(msg) + switch header.ClassID { + case 0x0105: // ACK-ACK + fmt.Println("ACK-ACK not implemented") + //msg = &AckAck{} + case 0x0005: // ACK-NAK + fmt.Println("ACK-NAK not implemented") + //msg = &AckNak{} + case 0x0701: // NAV-PVT + msg = &NavPvt{} + case 0x0028: // HNR-PVT + msg = &HnrPvt{} + case 0x0501: // NAV-ATT + msg = &NavAtt{} + default: + } + if msg != nil { + err = binary.Read(buf, binary.LittleEndian, msg) + } else { + msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)} + } + //fmt.Println(msg) - return msg, err + return msg, err } diff --git a/ublox/messages.go b/ublox/messages.go index fbebb09..610f7b2 100644 --- a/ublox/messages.go +++ b/ublox/messages.go @@ -1,7 +1,7 @@ package ublox type Message interface { - ClassID() uint16 + ClassID() uint16 } //type UbxMessage interface { @@ -11,90 +11,90 @@ type Message interface { //} type RawMessage struct { - classID uint16 - Data []byte + classID uint16 + Data []byte } func (msg 
*RawMessage) ClassID() uint16 { return msg.classID } type NavPvt struct { - ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. - Year_y uint16 // - Year (UTC) - Month_month byte // - Month, range 1..12 (UTC) - Day_d byte // - Day of month, range 1..31 (UTC) - Hour_h byte // - Hour of day, range 0..23 (UTC) - Min_min byte // - Minute of hour, range 0..59 (UTC) - Sec_s byte // - Seconds of minute, range 0..60 (UTC) - Valid NavPVTValid // - Validity flags (see graphic below) - TAcc_ns uint32 // - Time accuracy estimate (UTC) - Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC) - FixType NavPVTFixType // - GNSSfix Type - Flags NavPVTFlags // - Fix status flags (see graphic below) - Flags2 NavPVTFlags2 // - Additional flags (see graphic below) - NumSV byte // - Number of satellites used in Nav Solution - Lon_dege7 int32 // 1e-7 Longitude - Lat_dege7 int32 // 1e-7 Latitude - Height_mm int32 // - Height above ellipsoid - HMSL_mm int32 // - Height above mean sea level - HAcc_mm uint32 // - Horizontal accuracy estimate - VAcc_mm uint32 // - Vertical accuracy estimate - VelN_mm_s int32 // - NED north velocity - VelE_mm_s int32 // - NED east velocity - VelD_mm_s int32 // - NED down velocity - GSpeed_mm_s int32 // - Ground Speed (2-D) - HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) - SAcc_mm_s uint32 // - Speed accuracy estimate - HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - PDOPe2 uint16 // 0.01 Position DOP - Flags3 NavPVTFlags3 // - Additional flags (see graphic below) - Reserved1 [5]byte // - Reserved - HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion - MagDec_dege2 int16 // 1e-2 Magnetic declination. Only supported in ADR 4.10 and later. - MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later. + ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. + Year_y uint16 // - Year (UTC) + Month_month byte // - Month, range 1..12 (UTC) + Day_d byte // - Day of month, range 1..31 (UTC) + Hour_h byte // - Hour of day, range 0..23 (UTC) + Min_min byte // - Minute of hour, range 0..59 (UTC) + Sec_s byte // - Seconds of minute, range 0..60 (UTC) + Valid NavPVTValid // - Validity flags (see graphic below) + TAcc_ns uint32 // - Time accuracy estimate (UTC) + Nano_ns int32 // - Fraction of second, range -1e9 .. 
1e9 (UTC) + FixType NavPVTFixType // - GNSSfix Type + Flags NavPVTFlags // - Fix status flags (see graphic below) + Flags2 NavPVTFlags2 // - Additional flags (see graphic below) + NumSV byte // - Number of satellites used in Nav Solution + Lon_dege7 int32 // 1e-7 Longitude + Lat_dege7 int32 // 1e-7 Latitude + Height_mm int32 // - Height above ellipsoid + HMSL_mm int32 // - Height above mean sea level + HAcc_mm uint32 // - Horizontal accuracy estimate + VAcc_mm uint32 // - Vertical accuracy estimate + VelN_mm_s int32 // - NED north velocity + VelE_mm_s int32 // - NED east velocity + VelD_mm_s int32 // - NED down velocity + GSpeed_mm_s int32 // - Ground Speed (2-D) + HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) + SAcc_mm_s uint32 // - Speed accuracy estimate + HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + PDOPe2 uint16 // 0.01 Position DOP + Flags3 NavPVTFlags3 // - Additional flags (see graphic below) + Reserved1 [5]byte // - Reserved + HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion + MagDec_dege2 int16 // 1e-2 Magnetic declination. Only supported in ADR 4.10 and later. + MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later. } func (NavPvt) ClassID() uint16 { return 0x0701 } type HnrPvt struct { - ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. - Year_y uint16 // - Year (UTC) - Month_month byte // - Month, range 1..12 (UTC) - Day_d byte // - Day of month, range 1..31 (UTC) - Hour_h byte // - Hour of day, range 0..23 (UTC) - Min_min byte // - Minute of hour, range 0..59 (UTC) - Sec_s byte // - Seconds of minute, range 0..60 (UTC) - Valid byte // - Validity flags (see graphic below) - Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC) - FixType byte // - GNSSfix Type - Flags byte // - Fix status flags (see graphic below) - Reserved [2]byte - Lon_dege7 int32 // 1e-7 Longitude - Lat_dege7 int32 // 1e-7 Latitude - Height_mm int32 // - Height above ellipsoid - HMSL_mm int32 // - Height above mean sea level - GSpeed_mm_s int32 // - Ground Speed (2-D) - Speed_mm_s int32 // Speed (3-D) - HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) - HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion - HAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - VAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - SAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - Reserved1 [4]byte // - Reserved + ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. + Year_y uint16 // - Year (UTC) + Month_month byte // - Month, range 1..12 (UTC) + Day_d byte // - Day of month, range 1..31 (UTC) + Hour_h byte // - Hour of day, range 0..23 (UTC) + Min_min byte // - Minute of hour, range 0..59 (UTC) + Sec_s byte // - Seconds of minute, range 0..60 (UTC) + Valid byte // - Validity flags (see graphic below) + Nano_ns int32 // - Fraction of second, range -1e9 .. 
1e9 (UTC) + FixType byte // - GNSSfix Type + Flags byte // - Fix status flags (see graphic below) + Reserved [2]byte + Lon_dege7 int32 // 1e-7 Longitude + Lat_dege7 int32 // 1e-7 Latitude + Height_mm int32 // - Height above ellipsoid + HMSL_mm int32 // - Height above mean sea level + GSpeed_mm_s int32 // - Ground Speed (2-D) + Speed_mm_s int32 // Speed (3-D) + HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) + HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion + HAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + VAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + SAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + Reserved1 [4]byte // - Reserved } func (HnrPvt) ClassID() uint16 { return 0x0028 } type NavAtt struct { - ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. - Version byte - Reserved1 [3]byte - Roll_deg int32 - Pitch_deg int32 - Heading_deg int32 - AccRoll_deg uint32 - AccPitch_deg uint32 - AccHeading_deg uint32 + ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. + Version byte + Reserved1 [3]byte + Roll_deg int32 + Pitch_deg int32 + Heading_deg int32 + AccRoll_deg uint32 + AccPitch_deg uint32 + AccHeading_deg uint32 } func (NavAtt) ClassID() uint16 { return 0x0501 } @@ -104,43 +104,43 @@ func (NavAtt) ClassID() uint16 { return 0x0501 } type NavPVTFixType byte const ( - NavPVTNoFix NavPVTFixType = iota - NavPVTDeadReckoning - NavPVTFix2D - NavPVTFix3D - NavPVTGNSS - NavPVTTimeOnly + NavPVTNoFix NavPVTFixType = iota + NavPVTDeadReckoning + NavPVTFix2D + NavPVTFix3D + NavPVTGNSS + NavPVTTimeOnly ) type NavPVTValid byte const ( - NavPVTValidDate NavPVTValid = (1 << iota) // valid UTC Date (see Time Validity section for details) - NavPVTValidTime // valid UTC time of day (see Time Validity section for details) - NavPVTFullyResolved // UTC time of day has been fully resolved (no seconds uncertainty). Cannot be used to check if time is completely solved. - NavPVTValidMag // valid magnetic declination + NavPVTValidDate NavPVTValid = (1 << iota) // valid UTC Date (see Time Validity section for details) + NavPVTValidTime // valid UTC time of day (see Time Validity section for details) + NavPVTFullyResolved // UTC time of day has been fully resolved (no seconds uncertainty). Cannot be used to check if time is completely solved. 
+ NavPVTValidMag // valid magnetic declination ) type NavPVTFlags byte const ( - NavPVTGnssFixOK NavPVTFlags = 1 << 0 // valid fix (i.e within DOP & accuracy masks) - NavPVTDiffSoln NavPVTFlags = 1 << 1 // differential corrections were applied - NavPVTHeadVehValid NavPVTFlags = 1 << 5 // heading of vehicle is valid, only set if the receiver is in sensor fusion mode - NavPVTCarrSolnFloat NavPVTFlags = 1 << 6 // carrier phase range solution with floating ambiguities - NavPVTCarrSolnFixed NavPVTFlags = 1 << 7 // carrier phase range solution with fixed ambiguities + NavPVTGnssFixOK NavPVTFlags = 1 << 0 // valid fix (i.e within DOP & accuracy masks) + NavPVTDiffSoln NavPVTFlags = 1 << 1 // differential corrections were applied + NavPVTHeadVehValid NavPVTFlags = 1 << 5 // heading of vehicle is valid, only set if the receiver is in sensor fusion mode + NavPVTCarrSolnFloat NavPVTFlags = 1 << 6 // carrier phase range solution with floating ambiguities + NavPVTCarrSolnFixed NavPVTFlags = 1 << 7 // carrier phase range solution with fixed ambiguities ) type NavPVTFlags2 byte const ( - NavPVTConfirmedAvai NavPVTFlags2 = 1 << 5 // information about UTC Date and Time of Day validity confirmation is available (see Time Validity section for details) - NavPVTConfirmedDate NavPVTFlags2 = 1 << 6 // UTC Date validity could be confirmed (see Time Validity section for details) - NavPVTConfirmedTime NavPVTFlags2 = 1 << 7 // UTC Time of Day could be confirmed (see Time Validity section for details) + NavPVTConfirmedAvai NavPVTFlags2 = 1 << 5 // information about UTC Date and Time of Day validity confirmation is available (see Time Validity section for details) + NavPVTConfirmedDate NavPVTFlags2 = 1 << 6 // UTC Date validity could be confirmed (see Time Validity section for details) + NavPVTConfirmedTime NavPVTFlags2 = 1 << 7 // UTC Time of Day could be confirmed (see Time Validity section for details) ) type NavPVTFlags3 byte const ( - NavPVTInvalidLlh NavPVTFlags3 = (1 << iota) // 1 = Invalid lon, lat, height and hMSL + NavPVTInvalidLlh NavPVTFlags3 = (1 << iota) // 1 = Invalid lon, lat, height and hMSL ) diff --git a/web/http.go b/web/http.go index daee7fe..21552cc 100644 --- a/web/http.go +++ b/web/http.go @@ -6,6 +6,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/template/html" "github.com/gofiber/websocket/v2" + "github.com/google/uuid" "github.com/sirupsen/logrus" ) @@ -32,8 +33,8 @@ func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) { trackings.Put("/", stopRecordingHandler(s, c)) // Stops current recording. Returns trackingId if record was successful trackings.Delete("/", stopAllHandler(s, c)) // Stops websocket connection, pipelines and collectors - trackings.Get("/:trackingId", stubhander()) // Gets Tracking Metadata and loads sensorRecords from storage. - trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage + trackings.Get("/:trackingId", LoadTrackingHandler(s, c)) // Gets Tracking Metadata and loads sensorRecords from storage. + trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage trackings.Post("/current", stubhander()) // Starts Replay. trackings.Patch("/current", stubhander()) // Pauses Replay. 
@@ -47,6 +48,36 @@ func stubhander() fiber.Handler { return nil } } + +func LoadTrackingHandler(s core.Service, c *core.Configuration) fiber.Handler { + return func(ctx *fiber.Ctx) error { + trackId := ctx.Params("trackingId") + uid, err := uuid.Parse(trackId) + if err != nil { + logrus.Error(err) + ctx.Status(404).JSON(err) + return err + } + tracking, err := s.LoadTracking(uid) + if err != nil { + logrus.Error(err) + ctx.Status(404).JSON(err) + return err + } + prepres := map[string]interface{}{} + prepres["data"] = *tracking + if err != nil { + prepres["error"] = err.Error() + + } + err2 := ctx.JSON(prepres) + if err2 != nil { + ctx.Status(500).JSON(err2) + return err2 + } + return nil + } +} func allTrackingsHandler(s core.Service, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { trackings, err := s.AllTrackings()