bugfixes everywhere
This commit is contained in:
parent
26014b0c34
commit
2a0b14e03d
@ -11,6 +11,8 @@ import (
|
||||
func main() {
|
||||
conf := configurationFromFile()
|
||||
|
||||
logrus.Debug(conf)
|
||||
|
||||
repo := storage.NewRepository(conf)
|
||||
disp := core.NewDispatcher()
|
||||
|
||||
|
||||
@ -25,7 +25,7 @@ const (
|
||||
|
||||
var tcpSingleton *tcpCollector
|
||||
|
||||
func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector {
|
||||
func NewCollector(typ CollectorType, proc Processor, config *Configuration) Collector {
|
||||
var coll Collector
|
||||
switch typ {
|
||||
case SERIAL:
|
||||
@ -45,7 +45,7 @@ func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collect
|
||||
|
||||
type serialCollector struct {
|
||||
active bool
|
||||
proc Pusher
|
||||
proc Processor
|
||||
config *Configuration
|
||||
mu sync.RWMutex
|
||||
}
|
||||
@ -121,7 +121,7 @@ func (s *serialCollector) Close() {
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func newSerial(proc Pusher, config *Configuration) *serialCollector {
|
||||
func newSerial(proc Processor, config *Configuration) *serialCollector {
|
||||
return &serialCollector{
|
||||
active: false,
|
||||
proc: proc,
|
||||
@ -131,7 +131,7 @@ func newSerial(proc Pusher, config *Configuration) *serialCollector {
|
||||
|
||||
type tcpCollector struct {
|
||||
active bool
|
||||
processor Pusher
|
||||
processor Processor
|
||||
//config *Configuration
|
||||
}
|
||||
|
||||
@ -143,11 +143,11 @@ func (t *tcpCollector) Close() {
|
||||
t.active = false
|
||||
}
|
||||
|
||||
func (t *tcpCollector) SetProcessor(p Pusher) {
|
||||
func (t *tcpCollector) SetProcessor(p Processor) {
|
||||
t.processor = p
|
||||
}
|
||||
|
||||
func newTcp(proc Pusher, config *Configuration) *tcpCollector {
|
||||
func newTcp(proc Processor, config *Configuration) *tcpCollector {
|
||||
logrus.Println("start tcp collector")
|
||||
|
||||
listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort)
|
||||
|
||||
@ -1,16 +1,16 @@
|
||||
package core
|
||||
|
||||
type Configuration struct {
|
||||
Collectors struct {
|
||||
TcpCollectorPort string `mapstructure:"porttcp"`
|
||||
SerialCollectorPort string `mapstructure:"portserial"`
|
||||
} `mapstructure:"Collectors"`
|
||||
Webserver struct {
|
||||
Port string `mapstructure:"port"`
|
||||
} `mapstructure:"webserver"`
|
||||
Pipeline struct {
|
||||
PublishIntervalMs int `mapstructure:"publishintervalms"`
|
||||
SyncUpdateIntervalMs int `mapstructure:"syncupdateintervalms"`
|
||||
} `mapstructure:"pipeline"`
|
||||
Debuglevel string `mapstructure:"debuglevel"`
|
||||
Collectors struct {
|
||||
TcpCollectorPort string `mapstructure:"porttcp"`
|
||||
SerialCollectorPort string `mapstructure:"portserial"`
|
||||
} `mapstructure:"Collectors"`
|
||||
Webserver struct {
|
||||
Port string `mapstructure:"port"`
|
||||
} `mapstructure:"webserver"`
|
||||
Pipeline struct {
|
||||
PublishIntervalMs int `mapstructure:"publishintervalms"`
|
||||
SyncUpdateIntervalMs int `mapstructure:"syncupdateintervalms"`
|
||||
} `mapstructure:"pipeline"`
|
||||
Debuglevel string `mapstructure:"debuglevel"`
|
||||
}
|
||||
|
||||
@ -18,8 +18,8 @@ func NewDispatcher() *dispatcher {
|
||||
}
|
||||
|
||||
func (d *dispatcher) Publish(message string) {
|
||||
logrus.Debugf("publish to %v listeners\n", len(d.listeners))
|
||||
logrus.Debug(message)
|
||||
logrus.Tracef("publishing to %v listeners\n", len(d.listeners))
|
||||
logrus.Trace(message)
|
||||
for _, ch := range d.listeners {
|
||||
ch <- message
|
||||
}
|
||||
|
||||
@ -11,17 +11,17 @@ type Publisher interface {
|
||||
Publish(message string)
|
||||
}
|
||||
|
||||
type Pusher interface {
|
||||
type Processor interface {
|
||||
Push(data *sensorData) error
|
||||
}
|
||||
|
||||
type Storer interface {
|
||||
EnqueuePair(tcp sensorData, ser sensorData)
|
||||
EnqueueRaw(data sensorData)
|
||||
AddDataPair(tcp sensorData, ser sensorData)
|
||||
AddRaw(data sensorData)
|
||||
}
|
||||
|
||||
type Repo interface {
|
||||
Save(tracking Tracking) error
|
||||
LoadAll() ([]TrackingMetadata, error)
|
||||
Load(id uuid.UUID) (Tracking, error)
|
||||
Load(id uuid.UUID) (*Tracking, error)
|
||||
}
|
||||
|
||||
146
core/pipeline.go
146
core/pipeline.go
@ -24,7 +24,7 @@ type pipeline struct {
|
||||
sema *semaphore.Weighted
|
||||
}
|
||||
|
||||
// pipe implements Runner & Pusher
|
||||
// pipe implements Runner & Processor
|
||||
func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline {
|
||||
return &pipeline{
|
||||
false,
|
||||
@ -51,30 +51,35 @@ func (p *pipeline) isPipeActive() bool {
|
||||
defer p.mu.RUnlock()
|
||||
return p.active
|
||||
}
|
||||
func (p *pipeline) isPipeRecording() bool {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.record
|
||||
}
|
||||
|
||||
func (p *pipeline) Run() {
|
||||
p.sema.Acquire(context.Background(), 2)
|
||||
p.sema.Acquire(context.Background(), 1) // !!! n=2 wenn synchronizer mitläuft
|
||||
p.mu.Lock()
|
||||
p.active = true
|
||||
p.mu.Unlock()
|
||||
logrus.Println("pipe: processing service started")
|
||||
go func() {
|
||||
for p.isPipeActive() {
|
||||
<-p.synchroniz.updateTicker.C
|
||||
err := p.refreshDelay()
|
||||
if err != nil {
|
||||
logrus.Debugln(err)
|
||||
}
|
||||
}
|
||||
p.sema.Release(1)
|
||||
logrus.Println("pipe: updater stopped")
|
||||
}()
|
||||
//go func() {
|
||||
// for p.isPipeActive() {
|
||||
// <-p.synchroniz.updateTicker.C
|
||||
// err := p.refreshDelay()
|
||||
// if err != nil {
|
||||
// logrus.Debugln(err)
|
||||
// }
|
||||
// }
|
||||
// p.sema.Release(1)
|
||||
// logrus.Println("pipe: updater stopped")
|
||||
//}()
|
||||
go func() {
|
||||
for p.isPipeActive() {
|
||||
<-p.publishTicker.C
|
||||
err := p.publish()
|
||||
if err != nil && err.Error() != "no data available" {
|
||||
logrus.Debug(err)
|
||||
logrus.Trace(err)
|
||||
}
|
||||
}
|
||||
p.sema.Release(1)
|
||||
@ -89,9 +94,25 @@ func (p *pipeline) StopRecord() {
|
||||
p.record = false
|
||||
}
|
||||
|
||||
func (p *pipeline) Push(data *sensorData) error {
|
||||
if (data == nil || *data == sensorData{}) {
|
||||
return errors.New("no data")
|
||||
}
|
||||
//logrus.Println("push data to pipe:", string(data.Source))
|
||||
switch data.Source {
|
||||
case SOURCE_TCP:
|
||||
go p.pushTcpDataToBuffer(*data)
|
||||
case SOURCE_SERIAL:
|
||||
go p.pushSerialDataToBuffer(*data)
|
||||
default:
|
||||
panic("pipe: invalid data Source")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pipeline) publish() error {
|
||||
p.buffer.tcpMutex.Lock()
|
||||
p.buffer.serialMutex.Lock()
|
||||
p.buffer.tcpMutex.Lock()
|
||||
|
||||
if (p.buffer.MeasTcp == sensorData{} && p.buffer.MeasSerial == sensorData{}) {
|
||||
p.buffer.tcpMutex.Unlock()
|
||||
@ -104,21 +125,25 @@ func (p *pipeline) publish() error {
|
||||
p.buffer.serialMutex.Unlock()
|
||||
return errors.New("same data")
|
||||
}
|
||||
logrus.Debug("––––––––––––––––––––––––––––––––––––")
|
||||
logrus.Debugf("SER old: %v", p.buffer.LastMeasSerial)
|
||||
logrus.Debugf("SER new: %v", p.buffer.MeasSerial)
|
||||
logrus.Debugf("TCP old: %v", p.buffer.LastMeasTcp)
|
||||
logrus.Debugf("TCP new: %v", p.buffer.MeasTcp)
|
||||
logrus.Debug("––––––––––––––––––––––––––––––––––––")
|
||||
logrus.Debugf("–––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––")
|
||||
logrus.Tracef("SER old: %-40s %-40s %v %v", p.buffer.LastMeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.ServerTime.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Position, p.buffer.LastMeasSerial.Orientation)
|
||||
logrus.Debugf("SER new: %-40s %-40s %v %v", p.buffer.MeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasSerial.ServerTime.Format(time.RFC3339Nano), p.buffer.MeasSerial.Position, p.buffer.MeasSerial.Orientation)
|
||||
logrus.Tracef("TCP old: %-40s %-40s %v %v", p.buffer.LastMeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.ServerTime.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Position, p.buffer.LastMeasTcp.Orientation)
|
||||
logrus.Debugf("TCP new: %-40s %-40s %v %v", p.buffer.MeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasTcp.ServerTime.Format(time.RFC3339Nano), p.buffer.MeasTcp.Position, p.buffer.MeasTcp.Orientation)
|
||||
p.buffer.LastMeasTcp = p.buffer.MeasTcp
|
||||
p.buffer.LastMeasSerial = p.buffer.MeasSerial
|
||||
p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial)
|
||||
|
||||
data := map[string]sensorData{
|
||||
string(SOURCE_TCP): p.buffer.MeasTcp,
|
||||
string(SOURCE_SERIAL): p.buffer.MeasSerial,
|
||||
if p.isPipeRecording() {
|
||||
p.storer.AddDataPair(p.buffer.MeasTcp, p.buffer.MeasSerial)
|
||||
}
|
||||
|
||||
data := map[string]interface{}{}
|
||||
if p.buffer.MeasTcp.Source == SOURCE_TCP {
|
||||
data[string(SOURCE_TCP)] = p.buffer.MeasTcp
|
||||
}
|
||||
if p.buffer.MeasSerial.Source == SOURCE_SERIAL {
|
||||
data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial
|
||||
}
|
||||
p.buffer.tcpMutex.Unlock()
|
||||
p.buffer.serialMutex.Unlock()
|
||||
|
||||
@ -149,74 +174,65 @@ type synchronizer struct {
|
||||
}
|
||||
|
||||
func (p *pipeline) refreshDelay() error {
|
||||
p.synchroniz.mutex.RLock()
|
||||
if p.synchroniz.tcpSerialDelayMs != 0 {
|
||||
logrus.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs)
|
||||
}
|
||||
p.synchroniz.mutex.RUnlock()
|
||||
logrus.Debugf("refreshing delay...")
|
||||
p.buffer.serialMutex.Lock()
|
||||
p.buffer.tcpMutex.Lock()
|
||||
tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp)
|
||||
serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp)
|
||||
tcpTime := p.buffer.MeasTcp.Timestamp
|
||||
serTime := p.buffer.MeasSerial.Timestamp
|
||||
p.buffer.tcpMutex.Unlock()
|
||||
p.buffer.serialMutex.Unlock()
|
||||
if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
|
||||
return errors.New("no sync possible. check if both Collectors running. otherwise check GPS fix")
|
||||
|
||||
if tcpTime.IsZero() || serTime.IsZero() || tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
|
||||
return errors.New("sync not possible. zero time value detected")
|
||||
}
|
||||
logrus.Debug("TCP", tcpTime.Format(time.RFC3339Nano))
|
||||
logrus.Debug("SER", serTime.Format(time.RFC3339Nano))
|
||||
|
||||
currentDelay := tcpTime.Sub(serTime).Milliseconds()
|
||||
p.synchroniz.mutex.Lock()
|
||||
defer p.synchroniz.mutex.Unlock()
|
||||
logrus.Debugf("old delay-> %vms...", p.synchroniz.tcpSerialDelayMs)
|
||||
if currentDelay > 5000 || currentDelay < -5000 {
|
||||
p.synchroniz.mutex.Lock()
|
||||
p.synchroniz.tcpSerialDelayMs = 0
|
||||
p.synchroniz.mutex.Unlock()
|
||||
return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
|
||||
}
|
||||
logrus.Debug("TCP", tcpTime.String())
|
||||
logrus.Debug("SER", serTime.String())
|
||||
logrus.Debug("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms")
|
||||
delay := tcpTime.Sub(serTime).Milliseconds()
|
||||
p.synchroniz.mutex.Lock()
|
||||
p.synchroniz.tcpSerialDelayMs += delay
|
||||
p.synchroniz.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pipeline) Push(data *sensorData) error {
|
||||
if data == nil {
|
||||
return errors.New("nil processing not allowed")
|
||||
}
|
||||
//logrus.Println("push data to pipe:", string(data.source))
|
||||
if p.isPipeActive() {
|
||||
p.storer.EnqueueRaw(*data)
|
||||
}
|
||||
switch data.source {
|
||||
case SOURCE_TCP:
|
||||
go p.pushTcpDataToBuffer(*data)
|
||||
case SOURCE_SERIAL:
|
||||
go p.pushSerialDataToBuffer(*data)
|
||||
default:
|
||||
panic("pipe: invalid data source")
|
||||
}
|
||||
p.synchroniz.tcpSerialDelayMs += currentDelay
|
||||
logrus.Infof("new delay-> %vms", p.synchroniz.tcpSerialDelayMs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pipeline) pushTcpDataToBuffer(data sensorData) {
|
||||
data.ServerTime = time.Now().UTC()
|
||||
|
||||
if p.isPipeRecording() {
|
||||
p.storer.AddRaw(data)
|
||||
}
|
||||
|
||||
p.synchroniz.mutex.RLock()
|
||||
if p.synchroniz.tcpSerialDelayMs > 0 {
|
||||
time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond)
|
||||
}
|
||||
p.synchroniz.mutex.RLock()
|
||||
p.synchroniz.mutex.RUnlock()
|
||||
p.buffer.tcpMutex.Lock()
|
||||
p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
|
||||
p.buffer.MeasTcp = data
|
||||
//p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
|
||||
p.buffer.tcpMutex.Unlock()
|
||||
}
|
||||
func (p *pipeline) pushSerialDataToBuffer(data sensorData) {
|
||||
data.ServerTime = time.Now().UTC()
|
||||
|
||||
if p.isPipeRecording() {
|
||||
p.storer.AddRaw(data)
|
||||
}
|
||||
|
||||
p.synchroniz.mutex.RLock()
|
||||
if p.synchroniz.tcpSerialDelayMs < 0 {
|
||||
time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond)
|
||||
}
|
||||
p.synchroniz.mutex.RUnlock()
|
||||
p.buffer.serialMutex.Lock()
|
||||
p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
|
||||
p.buffer.MeasSerial = data
|
||||
//p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
|
||||
p.buffer.serialMutex.Unlock()
|
||||
}
|
||||
|
||||
|
||||
@ -16,11 +16,13 @@ const (
|
||||
SOURCE_SERIAL sourceId = "SOURCE_SERIAL"
|
||||
)
|
||||
|
||||
var timeex int64
|
||||
|
||||
type sensorData struct {
|
||||
itow uint32
|
||||
source sourceId
|
||||
Source sourceId
|
||||
ServerTime time.Time
|
||||
Timestamp int64
|
||||
Timestamp time.Time
|
||||
Position [3]float64
|
||||
Orientation [3]float64
|
||||
}
|
||||
@ -66,7 +68,7 @@ func (s sensorData) ConsolidateExTime(n sensorData) sensorData {
|
||||
}
|
||||
|
||||
func (s *sensorData) checkSources(n *sensorData) {
|
||||
if (s.source != n.source && *s != sensorData{}) {
|
||||
if (s.Source != n.Source && *s != sensorData{}) {
|
||||
logrus.Println(s)
|
||||
logrus.Println(n)
|
||||
logrus.Fatalln("Do not consolidate sensorData from different Sources")
|
||||
@ -80,21 +82,21 @@ var (
|
||||
|
||||
func ConvertUbxToSensorData(msg interface{}) (*sensorData, error) {
|
||||
sd := &sensorData{
|
||||
ServerTime: time.Now(),
|
||||
source: SOURCE_SERIAL,
|
||||
//ServerTime: time.Now().UTC(),
|
||||
Source: SOURCE_SERIAL,
|
||||
}
|
||||
switch v := msg.(type) {
|
||||
case *ublox.NavPvt:
|
||||
//logrus.Println("NAV-PVT")
|
||||
sd.itow = v.ITOW_ms
|
||||
sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano()
|
||||
sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC)
|
||||
sd.Position[0] = float64(v.Lat_dege7) / 1e+7
|
||||
sd.Position[1] = float64(v.Lon_dege7) / 1e+7
|
||||
sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m
|
||||
case *ublox.HnrPvt:
|
||||
//logrus.Println("HNR-PVT")
|
||||
sd.itow = v.ITOW_ms
|
||||
sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano()
|
||||
sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC)
|
||||
sd.Position[0] = float64(v.Lat_dege7) / 1e+7
|
||||
sd.Position[1] = float64(v.Lon_dege7) / 1e+7
|
||||
sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m
|
||||
@ -130,16 +132,24 @@ func convertIPhoneSensorLog(jsonData []byte) (*sensorData, error) {
|
||||
pitch := gjson.Get(string(jsonData), "motionPitch").Float() * 180 / math.Pi
|
||||
roll := gjson.Get(string(jsonData), "motionRoll").Float() * 180 / math.Pi
|
||||
yaw := gjson.Get(string(jsonData), "motionYaw").Float() * 180 / math.Pi
|
||||
var ts time.Time
|
||||
if timestamp != 0 {
|
||||
ts = time.Unix(0, int64(timestamp*float64(time.Second))).UTC()
|
||||
timeex = time.Now().UnixNano() - ts.UnixNano()
|
||||
} else if timeex != 0 {
|
||||
ts = time.Now().Add(time.Duration(timeex)).UTC()
|
||||
}
|
||||
//if ts == time.Date()
|
||||
sd := &sensorData{
|
||||
ServerTime: time.Now(),
|
||||
source: SOURCE_TCP,
|
||||
Timestamp: int64(timestamp * float64(time.Second)),
|
||||
//ServerTime: time.Now().UTC(),
|
||||
Source: SOURCE_TCP,
|
||||
Timestamp: ts,
|
||||
Position: [3]float64{lat, lon, alt},
|
||||
Orientation: [3]float64{pitch, roll, yaw},
|
||||
//Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)),
|
||||
}
|
||||
//logrus.Println(string(pretty.Pretty(jsonData)))
|
||||
//logrus.Println(sd)
|
||||
if (*sd == sensorData{}) {
|
||||
return nil, errors.New("iphone sensorlog: convert empty")
|
||||
}
|
||||
return sd, nil
|
||||
}
|
||||
|
||||
@ -153,12 +163,14 @@ func convertAndroidHyperImu(jsonData []byte) (*sensorData, error) {
|
||||
yaw := gjson.Get(string(jsonData), "orientation.2").Float()
|
||||
|
||||
sd := &sensorData{
|
||||
ServerTime: time.Now(),
|
||||
source: SOURCE_TCP,
|
||||
Timestamp: timestamp * int64(time.Millisecond),
|
||||
//ServerTime: time.Now().UTC(),
|
||||
Source: SOURCE_TCP,
|
||||
Timestamp: time.Unix(0, timestamp*int64(time.Millisecond)).UTC(),
|
||||
Position: [3]float64{lat, lon, alt},
|
||||
Orientation: [3]float64{pitch, roll, yaw},
|
||||
//Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)),
|
||||
}
|
||||
if (*sd == sensorData{}) {
|
||||
return nil, errors.New("android hyperimu: convert empty")
|
||||
}
|
||||
return sd, nil
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ type Service interface {
|
||||
StopRecord() (*TrackingMetadata, error)
|
||||
StopAll() (*TrackingMetadata, error)
|
||||
|
||||
LoadTracking(trackingId uuid.UUID)
|
||||
LoadTracking(trackingId uuid.UUID) (*Tracking, error)
|
||||
DeleteTracking(trackingId uuid.UUID)
|
||||
|
||||
StartReplay()
|
||||
@ -50,7 +50,8 @@ func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService {
|
||||
pipe: NewPipeline(d, t, c),
|
||||
collectors: nil,
|
||||
}
|
||||
//ts.pipe.Run()
|
||||
// first initialize of tcp collector to to open tcp port
|
||||
NewCollector(TCP, nil, c)
|
||||
return ts
|
||||
}
|
||||
|
||||
@ -140,15 +141,21 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) {
|
||||
e.Close()
|
||||
}
|
||||
if t.opMode == RECORDING {
|
||||
logrus.Warn("trackingservice: stop recording gracefully")
|
||||
logrus.Info("trackingservice: gracefully stop recording ")
|
||||
tm, err = t.StopRecord()
|
||||
}
|
||||
t.opMode = STOPPED
|
||||
return tm, err
|
||||
}
|
||||
|
||||
func (t *trackingService) LoadTracking(trackingId uuid.UUID) {
|
||||
panic("implement me")
|
||||
func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error) {
|
||||
logrus.Info("LOAD TRACKING from database")
|
||||
tracking, err := t.repo.Load(trackingId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.safelyReplaceTracking(*tracking)
|
||||
return t.current, nil
|
||||
}
|
||||
|
||||
func (t *trackingService) DeleteTracking(trackingId uuid.UUID) {
|
||||
|
||||
@ -9,8 +9,8 @@ import (
|
||||
|
||||
type Tracking struct {
|
||||
TrackingMetadata
|
||||
Records []recordPair
|
||||
Rawdata []rawRecord
|
||||
Records []SensorPair
|
||||
Rawdata []sensorData
|
||||
}
|
||||
|
||||
var mRec sync.RWMutex
|
||||
@ -22,41 +22,38 @@ type TrackingMetadata struct {
|
||||
Collectors []CollectorType
|
||||
}
|
||||
|
||||
// persistence wrapper for sensordata
|
||||
type recordPair struct {
|
||||
RecordTimeKey time.Time // uniqueness ensured through mutex
|
||||
DataPair map[sourceId]sensorData
|
||||
type SensorPair struct {
|
||||
RecordTime time.Time // uniqueness ensured through mutex
|
||||
Data map[sourceId]sensorData
|
||||
}
|
||||
|
||||
type rawRecord struct {
|
||||
RecordTimeKey time.Time // uniqueness ensured through mutex
|
||||
Data sensorData
|
||||
}
|
||||
//func (r *SensorPair) restoreDataPair(data []byte) error {
|
||||
// err := json.Unmarshal(data, &r.Data)
|
||||
// return err
|
||||
//}
|
||||
|
||||
// END persistence wrapper for sensordata
|
||||
|
||||
func (s *Tracking) EnqueuePair(tcp sensorData, ser sensorData) {
|
||||
rp := recordPair{
|
||||
RecordTimeKey: time.Now(),
|
||||
DataPair: map[sourceId]sensorData{
|
||||
tcp.source: tcp,
|
||||
ser.source: ser,
|
||||
},
|
||||
func (s *Tracking) AddDataPair(tcp sensorData, ser sensorData) {
|
||||
rp := SensorPair{
|
||||
RecordTime: time.Now().UTC(),
|
||||
Data: make(map[sourceId]sensorData, 2),
|
||||
}
|
||||
if tcp.Source == SOURCE_TCP {
|
||||
rp.Data[tcp.Source] = tcp
|
||||
}
|
||||
if ser.Source == SOURCE_SERIAL {
|
||||
rp.Data[ser.Source] = ser
|
||||
}
|
||||
|
||||
mRec.Lock()
|
||||
s.Records = append(s.Records, rp)
|
||||
logrus.Debugln("tracking Records: len->", len(s.Records))
|
||||
logrus.Traceln("tracking Records: len->", len(s.Records))
|
||||
mRec.Unlock()
|
||||
}
|
||||
|
||||
func (s *Tracking) EnqueueRaw(data sensorData) {
|
||||
sr := rawRecord{
|
||||
time.Now(),
|
||||
data,
|
||||
}
|
||||
func (s *Tracking) AddRaw(data sensorData) {
|
||||
mRec.Lock()
|
||||
s.Rawdata = append(s.Rawdata, sr)
|
||||
logrus.Debugln("raw data points: len->", len(s.Rawdata))
|
||||
s.Rawdata = append(s.Rawdata, data)
|
||||
logrus.Traceln("raw data points: len->", len(s.Rawdata))
|
||||
mRec.Unlock()
|
||||
}
|
||||
|
||||
@ -65,8 +62,8 @@ func emptyTracking() Tracking {
|
||||
TrackingMetadata: TrackingMetadata{
|
||||
UUID: uuid.New(),
|
||||
},
|
||||
Records: []recordPair{},
|
||||
Rawdata: []rawRecord{},
|
||||
Records: []SensorPair{},
|
||||
Rawdata: []sensorData{},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -32,11 +32,11 @@ window.addEventListener("load", function(evt) {
|
||||
}
|
||||
ws.onmessage = function(evt) {
|
||||
//print2("RESPONSE: " + evt.data);
|
||||
dataSmartphone.push(evt.data)
|
||||
// let dat = JSON.parse(evt.data)["bmi26x gyroscope"]
|
||||
// let dat = JSON.parse(evt.data)["lsm6dsm gyroscope"]
|
||||
//let dat = JSON.parse(evt.data)["lsm6ds3c gyroscope"]
|
||||
let dat = JSON.parse(evt.data)
|
||||
dataSmartphone.push(dat)
|
||||
//console.log(evt.data)
|
||||
console.log("JSON geparsed onmessage", dat)
|
||||
//console.log(dat.SOURCE_TCP.Orientation)
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.timovolkmann.de/gyrogpsc/core"
|
||||
"github.com/dgraph-io/badger/v2"
|
||||
"github.com/google/uuid"
|
||||
@ -10,7 +10,7 @@ import (
|
||||
"github.com/tidwall/pretty"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Must implement Repo
|
||||
@ -27,30 +27,46 @@ func NewRepository(c *core.Configuration) *badgerStore {
|
||||
os.Mkdir(filepath.Join(dir, "_db"), os.ModePerm)
|
||||
}
|
||||
|
||||
tr, err := badger.Open(badger.DefaultOptions("_db/trackings"))
|
||||
dp, err := badger.Open(badger.DefaultOptions("_db/records"))
|
||||
rd, err := badger.Open(badger.DefaultOptions("_db/raw"))
|
||||
|
||||
bs := &badgerStore{}
|
||||
err := bs.openDBs()
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
return &badgerStore{trackingsDb: tr, recordsDb: dp, rawdataDb: rd}
|
||||
return bs
|
||||
}
|
||||
|
||||
func (r *badgerStore) isDbAvailable() bool {
|
||||
func (r *badgerStore) openDBs() error {
|
||||
var err error
|
||||
r.trackingsDb, err = badger.Open(badger.DefaultOptions("_db/trackings"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.recordsDb, err = badger.Open(badger.DefaultOptions("_db/records"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.rawdataDb, err = badger.Open(badger.DefaultOptions("_db/raw"))
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *badgerStore) isDbClosed() bool {
|
||||
return r.trackingsDb.IsClosed() || r.recordsDb.IsClosed() || r.rawdataDb.IsClosed()
|
||||
}
|
||||
|
||||
func (r *badgerStore) Save(tr core.Tracking) error {
|
||||
if ok := r.isDbAvailable(); ok {
|
||||
if ok := r.isDbClosed(); ok {
|
||||
logrus.Error("unable to write to database. database closed!")
|
||||
return badger.ErrDBClosed
|
||||
err := r.openDBs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//return badger.ErrDBClosed
|
||||
}
|
||||
ts, err := tr.TimeCreated.MarshalText()
|
||||
uid, err := tr.UUID.MarshalText()
|
||||
if err != nil {
|
||||
logrus.Error(err, tr)
|
||||
}
|
||||
logrus.Info("save tracking:", tr.TimeCreated)
|
||||
logrus.Infoln("save tracking:", tr.TimeCreated.Format(time.RFC3339Nano))
|
||||
meta, err := json.Marshal(tr.TrackingMetadata)
|
||||
if err != nil {
|
||||
logrus.Error(err, tr)
|
||||
@ -58,41 +74,50 @@ func (r *badgerStore) Save(tr core.Tracking) error {
|
||||
}
|
||||
err = r.recordsDb.Update(func(txn *badger.Txn) error {
|
||||
for _, v := range tr.Records {
|
||||
k := createRecordKey(tr.UUID, v.RecordTimeKey.UnixNano())
|
||||
j, err := json.Marshal(v.DataPair)
|
||||
logrus.Debugln("save record k/v:\n", tr.UUID.String(), strconv.FormatInt(v.RecordTimeKey.UnixNano(), 10))
|
||||
logrus.Debugln(string(pretty.Pretty(j)))
|
||||
if err != nil {
|
||||
return err
|
||||
k := createRecordKey(tr.UUID, v.RecordTime)
|
||||
logrus.Trace(v.Data, " len key ->", len(k))
|
||||
j, err2 := json.Marshal(v.Data)
|
||||
logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.RecordTime.Format(time.RFC3339Nano))
|
||||
logrus.Traceln(string(pretty.Pretty(j)))
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
err2 = txn.Set(k, j)
|
||||
|
||||
if err2 != nil {
|
||||
logrus.Warn(err2)
|
||||
}
|
||||
txn.Set(k, j)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Error(err, tr)
|
||||
return err
|
||||
}
|
||||
err = r.rawdataDb.Update(func(txn *badger.Txn) error {
|
||||
for _, v := range tr.Rawdata {
|
||||
k := createRecordKey(tr.UUID, v.RecordTimeKey.UnixNano())
|
||||
j, err := json.Marshal(v)
|
||||
logrus.Debugln("save raw k/v:\n", tr.UUID.String(), strconv.FormatInt(v.RecordTimeKey.UnixNano(), 10))
|
||||
logrus.Debugln(string(pretty.Pretty(j)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
txn.Set(k, j)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Error(err, tr)
|
||||
return err
|
||||
//return err
|
||||
}
|
||||
//err = r.rawdataDb.Update(func(txn *badger.Txn) error {
|
||||
// for _, v := range tr.Rawdata {
|
||||
// k := createRecordKey(tr.UUID, v.ServerTime)
|
||||
// j, err2 := json.Marshal(v)
|
||||
// logrus.Traceln("save raw k/v:\n", tr.UUID.String(), strconv.FormatInt(v.ServerTime.UnixNano(), 10))
|
||||
// logrus.Traceln(string(pretty.Pretty(j)))
|
||||
// if err2 != nil {
|
||||
// return err2
|
||||
// }
|
||||
// err2 = txn.Set(k, j)
|
||||
//
|
||||
// if err2 != nil {
|
||||
// logrus.Warn(err2)
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//})
|
||||
//if err != nil {
|
||||
// logrus.Error(err, tr)
|
||||
// //return err
|
||||
//}
|
||||
err = r.trackingsDb.Update(func(txn *badger.Txn) error {
|
||||
logrus.Debug("save tracking meta k/v:\n", string(ts), string(meta))
|
||||
err := txn.Set(ts, meta)
|
||||
logrus.Debugln("save tracking metadata k/v:\n", string(uid), string(meta))
|
||||
err := txn.Set(uid, meta)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -101,12 +126,9 @@ func (r *badgerStore) Save(tr core.Tracking) error {
|
||||
}
|
||||
|
||||
dr := 0.5
|
||||
err = r.trackingsDb.RunValueLogGC(dr)
|
||||
logrus.Debug("DB GC:", err)
|
||||
err = r.recordsDb.RunValueLogGC(dr)
|
||||
logrus.Debug("DB GC:", err)
|
||||
err = r.rawdataDb.RunValueLogGC(dr)
|
||||
logrus.Debug("DB GC:", err)
|
||||
_ = r.trackingsDb.RunValueLogGC(dr)
|
||||
_ = r.recordsDb.RunValueLogGC(dr)
|
||||
_ = r.rawdataDb.RunValueLogGC(dr)
|
||||
logrus.Info("sucessfully saved tracking")
|
||||
return nil
|
||||
}
|
||||
@ -120,11 +142,15 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) {
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
el := core.TrackingMetadata{}
|
||||
item.Value(func(val []byte) error {
|
||||
err2 := item.Value(func(val []byte) error {
|
||||
logrus.Debugln(string(val))
|
||||
err := json.Unmarshal(val, &el)
|
||||
return err
|
||||
err3 := json.Unmarshal(val, &el)
|
||||
return err3
|
||||
})
|
||||
if err2 != nil {
|
||||
logrus.Warn(err2)
|
||||
}
|
||||
|
||||
result = append(result, el)
|
||||
}
|
||||
return nil
|
||||
@ -135,31 +161,95 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *badgerStore) Load(id uuid.UUID) (core.Tracking, error) {
|
||||
panic("implement me")
|
||||
func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) {
|
||||
logrus.Debugln("try to load from db...", id)
|
||||
if ok := r.isDbClosed(); ok {
|
||||
logrus.Error("unable to read from database. database closed!")
|
||||
err := r.openDBs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
t := &core.Tracking{
|
||||
TrackingMetadata: core.TrackingMetadata{},
|
||||
//Records: []core.recordPair{},
|
||||
//Rawdata: nil,
|
||||
}
|
||||
err := r.trackingsDb.View(func(txn *badger.Txn) error {
|
||||
item, err2 := txn.Get([]byte(id.String()))
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
err2 = item.Value(func(val []byte) error {
|
||||
err3 := json.Unmarshal(val, &t.TrackingMetadata)
|
||||
return err3
|
||||
})
|
||||
return err2
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
err = r.recordsDb.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = []byte(id.String())
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
_, recTime := unmarshalDataKey(item.Key())
|
||||
el := core.SensorPair{}
|
||||
el.RecordTime = recTime
|
||||
err2 := item.Value(func(val []byte) error {
|
||||
logrus.Traceln(string(val))
|
||||
err3 := json.Unmarshal(val, &el.Data)
|
||||
logrus.Traceln(err3, el)
|
||||
return err3
|
||||
})
|
||||
if err2 != nil {
|
||||
logrus.Warn(err2)
|
||||
}
|
||||
|
||||
t.Records = append(t.Records, el)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
|
||||
// implement retrieval of raw data only if needed
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func createRecordKey(uid uuid.UUID, timestamp int64) []byte {
|
||||
prefix, err := uid.MarshalText()
|
||||
if err != nil || timestamp < 0 {
|
||||
logrus.Error("unable to create key", err)
|
||||
func createRecordKey(uid uuid.UUID, timestamp time.Time) []byte {
|
||||
prefix := []byte(uid.String())
|
||||
suffix := []byte(timestamp.Format(time.RFC3339Nano))
|
||||
if timestamp.IsZero() {
|
||||
err := errors.New("zero value detected")
|
||||
logrus.Errorln("unable to create key", err)
|
||||
}
|
||||
suffix := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(suffix, uint64(timestamp))
|
||||
|
||||
logrus.Traceln("save as:", string(prefix), string(suffix))
|
||||
//binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano()))
|
||||
return append(prefix, suffix...)
|
||||
}
|
||||
|
||||
func unmarshalDataKey(key []byte) (uuid.UUID, int64) {
|
||||
if len(key) != 24 {
|
||||
panic("corrupted key")
|
||||
}
|
||||
prefix := key[0:15]
|
||||
suffix := key[15:24]
|
||||
uid, err := uuid.FromBytes(prefix)
|
||||
func unmarshalDataKey(key []byte) (uuid.UUID, time.Time) {
|
||||
logrus.Trace("key len ->", len(key))
|
||||
prefix := key[:36]
|
||||
suffix := key[36:]
|
||||
logrus.Traceln("load as:", string(prefix), string(suffix))
|
||||
uid, err := uuid.Parse(string(prefix))
|
||||
if err != nil {
|
||||
panic("corrupted key")
|
||||
logrus.Errorln("corrupted key", err)
|
||||
}
|
||||
timestamp := int64(binary.BigEndian.Uint64(suffix))
|
||||
timestamp, err := time.Parse(time.RFC3339Nano, string(suffix))
|
||||
if err != nil {
|
||||
logrus.Errorln("corrupted key", err)
|
||||
}
|
||||
logrus.Traceln(uid, timestamp)
|
||||
|
||||
//timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix)))
|
||||
return uid, timestamp
|
||||
}
|
||||
|
||||
216
ublox/decode.go
216
ublox/decode.go
@ -6,152 +6,152 @@
|
||||
package ublox
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// A decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames.
|
||||
// If you have an unmixed stream of NMEA-only data you can use nmea.Decode() on bufio.Scanner.Bytes() directly.
|
||||
type decoder struct {
|
||||
s *bufio.Scanner
|
||||
s *bufio.Scanner
|
||||
}
|
||||
|
||||
// NewDecoder creates a new bufio Scanner with a splitfunc that can handle both UBX and NMEA frames.
|
||||
func NewDecoder(r io.Reader) *decoder {
|
||||
d := bufio.NewScanner(r)
|
||||
d.Split(splitFunc)
|
||||
return &decoder{s: d}
|
||||
d := bufio.NewScanner(r)
|
||||
d.Split(splitFunc)
|
||||
return &decoder{s: d}
|
||||
}
|
||||
|
||||
// Assume we're either at the start of an NMEA sentence or at the start of a UBX message
|
||||
// if not, skip to the first $ or UBX SOM.
|
||||
func splitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
|
||||
if len(data) == 0 {
|
||||
return 0, nil, nil
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
switch data[0] {
|
||||
case '$':
|
||||
return bufio.ScanLines(data, atEOF)
|
||||
switch data[0] {
|
||||
case '$':
|
||||
return bufio.ScanLines(data, atEOF)
|
||||
|
||||
case 0xB5:
|
||||
if len(data) < 8 {
|
||||
if atEOF {
|
||||
return len(data), nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
return 0, nil, nil
|
||||
}
|
||||
case 0xB5:
|
||||
if len(data) < 8 {
|
||||
if atEOF {
|
||||
return len(data), nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
sz := 8 + int(data[4]) + int(data[5])*256
|
||||
if data[1] == 0x62 {
|
||||
if sz <= len(data) {
|
||||
return sz, data[:sz], nil
|
||||
}
|
||||
if sz <= bufio.MaxScanTokenSize {
|
||||
return 0, nil, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
sz := 8 + int(data[4]) + int(data[5])*256
|
||||
if data[1] == 0x62 {
|
||||
if sz <= len(data) {
|
||||
return sz, data[:sz], nil
|
||||
}
|
||||
if sz <= bufio.MaxScanTokenSize {
|
||||
return 0, nil, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// resync to SOM or $
|
||||
data = data[1:]
|
||||
i1 := bytes.IndexByte(data, '$')
|
||||
if i1 < 0 {
|
||||
i1 = len(data)
|
||||
}
|
||||
// resync to SOM or $
|
||||
data = data[1:]
|
||||
i1 := bytes.IndexByte(data, '$')
|
||||
if i1 < 0 {
|
||||
i1 = len(data)
|
||||
}
|
||||
|
||||
i2 := bytes.IndexByte(data, 0xB5)
|
||||
if i2 < 0 {
|
||||
i2 = len(data)
|
||||
}
|
||||
if i1 > i2 {
|
||||
i1 = i2
|
||||
}
|
||||
return 1 + i1, nil, nil
|
||||
i2 := bytes.IndexByte(data, 0xB5)
|
||||
if i2 < 0 {
|
||||
i2 = len(data)
|
||||
}
|
||||
if i1 > i2 {
|
||||
i1 = i2
|
||||
}
|
||||
return 1 + i1, nil, nil
|
||||
}
|
||||
|
||||
// Decode reads on NMEA or UBX frame and calls decodeUbx accordingly to parse the message, while skipping NMEA.
|
||||
func (d *decoder) Decode() (msg interface{}, err error) {
|
||||
if !d.s.Scan() {
|
||||
if err = d.s.Err(); err == nil {
|
||||
err = io.EOF
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if !d.s.Scan() {
|
||||
if err = d.s.Err(); err == nil {
|
||||
err = io.EOF
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch d.s.Bytes()[0] {
|
||||
case '$':
|
||||
return nil, errors.New("NMEA not implemented")
|
||||
//return nmea.Decode(d.s.Bytes())
|
||||
case 0xB5:
|
||||
return decodeUbx(d.s.Bytes())
|
||||
}
|
||||
panic("impossible frame")
|
||||
switch d.s.Bytes()[0] {
|
||||
case '$':
|
||||
return nil, errors.New("NMEA not implemented")
|
||||
//return nmea.Decode(d.s.Bytes())
|
||||
case 0xB5:
|
||||
return decodeUbx(d.s.Bytes())
|
||||
}
|
||||
panic("impossible frame")
|
||||
}
|
||||
|
||||
var (
|
||||
errInvalidFrame = errors.New("invalid UBX frame")
|
||||
errInvalidChkSum = errors.New("invalid UBX checksum")
|
||||
errInvalidFrame = errors.New("invalid UBX frame")
|
||||
errInvalidChkSum = errors.New("invalid UBX checksum")
|
||||
)
|
||||
|
||||
func decodeUbx(frame []byte) (msg Message, err error) {
|
||||
|
||||
buf := bytes.NewReader(frame)
|
||||
buf := bytes.NewReader(frame)
|
||||
|
||||
var header struct {
|
||||
Preamble uint16
|
||||
ClassID uint16
|
||||
Length uint16
|
||||
}
|
||||
var header struct {
|
||||
Preamble uint16
|
||||
ClassID uint16
|
||||
Length uint16
|
||||
}
|
||||
|
||||
if err := binary.Read(buf, binary.LittleEndian, &header); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := binary.Read(buf, binary.LittleEndian, &header); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if header.Preamble != 0x62B5 {
|
||||
return nil, errInvalidFrame
|
||||
}
|
||||
if header.Preamble != 0x62B5 {
|
||||
return nil, errInvalidFrame
|
||||
}
|
||||
|
||||
if buf.Len()+2 < int(header.Length) {
|
||||
return nil, io.ErrShortBuffer
|
||||
}
|
||||
if buf.Len()+2 < int(header.Length) {
|
||||
return nil, io.ErrShortBuffer
|
||||
}
|
||||
|
||||
var a, b byte
|
||||
for _, v := range frame[2 : header.Length+6] {
|
||||
a += byte(v)
|
||||
b += a
|
||||
}
|
||||
var a, b byte
|
||||
for _, v := range frame[2 : header.Length+6] {
|
||||
a += byte(v)
|
||||
b += a
|
||||
}
|
||||
|
||||
if frame[header.Length+6] != a || frame[header.Length+7] != b {
|
||||
return nil, errInvalidChkSum
|
||||
}
|
||||
if frame[header.Length+6] != a || frame[header.Length+7] != b {
|
||||
return nil, errInvalidChkSum
|
||||
}
|
||||
|
||||
switch header.ClassID {
|
||||
case 0x0105: // ACK-ACK
|
||||
fmt.Println("ACK-ACK not implemented")
|
||||
//msg = &AckAck{}
|
||||
case 0x0005: // ACK-NAK
|
||||
fmt.Println("ACK-NAK not implemented")
|
||||
//msg = &AckNak{}
|
||||
case 0x0701: // NAV-PVT
|
||||
msg = &NavPvt{}
|
||||
case 0x0028: // HNR-PVT
|
||||
msg = &HnrPvt{}
|
||||
case 0x0501: // NAV-ATT
|
||||
msg = &NavAtt{}
|
||||
default:
|
||||
}
|
||||
if msg != nil {
|
||||
err = binary.Read(buf, binary.LittleEndian, msg)
|
||||
} else {
|
||||
msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)}
|
||||
}
|
||||
//fmt.Println(msg)
|
||||
switch header.ClassID {
|
||||
case 0x0105: // ACK-ACK
|
||||
fmt.Println("ACK-ACK not implemented")
|
||||
//msg = &AckAck{}
|
||||
case 0x0005: // ACK-NAK
|
||||
fmt.Println("ACK-NAK not implemented")
|
||||
//msg = &AckNak{}
|
||||
case 0x0701: // NAV-PVT
|
||||
msg = &NavPvt{}
|
||||
case 0x0028: // HNR-PVT
|
||||
msg = &HnrPvt{}
|
||||
case 0x0501: // NAV-ATT
|
||||
msg = &NavAtt{}
|
||||
default:
|
||||
}
|
||||
if msg != nil {
|
||||
err = binary.Read(buf, binary.LittleEndian, msg)
|
||||
} else {
|
||||
msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)}
|
||||
}
|
||||
//fmt.Println(msg)
|
||||
|
||||
return msg, err
|
||||
return msg, err
|
||||
|
||||
}
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package ublox
|
||||
|
||||
type Message interface {
|
||||
ClassID() uint16
|
||||
ClassID() uint16
|
||||
}
|
||||
|
||||
//type UbxMessage interface {
|
||||
@ -11,90 +11,90 @@ type Message interface {
|
||||
//}
|
||||
|
||||
type RawMessage struct {
|
||||
classID uint16
|
||||
Data []byte
|
||||
classID uint16
|
||||
Data []byte
|
||||
}
|
||||
|
||||
func (msg *RawMessage) ClassID() uint16 { return msg.classID }
|
||||
|
||||
type NavPvt struct {
|
||||
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
|
||||
Year_y uint16 // - Year (UTC)
|
||||
Month_month byte // - Month, range 1..12 (UTC)
|
||||
Day_d byte // - Day of month, range 1..31 (UTC)
|
||||
Hour_h byte // - Hour of day, range 0..23 (UTC)
|
||||
Min_min byte // - Minute of hour, range 0..59 (UTC)
|
||||
Sec_s byte // - Seconds of minute, range 0..60 (UTC)
|
||||
Valid NavPVTValid // - Validity flags (see graphic below)
|
||||
TAcc_ns uint32 // - Time accuracy estimate (UTC)
|
||||
Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC)
|
||||
FixType NavPVTFixType // - GNSSfix Type
|
||||
Flags NavPVTFlags // - Fix status flags (see graphic below)
|
||||
Flags2 NavPVTFlags2 // - Additional flags (see graphic below)
|
||||
NumSV byte // - Number of satellites used in Nav Solution
|
||||
Lon_dege7 int32 // 1e-7 Longitude
|
||||
Lat_dege7 int32 // 1e-7 Latitude
|
||||
Height_mm int32 // - Height above ellipsoid
|
||||
HMSL_mm int32 // - Height above mean sea level
|
||||
HAcc_mm uint32 // - Horizontal accuracy estimate
|
||||
VAcc_mm uint32 // - Vertical accuracy estimate
|
||||
VelN_mm_s int32 // - NED north velocity
|
||||
VelE_mm_s int32 // - NED east velocity
|
||||
VelD_mm_s int32 // - NED down velocity
|
||||
GSpeed_mm_s int32 // - Ground Speed (2-D)
|
||||
HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D)
|
||||
SAcc_mm_s uint32 // - Speed accuracy estimate
|
||||
HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
PDOPe2 uint16 // 0.01 Position DOP
|
||||
Flags3 NavPVTFlags3 // - Additional flags (see graphic below)
|
||||
Reserved1 [5]byte // - Reserved
|
||||
HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion
|
||||
MagDec_dege2 int16 // 1e-2 Magnetic declination. Only supported in ADR 4.10 and later.
|
||||
MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later.
|
||||
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
|
||||
Year_y uint16 // - Year (UTC)
|
||||
Month_month byte // - Month, range 1..12 (UTC)
|
||||
Day_d byte // - Day of month, range 1..31 (UTC)
|
||||
Hour_h byte // - Hour of day, range 0..23 (UTC)
|
||||
Min_min byte // - Minute of hour, range 0..59 (UTC)
|
||||
Sec_s byte // - Seconds of minute, range 0..60 (UTC)
|
||||
Valid NavPVTValid // - Validity flags (see graphic below)
|
||||
TAcc_ns uint32 // - Time accuracy estimate (UTC)
|
||||
Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC)
|
||||
FixType NavPVTFixType // - GNSSfix Type
|
||||
Flags NavPVTFlags // - Fix status flags (see graphic below)
|
||||
Flags2 NavPVTFlags2 // - Additional flags (see graphic below)
|
||||
NumSV byte // - Number of satellites used in Nav Solution
|
||||
Lon_dege7 int32 // 1e-7 Longitude
|
||||
Lat_dege7 int32 // 1e-7 Latitude
|
||||
Height_mm int32 // - Height above ellipsoid
|
||||
HMSL_mm int32 // - Height above mean sea level
|
||||
HAcc_mm uint32 // - Horizontal accuracy estimate
|
||||
VAcc_mm uint32 // - Vertical accuracy estimate
|
||||
VelN_mm_s int32 // - NED north velocity
|
||||
VelE_mm_s int32 // - NED east velocity
|
||||
VelD_mm_s int32 // - NED down velocity
|
||||
GSpeed_mm_s int32 // - Ground Speed (2-D)
|
||||
HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D)
|
||||
SAcc_mm_s uint32 // - Speed accuracy estimate
|
||||
HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
PDOPe2 uint16 // 0.01 Position DOP
|
||||
Flags3 NavPVTFlags3 // - Additional flags (see graphic below)
|
||||
Reserved1 [5]byte // - Reserved
|
||||
HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion
|
||||
MagDec_dege2 int16 // 1e-2 Magnetic declination. Only supported in ADR 4.10 and later.
|
||||
MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later.
|
||||
}
|
||||
|
||||
func (NavPvt) ClassID() uint16 { return 0x0701 }
|
||||
|
||||
type HnrPvt struct {
|
||||
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
|
||||
Year_y uint16 // - Year (UTC)
|
||||
Month_month byte // - Month, range 1..12 (UTC)
|
||||
Day_d byte // - Day of month, range 1..31 (UTC)
|
||||
Hour_h byte // - Hour of day, range 0..23 (UTC)
|
||||
Min_min byte // - Minute of hour, range 0..59 (UTC)
|
||||
Sec_s byte // - Seconds of minute, range 0..60 (UTC)
|
||||
Valid byte // - Validity flags (see graphic below)
|
||||
Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC)
|
||||
FixType byte // - GNSSfix Type
|
||||
Flags byte // - Fix status flags (see graphic below)
|
||||
Reserved [2]byte
|
||||
Lon_dege7 int32 // 1e-7 Longitude
|
||||
Lat_dege7 int32 // 1e-7 Latitude
|
||||
Height_mm int32 // - Height above ellipsoid
|
||||
HMSL_mm int32 // - Height above mean sea level
|
||||
GSpeed_mm_s int32 // - Ground Speed (2-D)
|
||||
Speed_mm_s int32 // Speed (3-D)
|
||||
HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D)
|
||||
HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion
|
||||
HAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
VAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
SAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
Reserved1 [4]byte // - Reserved
|
||||
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
|
||||
Year_y uint16 // - Year (UTC)
|
||||
Month_month byte // - Month, range 1..12 (UTC)
|
||||
Day_d byte // - Day of month, range 1..31 (UTC)
|
||||
Hour_h byte // - Hour of day, range 0..23 (UTC)
|
||||
Min_min byte // - Minute of hour, range 0..59 (UTC)
|
||||
Sec_s byte // - Seconds of minute, range 0..60 (UTC)
|
||||
Valid byte // - Validity flags (see graphic below)
|
||||
Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC)
|
||||
FixType byte // - GNSSfix Type
|
||||
Flags byte // - Fix status flags (see graphic below)
|
||||
Reserved [2]byte
|
||||
Lon_dege7 int32 // 1e-7 Longitude
|
||||
Lat_dege7 int32 // 1e-7 Latitude
|
||||
Height_mm int32 // - Height above ellipsoid
|
||||
HMSL_mm int32 // - Height above mean sea level
|
||||
GSpeed_mm_s int32 // - Ground Speed (2-D)
|
||||
Speed_mm_s int32 // Speed (3-D)
|
||||
HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D)
|
||||
HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion
|
||||
HAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
VAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
SAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle)
|
||||
Reserved1 [4]byte // - Reserved
|
||||
}
|
||||
|
||||
func (HnrPvt) ClassID() uint16 { return 0x0028 }
|
||||
|
||||
type NavAtt struct {
|
||||
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
|
||||
Version byte
|
||||
Reserved1 [3]byte
|
||||
Roll_deg int32
|
||||
Pitch_deg int32
|
||||
Heading_deg int32
|
||||
AccRoll_deg uint32
|
||||
AccPitch_deg uint32
|
||||
AccHeading_deg uint32
|
||||
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
|
||||
Version byte
|
||||
Reserved1 [3]byte
|
||||
Roll_deg int32
|
||||
Pitch_deg int32
|
||||
Heading_deg int32
|
||||
AccRoll_deg uint32
|
||||
AccPitch_deg uint32
|
||||
AccHeading_deg uint32
|
||||
}
|
||||
|
||||
func (NavAtt) ClassID() uint16 { return 0x0501 }
|
||||
@ -104,43 +104,43 @@ func (NavAtt) ClassID() uint16 { return 0x0501 }
|
||||
type NavPVTFixType byte
|
||||
|
||||
const (
|
||||
NavPVTNoFix NavPVTFixType = iota
|
||||
NavPVTDeadReckoning
|
||||
NavPVTFix2D
|
||||
NavPVTFix3D
|
||||
NavPVTGNSS
|
||||
NavPVTTimeOnly
|
||||
NavPVTNoFix NavPVTFixType = iota
|
||||
NavPVTDeadReckoning
|
||||
NavPVTFix2D
|
||||
NavPVTFix3D
|
||||
NavPVTGNSS
|
||||
NavPVTTimeOnly
|
||||
)
|
||||
|
||||
type NavPVTValid byte
|
||||
|
||||
const (
|
||||
NavPVTValidDate NavPVTValid = (1 << iota) // valid UTC Date (see Time Validity section for details)
|
||||
NavPVTValidTime // valid UTC time of day (see Time Validity section for details)
|
||||
NavPVTFullyResolved // UTC time of day has been fully resolved (no seconds uncertainty). Cannot be used to check if time is completely solved.
|
||||
NavPVTValidMag // valid magnetic declination
|
||||
NavPVTValidDate NavPVTValid = (1 << iota) // valid UTC Date (see Time Validity section for details)
|
||||
NavPVTValidTime // valid UTC time of day (see Time Validity section for details)
|
||||
NavPVTFullyResolved // UTC time of day has been fully resolved (no seconds uncertainty). Cannot be used to check if time is completely solved.
|
||||
NavPVTValidMag // valid magnetic declination
|
||||
)
|
||||
|
||||
type NavPVTFlags byte
|
||||
|
||||
const (
|
||||
NavPVTGnssFixOK NavPVTFlags = 1 << 0 // valid fix (i.e within DOP & accuracy masks)
|
||||
NavPVTDiffSoln NavPVTFlags = 1 << 1 // differential corrections were applied
|
||||
NavPVTHeadVehValid NavPVTFlags = 1 << 5 // heading of vehicle is valid, only set if the receiver is in sensor fusion mode
|
||||
NavPVTCarrSolnFloat NavPVTFlags = 1 << 6 // carrier phase range solution with floating ambiguities
|
||||
NavPVTCarrSolnFixed NavPVTFlags = 1 << 7 // carrier phase range solution with fixed ambiguities
|
||||
NavPVTGnssFixOK NavPVTFlags = 1 << 0 // valid fix (i.e within DOP & accuracy masks)
|
||||
NavPVTDiffSoln NavPVTFlags = 1 << 1 // differential corrections were applied
|
||||
NavPVTHeadVehValid NavPVTFlags = 1 << 5 // heading of vehicle is valid, only set if the receiver is in sensor fusion mode
|
||||
NavPVTCarrSolnFloat NavPVTFlags = 1 << 6 // carrier phase range solution with floating ambiguities
|
||||
NavPVTCarrSolnFixed NavPVTFlags = 1 << 7 // carrier phase range solution with fixed ambiguities
|
||||
)
|
||||
|
||||
type NavPVTFlags2 byte
|
||||
|
||||
const (
|
||||
NavPVTConfirmedAvai NavPVTFlags2 = 1 << 5 // information about UTC Date and Time of Day validity confirmation is available (see Time Validity section for details)
|
||||
NavPVTConfirmedDate NavPVTFlags2 = 1 << 6 // UTC Date validity could be confirmed (see Time Validity section for details)
|
||||
NavPVTConfirmedTime NavPVTFlags2 = 1 << 7 // UTC Time of Day could be confirmed (see Time Validity section for details)
|
||||
NavPVTConfirmedAvai NavPVTFlags2 = 1 << 5 // information about UTC Date and Time of Day validity confirmation is available (see Time Validity section for details)
|
||||
NavPVTConfirmedDate NavPVTFlags2 = 1 << 6 // UTC Date validity could be confirmed (see Time Validity section for details)
|
||||
NavPVTConfirmedTime NavPVTFlags2 = 1 << 7 // UTC Time of Day could be confirmed (see Time Validity section for details)
|
||||
)
|
||||
|
||||
type NavPVTFlags3 byte
|
||||
|
||||
const (
|
||||
NavPVTInvalidLlh NavPVTFlags3 = (1 << iota) // 1 = Invalid lon, lat, height and hMSL
|
||||
NavPVTInvalidLlh NavPVTFlags3 = (1 << iota) // 1 = Invalid lon, lat, height and hMSL
|
||||
)
|
||||
|
||||
35
web/http.go
35
web/http.go
@ -6,6 +6,7 @@ import (
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/template/html"
|
||||
"github.com/gofiber/websocket/v2"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -32,8 +33,8 @@ func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) {
|
||||
trackings.Put("/", stopRecordingHandler(s, c)) // Stops current recording. Returns trackingId if record was successful
|
||||
trackings.Delete("/", stopAllHandler(s, c)) // Stops websocket connection, pipelines and collectors
|
||||
|
||||
trackings.Get("/:trackingId", stubhander()) // Gets Tracking Metadata and loads sensorRecords from storage.
|
||||
trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage
|
||||
trackings.Get("/:trackingId", LoadTrackingHandler(s, c)) // Gets Tracking Metadata and loads sensorRecords from storage.
|
||||
trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage
|
||||
|
||||
trackings.Post("/current", stubhander()) // Starts Replay.
|
||||
trackings.Patch("/current", stubhander()) // Pauses Replay.
|
||||
@ -47,6 +48,36 @@ func stubhander() fiber.Handler {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func LoadTrackingHandler(s core.Service, c *core.Configuration) fiber.Handler {
|
||||
return func(ctx *fiber.Ctx) error {
|
||||
trackId := ctx.Params("trackingId")
|
||||
uid, err := uuid.Parse(trackId)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
ctx.Status(404).JSON(err)
|
||||
return err
|
||||
}
|
||||
tracking, err := s.LoadTracking(uid)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
ctx.Status(404).JSON(err)
|
||||
return err
|
||||
}
|
||||
prepres := map[string]interface{}{}
|
||||
prepres["data"] = *tracking
|
||||
if err != nil {
|
||||
prepres["error"] = err.Error()
|
||||
|
||||
}
|
||||
err2 := ctx.JSON(prepres)
|
||||
if err2 != nil {
|
||||
ctx.Status(500).JSON(err2)
|
||||
return err2
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
func allTrackingsHandler(s core.Service, c *core.Configuration) fiber.Handler {
|
||||
return func(ctx *fiber.Ctx) error {
|
||||
trackings, err := s.AllTrackings()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user