Merge branch 'develop' into frank-dev2

Author: unknown
Date:   2020-12-15 13:30:21 +01:00
Commit: fa1256b3c9
13 changed files with 560 additions and 405 deletions

View File

@@ -11,6 +11,8 @@ import (
 func main() {
 	conf := configurationFromFile()
+	logrus.Debug(conf)
 	repo := storage.NewRepository(conf)
 	disp := core.NewDispatcher()

View File

@@ -25,7 +25,7 @@ const (
 var tcpSingleton *tcpCollector
 
-func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector {
+func NewCollector(typ CollectorType, proc Processor, config *Configuration) Collector {
 	var coll Collector
 	switch typ {
 	case SERIAL:
@@ -45,7 +45,7 @@ func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collect
 type serialCollector struct {
 	active bool
-	proc   Pusher
+	proc   Processor
 	config *Configuration
 	mu     sync.RWMutex
 }
@@ -121,7 +121,7 @@ func (s *serialCollector) Close() {
 	s.mu.Unlock()
 }
 
-func newSerial(proc Pusher, config *Configuration) *serialCollector {
+func newSerial(proc Processor, config *Configuration) *serialCollector {
 	return &serialCollector{
 		active: false,
 		proc:   proc,
@@ -131,7 +131,7 @@ func newSerial(proc Processor, config *Configuration) *serialCollector {
 type tcpCollector struct {
 	active    bool
-	processor Pusher
+	processor Processor
 	//config *Configuration
 }
@@ -143,11 +143,11 @@ func (t *tcpCollector) Close() {
 	t.active = false
 }
 
-func (t *tcpCollector) SetProcessor(p Pusher) {
+func (t *tcpCollector) SetProcessor(p Processor) {
 	t.processor = p
 }
 
-func newTcp(proc Pusher, config *Configuration) *tcpCollector {
+func newTcp(proc Processor, config *Configuration) *tcpCollector {
 	logrus.Println("start tcp collector")
 	listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort)
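Note on this file: the package-level tcpSingleton suggests the TCP collector is meant to be created once and reused, presumably so repeated NewCollector(TCP, ...) calls (the service diff below now makes one such call at startup just to bind the port) do not re-open the listener. A minimal, self-contained sketch of that reuse pattern — names here are illustrative, not the project's actual code:

package main

import (
	"fmt"
	"sync"
)

type tcpCollector struct{ active bool }

var (
	tcpOnce      sync.Once
	tcpSingleton *tcpCollector
)

// newTCPCollector binds its resources exactly once; later calls
// hand back the same instance instead of re-opening the port.
func newTCPCollector() *tcpCollector {
	tcpOnce.Do(func() {
		tcpSingleton = &tcpCollector{active: true}
	})
	return tcpSingleton
}

func main() {
	a := newTCPCollector()
	b := newTCPCollector()
	fmt.Println(a == b) // true: both calls share one collector
}

Whether the project guards its singleton with a nil check or a sync.Once is not visible in this hunk; Once is shown here because it is race-free.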

View File

@@ -18,8 +18,8 @@ func NewDispatcher() *dispatcher {
 }
 
 func (d *dispatcher) Publish(message string) {
-	logrus.Debugf("publish to %v listeners\n", len(d.listeners))
-	logrus.Debug(message)
+	logrus.Tracef("publishing to %v listeners\n", len(d.listeners))
+	logrus.Trace(message)
 	for _, ch := range d.listeners {
 		ch <- message
 	}

View File

@ -11,17 +11,17 @@ type Publisher interface {
Publish(message string) Publish(message string)
} }
type Pusher interface { type Processor interface {
Push(data *sensorData) error Push(data *sensorData) error
} }
type Storer interface { type Storer interface {
EnqueuePair(tcp sensorData, ser sensorData) AddDataPair(tcp sensorData, ser sensorData)
EnqueueRaw(data sensorData) AddRaw(data sensorData)
} }
type Repo interface { type Repo interface {
Save(tracking Tracking) error Save(tracking Tracking) error
LoadAll() ([]TrackingMetadata, error) LoadAll() ([]TrackingMetadata, error)
Load(id uuid.UUID) (Tracking, error) Load(id uuid.UUID) (*Tracking, error)
} }
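Note: the Pusher→Processor rename and the pointer-returning Load signature tighten the seams between pipeline, storage, and service. A small compile-time trick worth pairing with such interfaces — a hypothetical no-op Processor usable as a test stand-in, not code from this repository:

package main

import "errors"

type sensorData struct{ Position [3]float64 }

type Processor interface {
	Push(data *sensorData) error
}

// nopProcessor satisfies Processor and can stand in for the pipeline in tests.
type nopProcessor struct{}

func (nopProcessor) Push(data *sensorData) error {
	if data == nil {
		return errors.New("no data")
	}
	return nil
}

var _ Processor = nopProcessor{} // fails to compile if the interface drifts

func main() {
	var p Processor = nopProcessor{}
	_ = p.Push(&sensorData{Position: [3]float64{52.5, 13.4, 34.0}})
}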

View File

@@ -24,7 +24,7 @@ type pipeline struct {
 	sema *semaphore.Weighted
 }
 
-// pipe implements Runner & Pusher
+// pipe implements Runner & Processor
 func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline {
 	return &pipeline{
 		false,
@@ -51,30 +51,35 @@ func (p *pipeline) isPipeActive() bool {
 	defer p.mu.RUnlock()
 	return p.active
 }
 
+func (p *pipeline) isPipeRecording() bool {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	return p.record
+}
+
 func (p *pipeline) Run() {
-	p.sema.Acquire(context.Background(), 2)
+	p.sema.Acquire(context.Background(), 1) // !!! n=2 when the synchronizer runs as well
 	p.mu.Lock()
 	p.active = true
 	p.mu.Unlock()
 	logrus.Println("pipe: processing service started")
-	go func() {
-		for p.isPipeActive() {
-			<-p.synchroniz.updateTicker.C
-			err := p.refreshDelay()
-			if err != nil {
-				logrus.Debugln(err)
-			}
-		}
-		p.sema.Release(1)
-		logrus.Println("pipe: updater stopped")
-	}()
+	//go func() {
+	//	for p.isPipeActive() {
+	//		<-p.synchroniz.updateTicker.C
+	//		err := p.refreshDelay()
+	//		if err != nil {
+	//			logrus.Debugln(err)
+	//		}
+	//	}
+	//	p.sema.Release(1)
+	//	logrus.Println("pipe: updater stopped")
	//}()
 	go func() {
 		for p.isPipeActive() {
 			<-p.publishTicker.C
 			err := p.publish()
 			if err != nil && err.Error() != "no data available" {
-				logrus.Debug(err)
+				logrus.Trace(err)
 			}
 		}
 		p.sema.Release(1)
@@ -89,9 +94,25 @@ func (p *pipeline) StopRecord() {
 	p.record = false
 }
 
+func (p *pipeline) Push(data *sensorData) error {
+	if (data == nil || *data == sensorData{}) {
+		return errors.New("no data")
+	}
+	//logrus.Println("push data to pipe:", string(data.Source))
+	switch data.Source {
+	case SOURCE_TCP:
+		go p.pushTcpDataToBuffer(*data)
+	case SOURCE_SERIAL:
+		go p.pushSerialDataToBuffer(*data)
+	default:
+		panic("pipe: invalid data Source")
+	}
+	return nil
+}
+
 func (p *pipeline) publish() error {
-	p.buffer.tcpMutex.Lock()
 	p.buffer.serialMutex.Lock()
+	p.buffer.tcpMutex.Lock()
 	if (p.buffer.MeasTcp == sensorData{} && p.buffer.MeasSerial == sensorData{}) {
 		p.buffer.tcpMutex.Unlock()
@@ -104,21 +125,25 @@ func (p *pipeline) publish() error {
 		p.buffer.serialMutex.Unlock()
 		return errors.New("same data")
 	}
-	logrus.Debug("")
-	logrus.Debugf("SER old: %v", p.buffer.LastMeasSerial)
-	logrus.Debugf("SER new: %v", p.buffer.MeasSerial)
-	logrus.Debugf("TCP old: %v", p.buffer.LastMeasTcp)
-	logrus.Debugf("TCP new: %v", p.buffer.MeasTcp)
-	logrus.Debug("")
+	logrus.Debugf("")
+	logrus.Tracef("SER old: %-40s %-40s %v %v", p.buffer.LastMeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.ServerTime.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Position, p.buffer.LastMeasSerial.Orientation)
+	logrus.Debugf("SER new: %-40s %-40s %v %v", p.buffer.MeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasSerial.ServerTime.Format(time.RFC3339Nano), p.buffer.MeasSerial.Position, p.buffer.MeasSerial.Orientation)
+	logrus.Tracef("TCP old: %-40s %-40s %v %v", p.buffer.LastMeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.ServerTime.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Position, p.buffer.LastMeasTcp.Orientation)
+	logrus.Debugf("TCP new: %-40s %-40s %v %v", p.buffer.MeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasTcp.ServerTime.Format(time.RFC3339Nano), p.buffer.MeasTcp.Position, p.buffer.MeasTcp.Orientation)
 	p.buffer.LastMeasTcp = p.buffer.MeasTcp
 	p.buffer.LastMeasSerial = p.buffer.MeasSerial
-	p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial)
-	data := map[string]sensorData{
-		string(SOURCE_TCP):    p.buffer.MeasTcp,
-		string(SOURCE_SERIAL): p.buffer.MeasSerial,
+	if p.isPipeRecording() {
+		p.storer.AddDataPair(p.buffer.MeasTcp, p.buffer.MeasSerial)
 	}
+	data := map[string]interface{}{}
+	if p.buffer.MeasTcp.Source == SOURCE_TCP {
+		data[string(SOURCE_TCP)] = p.buffer.MeasTcp
+	}
+	if p.buffer.MeasSerial.Source == SOURCE_SERIAL {
+		data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial
+	}
 	p.buffer.tcpMutex.Unlock()
 	p.buffer.serialMutex.Unlock()
@@ -149,74 +174,65 @@ type synchronizer struct {
 }
 
 func (p *pipeline) refreshDelay() error {
-	p.synchroniz.mutex.RLock()
-	if p.synchroniz.tcpSerialDelayMs != 0 {
-		logrus.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs)
-	}
-	p.synchroniz.mutex.RUnlock()
+	logrus.Debugf("refreshing delay...")
 	p.buffer.serialMutex.Lock()
 	p.buffer.tcpMutex.Lock()
-	tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp)
-	serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp)
+	tcpTime := p.buffer.MeasTcp.Timestamp
+	serTime := p.buffer.MeasSerial.Timestamp
 	p.buffer.tcpMutex.Unlock()
 	p.buffer.serialMutex.Unlock()
-	if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
-		return errors.New("no sync possible. check if both Collectors running. otherwise check GPS fix")
+
+	if tcpTime.IsZero() || serTime.IsZero() || tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
+		return errors.New("sync not possible. zero time value detected")
 	}
+	logrus.Debug("TCP", tcpTime.Format(time.RFC3339Nano))
+	logrus.Debug("SER", serTime.Format(time.RFC3339Nano))
 	currentDelay := tcpTime.Sub(serTime).Milliseconds()
+	p.synchroniz.mutex.Lock()
+	defer p.synchroniz.mutex.Unlock()
+	logrus.Debugf("old delay-> %vms...", p.synchroniz.tcpSerialDelayMs)
 	if currentDelay > 5000 || currentDelay < -5000 {
-		p.synchroniz.mutex.Lock()
 		p.synchroniz.tcpSerialDelayMs = 0
-		p.synchroniz.mutex.Unlock()
 		return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
 	}
-	logrus.Debug("TCP", tcpTime.String())
-	logrus.Debug("SER", serTime.String())
-	logrus.Debug("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms")
-	delay := tcpTime.Sub(serTime).Milliseconds()
-	p.synchroniz.mutex.Lock()
-	p.synchroniz.tcpSerialDelayMs += delay
-	p.synchroniz.mutex.Unlock()
-	return nil
-}
-
-func (p *pipeline) Push(data *sensorData) error {
-	if data == nil {
-		return errors.New("nil processing not allowed")
-	}
-	//logrus.Println("push data to pipe:", string(data.source))
-	if p.isPipeActive() {
-		p.storer.EnqueueRaw(*data)
-	}
-	switch data.source {
-	case SOURCE_TCP:
-		go p.pushTcpDataToBuffer(*data)
-	case SOURCE_SERIAL:
-		go p.pushSerialDataToBuffer(*data)
-	default:
-		panic("pipe: invalid data source")
-	}
+	p.synchroniz.tcpSerialDelayMs += currentDelay
+	logrus.Infof("new delay-> %vms", p.synchroniz.tcpSerialDelayMs)
 	return nil
 }
 
 func (p *pipeline) pushTcpDataToBuffer(data sensorData) {
+	data.ServerTime = time.Now().UTC()
+	if p.isPipeRecording() {
+		p.storer.AddRaw(data)
+	}
 	p.synchroniz.mutex.RLock()
 	if p.synchroniz.tcpSerialDelayMs > 0 {
 		time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond)
 	}
-	p.synchroniz.mutex.RLock()
+	p.synchroniz.mutex.RUnlock()
 	p.buffer.tcpMutex.Lock()
-	p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
+	p.buffer.MeasTcp = data
+	//p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
 	p.buffer.tcpMutex.Unlock()
 }
 
 func (p *pipeline) pushSerialDataToBuffer(data sensorData) {
+	data.ServerTime = time.Now().UTC()
+	if p.isPipeRecording() {
+		p.storer.AddRaw(data)
+	}
 	p.synchroniz.mutex.RLock()
 	if p.synchroniz.tcpSerialDelayMs < 0 {
 		time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond)
 	}
 	p.synchroniz.mutex.RUnlock()
 	p.buffer.serialMutex.Lock()
-	p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
+	p.buffer.MeasSerial = data
+	//p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
 	p.buffer.serialMutex.Unlock()
 }

View File

@@ -16,11 +16,13 @@ const (
 	SOURCE_SERIAL sourceId = "SOURCE_SERIAL"
 )
 
+var timeex int64
+
 type sensorData struct {
 	itow        uint32
-	source      sourceId
+	Source      sourceId
 	ServerTime  time.Time
-	Timestamp   int64
+	Timestamp   time.Time
 	Position    [3]float64
 	Orientation [3]float64
 }
@@ -66,7 +68,7 @@ func (s sensorData) ConsolidateExTime(n sensorData) sensorData {
 }
 
 func (s *sensorData) checkSources(n *sensorData) {
-	if (s.source != n.source && *s != sensorData{}) {
+	if (s.Source != n.Source && *s != sensorData{}) {
 		logrus.Println(s)
 		logrus.Println(n)
 		logrus.Fatalln("Do not consolidate sensorData from different Sources")
@@ -80,21 +82,21 @@ var (
 func ConvertUbxToSensorData(msg interface{}) (*sensorData, error) {
 	sd := &sensorData{
-		ServerTime: time.Now(),
-		source:     SOURCE_SERIAL,
+		//ServerTime: time.Now().UTC(),
+		Source: SOURCE_SERIAL,
 	}
 	switch v := msg.(type) {
 	case *ublox.NavPvt:
 		//logrus.Println("NAV-PVT")
 		sd.itow = v.ITOW_ms
-		sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano()
+		sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC)
 		sd.Position[0] = float64(v.Lat_dege7) / 1e+7
 		sd.Position[1] = float64(v.Lon_dege7) / 1e+7
 		sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m
 	case *ublox.HnrPvt:
 		//logrus.Println("HNR-PVT")
 		sd.itow = v.ITOW_ms
-		sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano()
+		sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC)
 		sd.Position[0] = float64(v.Lat_dege7) / 1e+7
 		sd.Position[1] = float64(v.Lon_dege7) / 1e+7
 		sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m
@@ -130,16 +132,24 @@ func convertIPhoneSensorLog(jsonData []byte) (*sensorData, error) {
 	pitch := gjson.Get(string(jsonData), "motionPitch").Float() * 180 / math.Pi
 	roll := gjson.Get(string(jsonData), "motionRoll").Float() * 180 / math.Pi
 	yaw := gjson.Get(string(jsonData), "motionYaw").Float() * 180 / math.Pi
+	var ts time.Time
+	if timestamp != 0 {
+		ts = time.Unix(0, int64(timestamp*float64(time.Second))).UTC()
+		timeex = time.Now().UnixNano() - ts.UnixNano()
+	} else if timeex != 0 {
+		ts = time.Now().Add(time.Duration(timeex)).UTC()
+	}
+	//if ts == time.Date()
 	sd := &sensorData{
-		ServerTime:  time.Now(),
-		source:      SOURCE_TCP,
-		Timestamp:   int64(timestamp * float64(time.Second)),
+		//ServerTime: time.Now().UTC(),
+		Source:      SOURCE_TCP,
+		Timestamp:   ts,
 		Position:    [3]float64{lat, lon, alt},
 		Orientation: [3]float64{pitch, roll, yaw},
-		//Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)),
 	}
-	//logrus.Println(string(pretty.Pretty(jsonData)))
-	//logrus.Println(sd)
+	if (*sd == sensorData{}) {
+		return nil, errors.New("iphone sensorlog: convert empty")
+	}
 	return sd, nil
 }
@@ -153,12 +163,14 @@ func convertAndroidHyperImu(jsonData []byte) (*sensorData, error) {
 	yaw := gjson.Get(string(jsonData), "orientation.2").Float()
 	sd := &sensorData{
-		ServerTime:  time.Now(),
-		source:      SOURCE_TCP,
-		Timestamp:   timestamp * int64(time.Millisecond),
+		//ServerTime: time.Now().UTC(),
+		Source:      SOURCE_TCP,
+		Timestamp:   time.Unix(0, timestamp*int64(time.Millisecond)).UTC(),
 		Position:    [3]float64{lat, lon, alt},
 		Orientation: [3]float64{pitch, roll, yaw},
-		//Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)),
 	}
+	if (*sd == sensorData{}) {
+		return nil, errors.New("android hyperimu: convert empty")
+	}
 	return sd, nil
 }
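Note: the new timeex fallback in convertIPhoneSensorLog learns the server-minus-device clock offset whenever a sample carries a device timestamp, then synthesizes a timestamp for samples that lack one. A runnable sketch of the mechanism with illustrative names — and one sign question worth double-checking: the diff extrapolates with Add(timeex), while reconstructing device time from the server clock would normally subtract the stored offset, as done here:

package main

import (
	"fmt"
	"time"
)

var clockOffset time.Duration // server clock minus device clock, learned from timestamped samples

func deviceTime(timestampSec float64) time.Time {
	if timestampSec != 0 {
		ts := time.Unix(0, int64(timestampSec*float64(time.Second))).UTC()
		clockOffset = time.Duration(time.Now().UnixNano() - ts.UnixNano())
		return ts
	}
	if clockOffset != 0 {
		// sample arrived without a device timestamp: estimate it by
		// removing the learned offset from the current server time
		return time.Now().Add(-clockOffset).UTC()
	}
	return time.Time{} // no basis for an estimate yet
}

func main() {
	fmt.Println(deviceTime(1608035421.25)) // learns the offset
	fmt.Println(deviceTime(0))             // extrapolates from it
}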

View File

@@ -23,7 +23,7 @@ type Service interface {
 	StopRecord() (*TrackingMetadata, error)
 	StopAll() (*TrackingMetadata, error)
 
-	LoadTracking(trackingId uuid.UUID)
+	LoadTracking(trackingId uuid.UUID) (*Tracking, error)
 	DeleteTracking(trackingId uuid.UUID)
 
 	StartReplay()
@@ -50,7 +50,8 @@ func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService {
 		pipe:       NewPipeline(d, t, c),
 		collectors: nil,
 	}
-	//ts.pipe.Run()
+	// first initialization of the tcp collector, to open the tcp port
+	NewCollector(TCP, nil, c)
 	return ts
 }
@@ -140,15 +141,21 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) {
 		e.Close()
 	}
 	if t.opMode == RECORDING {
-		logrus.Warn("trackingservice: stop recording gracefully")
+		logrus.Info("trackingservice: gracefully stop recording")
 		tm, err = t.StopRecord()
 	}
 	t.opMode = STOPPED
 	return tm, err
 }
 
-func (t *trackingService) LoadTracking(trackingId uuid.UUID) {
-	panic("implement me")
+func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error) {
+	logrus.Info("LOAD TRACKING from database")
+	tracking, err := t.repo.Load(trackingId)
+	if err != nil {
+		return nil, err
+	}
+	t.safelyReplaceTracking(*tracking)
+	return t.current, nil
 }
 
 func (t *trackingService) DeleteTracking(trackingId uuid.UUID) {
func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { func (t *trackingService) DeleteTracking(trackingId uuid.UUID) {

View File

@@ -9,8 +9,8 @@ import (
 
 type Tracking struct {
 	TrackingMetadata
-	Records []recordPair
-	Rawdata []rawRecord
+	Records []SensorPair
+	Rawdata []sensorData
 }
 
 var mRec sync.RWMutex
@@ -22,41 +22,38 @@ type TrackingMetadata struct {
 	Collectors []CollectorType
 }
 
-// persistence wrapper for sensordata
-type recordPair struct {
-	RecordTimeKey time.Time // uniqueness ensured through mutex
-	DataPair      map[sourceId]sensorData
+type SensorPair struct {
+	RecordTime time.Time // uniqueness ensured through mutex
+	Data       map[sourceId]sensorData
 }
 
-type rawRecord struct {
-	RecordTimeKey time.Time // uniqueness ensured through mutex
-	Data          sensorData
-}
-// END persistence wrapper for sensordata
+//func (r *SensorPair) restoreDataPair(data []byte) error {
+//	err := json.Unmarshal(data, &r.Data)
+//	return err
+//}
 
-func (s *Tracking) EnqueuePair(tcp sensorData, ser sensorData) {
-	rp := recordPair{
-		RecordTimeKey: time.Now(),
-		DataPair: map[sourceId]sensorData{
-			tcp.source: tcp,
-			ser.source: ser,
-		},
+func (s *Tracking) AddDataPair(tcp sensorData, ser sensorData) {
+	rp := SensorPair{
+		RecordTime: time.Now().UTC(),
+		Data:       make(map[sourceId]sensorData, 2),
+	}
+	if tcp.Source == SOURCE_TCP {
+		rp.Data[tcp.Source] = tcp
+	}
+	if ser.Source == SOURCE_SERIAL {
+		rp.Data[ser.Source] = ser
 	}
 	mRec.Lock()
 	s.Records = append(s.Records, rp)
-	logrus.Debugln("tracking Records: len->", len(s.Records))
+	logrus.Traceln("tracking Records: len->", len(s.Records))
 	mRec.Unlock()
 }
 
-func (s *Tracking) EnqueueRaw(data sensorData) {
-	sr := rawRecord{
-		time.Now(),
-		data,
-	}
+func (s *Tracking) AddRaw(data sensorData) {
 	mRec.Lock()
-	s.Rawdata = append(s.Rawdata, sr)
-	logrus.Debugln("raw data points: len->", len(s.Rawdata))
+	s.Rawdata = append(s.Rawdata, data)
+	logrus.Traceln("raw data points: len->", len(s.Rawdata))
 	mRec.Unlock()
 }
@@ -65,8 +62,8 @@ func emptyTracking() Tracking {
 		TrackingMetadata: TrackingMetadata{
 			UUID: uuid.New(),
 		},
-		Records: []recordPair{},
-		Rawdata: []rawRecord{},
+		Records: []SensorPair{},
+		Rawdata: []sensorData{},
 	}
 }
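Note: AddDataPair now only stores the sides of a pair whose Source field is actually set, so a half-filled buffer can no longer persist a zero-valued sensorData under an empty map key. Minimal sketch of that guard in isolation, with the types trimmed to what the guard needs:

package main

import (
	"fmt"
	"time"
)

type sourceId string

const (
	SOURCE_TCP    sourceId = "SOURCE_TCP"
	SOURCE_SERIAL sourceId = "SOURCE_SERIAL"
)

type sensorData struct {
	Source   sourceId
	Position [3]float64
}

type SensorPair struct {
	RecordTime time.Time
	Data       map[sourceId]sensorData
}

func newPair(tcp, ser sensorData) SensorPair {
	rp := SensorPair{
		RecordTime: time.Now().UTC(),
		Data:       make(map[sourceId]sensorData, 2),
	}
	if tcp.Source == SOURCE_TCP { // skip a zero-valued tcp side
		rp.Data[tcp.Source] = tcp
	}
	if ser.Source == SOURCE_SERIAL { // skip a zero-valued serial side
		rp.Data[ser.Source] = ser
	}
	return rp
}

func main() {
	p := newPair(sensorData{Source: SOURCE_TCP}, sensorData{}) // serial buffer still empty
	fmt.Println(len(p.Data))                                   // 1: only the tcp side is stored
}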

View File

@@ -1,8 +1,8 @@
 package storage
 
 import (
-	"encoding/binary"
 	"encoding/json"
+	"errors"
 	"git.timovolkmann.de/gyrogpsc/core"
 	"github.com/dgraph-io/badger/v2"
 	"github.com/google/uuid"
@@ -10,7 +10,7 @@ import (
 	"github.com/tidwall/pretty"
 	"os"
 	"path/filepath"
-	"strconv"
+	"time"
 )
 
 // Must implement Repo
@@ -27,30 +27,46 @@ func NewRepository(c *core.Configuration) *badgerStore {
 		os.Mkdir(filepath.Join(dir, "_db"), os.ModePerm)
 	}
 
-	tr, err := badger.Open(badger.DefaultOptions("_db/trackings"))
-	dp, err := badger.Open(badger.DefaultOptions("_db/records"))
-	rd, err := badger.Open(badger.DefaultOptions("_db/raw"))
+	bs := &badgerStore{}
+	err := bs.openDBs()
 	if err != nil {
 		logrus.Error(err)
 	}
-	return &badgerStore{trackingsDb: tr, recordsDb: dp, rawdataDb: rd}
+	return bs
 }
 
-func (r *badgerStore) isDbAvailable() bool {
+func (r *badgerStore) openDBs() error {
+	var err error
+	r.trackingsDb, err = badger.Open(badger.DefaultOptions("_db/trackings"))
+	if err != nil {
+		return err
+	}
+	r.recordsDb, err = badger.Open(badger.DefaultOptions("_db/records"))
+	if err != nil {
+		return err
+	}
+	r.rawdataDb, err = badger.Open(badger.DefaultOptions("_db/raw"))
+	return err
+}
+
+func (r *badgerStore) isDbClosed() bool {
 	return r.trackingsDb.IsClosed() || r.recordsDb.IsClosed() || r.rawdataDb.IsClosed()
 }
 
 func (r *badgerStore) Save(tr core.Tracking) error {
-	if ok := r.isDbAvailable(); ok {
+	if ok := r.isDbClosed(); ok {
 		logrus.Error("unable to write to database. database closed!")
-		return badger.ErrDBClosed
+		err := r.openDBs()
+		if err != nil {
+			return err
+		}
+		//return badger.ErrDBClosed
 	}
-	ts, err := tr.TimeCreated.MarshalText()
+	uid, err := tr.UUID.MarshalText()
 	if err != nil {
 		logrus.Error(err, tr)
 	}
-	logrus.Info("save tracking:", tr.TimeCreated)
+	logrus.Infoln("save tracking:", tr.TimeCreated.Format(time.RFC3339Nano))
 	meta, err := json.Marshal(tr.TrackingMetadata)
 	if err != nil {
 		logrus.Error(err, tr)
@@ -58,41 +74,50 @@ func (r *badgerStore) Save(tr core.Tracking) error {
 	}
 	err = r.recordsDb.Update(func(txn *badger.Txn) error {
 		for _, v := range tr.Records {
-			k := createRecordKey(tr.UUID, v.RecordTimeKey.UnixNano())
-			j, err := json.Marshal(v.DataPair)
-			logrus.Debugln("save record k/v:\n", tr.UUID.String(), strconv.FormatInt(v.RecordTimeKey.UnixNano(), 10))
-			logrus.Debugln(string(pretty.Pretty(j)))
-			if err != nil {
-				return err
+			k := createRecordKey(tr.UUID, v.RecordTime)
+			logrus.Trace(v.Data, " len key ->", len(k))
+			j, err2 := json.Marshal(v.Data)
+			logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.RecordTime.Format(time.RFC3339Nano))
+			logrus.Traceln(string(pretty.Pretty(j)))
+			if err2 != nil {
+				return err2
+			}
+			err2 = txn.Set(k, j)
+			if err2 != nil {
+				logrus.Warn(err2)
 			}
-			txn.Set(k, j)
 		}
 		return nil
 	})
 	if err != nil {
 		logrus.Error(err, tr)
-		return err
-	}
-	err = r.rawdataDb.Update(func(txn *badger.Txn) error {
-		for _, v := range tr.Rawdata {
-			k := createRecordKey(tr.UUID, v.RecordTimeKey.UnixNano())
-			j, err := json.Marshal(v)
-			logrus.Debugln("save raw k/v:\n", tr.UUID.String(), strconv.FormatInt(v.RecordTimeKey.UnixNano(), 10))
-			logrus.Debugln(string(pretty.Pretty(j)))
-			if err != nil {
-				return err
-			}
-			txn.Set(k, j)
-		}
-		return nil
-	})
-	if err != nil {
-		logrus.Error(err, tr)
-		return err
+		//return err
 	}
+	//err = r.rawdataDb.Update(func(txn *badger.Txn) error {
+	//	for _, v := range tr.Rawdata {
+	//		k := createRecordKey(tr.UUID, v.ServerTime)
+	//		j, err2 := json.Marshal(v)
+	//		logrus.Traceln("save raw k/v:\n", tr.UUID.String(), strconv.FormatInt(v.ServerTime.UnixNano(), 10))
+	//		logrus.Traceln(string(pretty.Pretty(j)))
+	//		if err2 != nil {
+	//			return err2
+	//		}
+	//		err2 = txn.Set(k, j)
+	//
+	//		if err2 != nil {
+	//			logrus.Warn(err2)
+	//		}
+	//	}
+	//	return nil
	//})
+	//if err != nil {
+	//	logrus.Error(err, tr)
+	//	//return err
	//}
 	err = r.trackingsDb.Update(func(txn *badger.Txn) error {
-		logrus.Debug("save tracking meta k/v:\n", string(ts), string(meta))
-		err := txn.Set(ts, meta)
+		logrus.Debugln("save tracking metadata k/v:\n", string(uid), string(meta))
+		err := txn.Set(uid, meta)
 		return err
 	})
 	if err != nil {
@@ -101,12 +126,9 @@ func (r *badgerStore) Save(tr core.Tracking) error {
 	}
 
 	dr := 0.5
-	err = r.trackingsDb.RunValueLogGC(dr)
-	logrus.Debug("DB GC:", err)
-	err = r.recordsDb.RunValueLogGC(dr)
-	logrus.Debug("DB GC:", err)
-	err = r.rawdataDb.RunValueLogGC(dr)
-	logrus.Debug("DB GC:", err)
+	_ = r.trackingsDb.RunValueLogGC(dr)
+	_ = r.recordsDb.RunValueLogGC(dr)
+	_ = r.rawdataDb.RunValueLogGC(dr)
 	logrus.Info("sucessfully saved tracking")
 	return nil
 }
@@ -120,11 +142,15 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) {
 		for it.Rewind(); it.Valid(); it.Next() {
 			item := it.Item()
 			el := core.TrackingMetadata{}
-			item.Value(func(val []byte) error {
+			err2 := item.Value(func(val []byte) error {
 				logrus.Debugln(string(val))
-				err := json.Unmarshal(val, &el)
-				return err
+				err3 := json.Unmarshal(val, &el)
+				return err3
 			})
+			if err2 != nil {
+				logrus.Warn(err2)
+			}
 			result = append(result, el)
 		}
 		return nil
@@ -135,31 +161,95 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) {
 	return result, nil
 }
 
-func (r *badgerStore) Load(id uuid.UUID) (core.Tracking, error) {
-	panic("implement me")
+func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) {
+	logrus.Debugln("try to load from db...", id)
+	if ok := r.isDbClosed(); ok {
+		logrus.Error("unable to read from database. database closed!")
+		err := r.openDBs()
+		if err != nil {
+			return nil, err
+		}
+	}
+	t := &core.Tracking{
+		TrackingMetadata: core.TrackingMetadata{},
+		//Records: []core.recordPair{},
+		//Rawdata: nil,
+	}
+	err := r.trackingsDb.View(func(txn *badger.Txn) error {
+		item, err2 := txn.Get([]byte(id.String()))
+		if err2 != nil {
+			return err2
+		}
+		err2 = item.Value(func(val []byte) error {
+			err3 := json.Unmarshal(val, &t.TrackingMetadata)
+			return err3
+		})
+		return err2
+	})
+	if err != nil {
+		logrus.Error(err)
+	}
+	err = r.recordsDb.View(func(txn *badger.Txn) error {
+		opts := badger.DefaultIteratorOptions
+		opts.Prefix = []byte(id.String())
+		it := txn.NewIterator(opts)
+		defer it.Close()
+		for it.Rewind(); it.Valid(); it.Next() {
+			item := it.Item()
+			_, recTime := unmarshalDataKey(item.Key())
+			el := core.SensorPair{}
+			el.RecordTime = recTime
+			err2 := item.Value(func(val []byte) error {
+				logrus.Traceln(string(val))
+				err3 := json.Unmarshal(val, &el.Data)
+				logrus.Traceln(err3, el)
+				return err3
+			})
+			if err2 != nil {
+				logrus.Warn(err2)
+			}
+			t.Records = append(t.Records, el)
+		}
+		return nil
+	})
+	if err != nil {
+		logrus.Error(err)
+	}
+	// implement retrieval of raw data only if needed
+	return t, nil
 }
 
-func createRecordKey(uid uuid.UUID, timestamp int64) []byte {
-	prefix, err := uid.MarshalText()
-	if err != nil || timestamp < 0 {
-		logrus.Error("unable to create key", err)
+func createRecordKey(uid uuid.UUID, timestamp time.Time) []byte {
+	prefix := []byte(uid.String())
+	suffix := []byte(timestamp.Format(time.RFC3339Nano))
+	if timestamp.IsZero() {
+		err := errors.New("zero value detected")
+		logrus.Errorln("unable to create key", err)
 	}
-	suffix := make([]byte, 8)
-	binary.BigEndian.PutUint64(suffix, uint64(timestamp))
+	logrus.Traceln("save as:", string(prefix), string(suffix))
+	//binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano()))
 	return append(prefix, suffix...)
 }
 
-func unmarshalDataKey(key []byte) (uuid.UUID, int64) {
-	if len(key) != 24 {
-		panic("corrupted key")
-	}
-	prefix := key[0:15]
-	suffix := key[15:24]
-	uid, err := uuid.FromBytes(prefix)
+func unmarshalDataKey(key []byte) (uuid.UUID, time.Time) {
+	logrus.Trace("key len ->", len(key))
+	prefix := key[:36]
+	suffix := key[36:]
+	logrus.Traceln("load as:", string(prefix), string(suffix))
+	uid, err := uuid.Parse(string(prefix))
 	if err != nil {
-		panic("corrupted key")
+		logrus.Errorln("corrupted key", err)
 	}
-	timestamp := int64(binary.BigEndian.Uint64(suffix))
+	timestamp, err := time.Parse(time.RFC3339Nano, string(suffix))
+	if err != nil {
+		logrus.Errorln("corrupted key", err)
+	}
+	logrus.Traceln(uid, timestamp)
+	//timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix)))
 	return uid, timestamp
 }
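Note: the record keys move from the old binary scheme (whose write and read paths disagreed about the layout) to a textual one: the 36-character canonical UUID string followed by an RFC3339Nano timestamp. A round-trip sketch of the new layout, standalone and using the same github.com/google/uuid package the repo already imports:

package main

import (
	"fmt"
	"time"

	"github.com/google/uuid"
)

func makeKey(uid uuid.UUID, ts time.Time) []byte {
	// the canonical UUID string is always 36 bytes; the timestamp suffix varies
	return append([]byte(uid.String()), []byte(ts.Format(time.RFC3339Nano))...)
}

func splitKey(key []byte) (uuid.UUID, time.Time, error) {
	uid, err := uuid.Parse(string(key[:36]))
	if err != nil {
		return uuid.UUID{}, time.Time{}, err
	}
	ts, err := time.Parse(time.RFC3339Nano, string(key[36:]))
	return uid, ts, err
}

func main() {
	k := makeKey(uuid.New(), time.Now().UTC())
	uid, ts, err := splitKey(k)
	fmt.Println(uid, ts, err)
}

One caveat with this layout: RFC3339Nano trims trailing zeros from the fractional seconds, so keys have variable length and Badger's lexicographic key order no longer matches chronological order in every case (e.g. "…00.5Z" sorts after "…00.52Z"). A fixed-width suffix such as Format("2006-01-02T15:04:05.000000000Z07:00") would keep iteration time-ordered.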

View File

@@ -6,6 +6,7 @@ import (
 	"github.com/gofiber/fiber/v2"
 	"github.com/gofiber/template/html"
 	"github.com/gofiber/websocket/v2"
+	"github.com/google/uuid"
 	"github.com/sirupsen/logrus"
 )
 
@@ -32,7 +33,7 @@ func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) {
 	trackings.Put("/", stopRecordingHandler(s, c)) // Stops current recording. Returns trackingId if record was successful
 	trackings.Delete("/", stopAllHandler(s, c))    // Stops websocket connection, pipelines and collectors
 
-	trackings.Get("/:trackingId", stubhander())               // Gets Tracking Metadata and loads sensorRecords from storage.
+	trackings.Get("/:trackingId", LoadTrackingHandler(s, c))  // Gets Tracking Metadata and loads sensorRecords from storage.
 	trackings.Delete("/:trackingId", stubhander())            // Deletes Tracking from storage
 
 	trackings.Post("/current", stubhander()) // Starts Replay.
@@ -47,6 +48,36 @@ func stubhander() fiber.Handler {
 		return nil
 	}
 }
 
+func LoadTrackingHandler(s core.Service, c *core.Configuration) fiber.Handler {
+	return func(ctx *fiber.Ctx) error {
+		trackId := ctx.Params("trackingId")
+		uid, err := uuid.Parse(trackId)
+		if err != nil {
+			logrus.Error(err)
+			ctx.Status(404).JSON(err)
+			return err
+		}
+		tracking, err := s.LoadTracking(uid)
+		if err != nil {
+			logrus.Error(err)
+			ctx.Status(404).JSON(err)
+			return err
+		}
+		prepres := map[string]interface{}{}
+		prepres["data"] = *tracking
+		if err != nil {
+			prepres["error"] = err.Error()
+		}
+		err2 := ctx.JSON(prepres)
+		if err2 != nil {
+			ctx.Status(500).JSON(err2)
+			return err2
+		}
+		return nil
+	}
+}
+
 func allTrackingsHandler(s core.Service, c *core.Configuration) fiber.Handler {
 	return func(ctx *fiber.Ctx) error {
 		trackings, err := s.AllTrackings()