From 09791727a46617bd225a3fda81e72e6b9eb40c4c Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Sat, 12 Dec 2020 02:29:22 +0100 Subject: [PATCH] [WIP] eliminated data races & formatted files --- .gitignore | 1 + cmd/autostart/autostart.go | 99 +++++------ cmd/server/server.go | 44 ++--- core/collectors.go | 277 +++++++++++++++--------------- core/config.go | 24 +-- core/dispatcher.go | 49 +++--- core/interfaces.go | 42 +++-- core/pipeline.go | 335 ++++++++++++++++++++----------------- core/sensordata.go | 248 +++++++++++++-------------- core/service.go | 165 +++++++++++------- core/trackings.go | 78 ++++----- go.mod | 1 + go.sum | 1 + storage/kvstore.go | 192 +++++++++++++++++++-- ublox/decode.go | 216 ++++++++++++------------ ublox/messages.go | 178 ++++++++++---------- web/http.go | 118 ++++++------- 17 files changed, 1158 insertions(+), 910 deletions(-) diff --git a/.gitignore b/.gitignore index e57b859..b861e7d 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,4 @@ Temporary Items .env gpsconfig.yml config.yml +_db diff --git a/cmd/autostart/autostart.go b/cmd/autostart/autostart.go index a0bafeb..8be4e32 100644 --- a/cmd/autostart/autostart.go +++ b/cmd/autostart/autostart.go @@ -1,65 +1,66 @@ package main import ( - "git.timovolkmann.de/gyrogpsc/core" - "git.timovolkmann.de/gyrogpsc/storage" - "git.timovolkmann.de/gyrogpsc/web" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" - "time" + "git.timovolkmann.de/gyrogpsc/core" + "git.timovolkmann.de/gyrogpsc/storage" + "git.timovolkmann.de/gyrogpsc/web" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "time" ) func main() { - conf := configurationFromFile() + conf := configurationFromFile() - repo := storage.NewRepository(conf) - disp := core.NewDispatcher() + repo := storage.NewRepository(conf) + disp := core.NewDispatcher() - service := core.TrackingService(repo, disp, conf) + service := core.TrackingService(repo, disp, conf) - go func() { - service.NewTracking(core.TCP, core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(20 * time.Second) - service.StopRecord() - time.Sleep(5 * time.Second) - service.NewTracking(core.TCP) - service.NewTracking(core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(20 * time.Second) - service.StopRecord() - }() + go func() { + service.NewSetup(core.TCP, core.SERIAL) + time.Sleep(5 * time.Second) + service.StartRecord() + time.Sleep(5 * time.Second) + service.StopRecord() + time.Sleep(5 * time.Second) + service.NewSetup(core.TCP) + time.Sleep(5 * time.Second) + service.StartRecord() + time.Sleep(5 * time.Second) + service.StopRecord() + time.Sleep(5 * time.Second) + service.StopAll() + }() - web.CreateServer(service, disp, conf) + web.CreateServer(service, disp, conf) } func configurationFromFile() *core.Configuration { - viper.SetDefault("collectors.porttcp", ":3010") - viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") - viper.SetDefault("webserver.port", ":3011") - viper.SetDefault("pipeline.publishIntervalMs", 50) - viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) - viper.SetDefault("debuglevel", "INFO") + viper.SetDefault("collectors.porttcp", ":3010") + viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") + viper.SetDefault("webserver.port", ":3011") + viper.SetDefault("pipeline.publishIntervalMs", 50) + viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) + viper.SetDefault("debuglevel", "INFO") - viper.SetConfigName("gpsconfig") // name of config file (without extension) - 
viper.SetConfigType("yaml") - viper.AddConfigPath(".") - viper.AddConfigPath("./../../") - if err := viper.ReadInConfig(); err != nil { - logrus.Warn("couldn't find config file. using standard configuration") - } + viper.SetConfigName("gpsconfig") // name of config file (without extension) + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + viper.AddConfigPath("./../../") + if err := viper.ReadInConfig(); err != nil { + logrus.Warn("couldn't find config file. using standard configuration") + } - c := core.Configuration{} - if err := viper.Unmarshal(&c); err != nil { - logrus.Debug("couldn't load config...") - logrus.Error(err) - } - lvl, err := logrus.ParseLevel(c.Debuglevel) - if err != nil { - logrus.Error(err) - } - logrus.SetLevel(lvl) - return &c + c := core.Configuration{} + if err := viper.Unmarshal(&c); err != nil { + logrus.Debug("couldn't load config...") + logrus.Error(err) + } + lvl, err := logrus.ParseLevel(c.Debuglevel) + if err != nil { + logrus.Error(err) + } + logrus.SetLevel(lvl) + return &c } diff --git a/cmd/server/server.go b/cmd/server/server.go index 33d557d..b7ebf58 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -1,37 +1,37 @@ package main import ( - "git.timovolkmann.de/gyrogpsc/core" - "git.timovolkmann.de/gyrogpsc/storage" - "git.timovolkmann.de/gyrogpsc/web" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" + "git.timovolkmann.de/gyrogpsc/core" + "git.timovolkmann.de/gyrogpsc/storage" + "git.timovolkmann.de/gyrogpsc/web" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" ) func main() { - conf := &core.Configuration{} - configurationFromFile(conf) + conf := &core.Configuration{} + configurationFromFile(conf) - repo := storage.NewRepository(conf) - disp := core.NewDispatcher() + repo := storage.NewRepository(conf) + disp := core.NewDispatcher() - service := core.TrackingService(repo, disp, conf) + service := core.TrackingService(repo, disp, conf) - web.CreateServer(service, disp, conf) + web.CreateServer(service, disp, conf) } func configurationFromFile(c *core.Configuration) { - viper.SetDefault("TcpCollectorPort", ":3010") - viper.SetDefault("SerialCollectorPort", "/dev/tty.usbmodem14201") - viper.SetDefault("HttpPort", "layouts") - viper.SetDefault("publishIntervalMs", 50) - viper.SetDefault("syncUpdateIntervalMs", 494) + viper.SetDefault("TcpCollectorPort", ":3010") + viper.SetDefault("SerialCollectorPort", "/dev/tty.usbmodem14201") + viper.SetDefault("HttpPort", "layouts") + viper.SetDefault("publishIntervalMs", 50) + viper.SetDefault("syncUpdateIntervalMs", 494) - viper.SetConfigName("gpsconfig") // name of config file (without extension) - viper.SetConfigType("yaml") - viper.AddConfigPath(".") - viper.AddConfigPath("./../../") + viper.SetConfigName("gpsconfig") // name of config file (without extension) + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + viper.AddConfigPath("./../../") - viper.Unmarshal(c) - logrus.Println(c) + viper.Unmarshal(c) + logrus.Println(c) } diff --git a/core/collectors.go b/core/collectors.go index 27339ea..cddf112 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -1,184 +1,199 @@ package core import ( - "fmt" - "git.timovolkmann.de/gyrogpsc/ublox" - "github.com/sirupsen/logrus" - "go.bug.st/serial" - "net" - "os" + "fmt" + "git.timovolkmann.de/gyrogpsc/ublox" + "github.com/sirupsen/logrus" + "go.bug.st/serial" + "net" + "os" + "sync" ) type Collector interface { - Collect() - Stop() + Collect() + Close() } type CollectorType uint8 const ( - SERIAL CollectorType = iota - 
TCP + SERIAL CollectorType = iota + TCP ) var tcpSingleton *tcpCollector func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector { - var coll Collector - switch typ { - case SERIAL: - coll = newSerial(proc, config) - case TCP: - if tcpSingleton == nil { - tcpSingleton = newTcp(proc, config) - } else { - tcpSingleton.SetProcessor(proc) - } - coll = tcpSingleton - default: - panic("selected collector type not implemented") - } - return coll + var coll Collector + switch typ { + case SERIAL: + coll = newSerial(proc, config) + case TCP: + if tcpSingleton == nil { + tcpSingleton = newTcp(proc, config) + } else { + tcpSingleton.SetProcessor(proc) + } + coll = tcpSingleton + default: + panic("selected collector type not implemented") + } + return coll } type serialCollector struct { - active bool - proc Pusher - config *Configuration + active bool + proc Pusher + config *Configuration + mu sync.RWMutex } +func (s *serialCollector) isSerialCollActive() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.active +} + + func (s *serialCollector) Collect() { - s.active = true - go func() { - logrus.Println("start serial collector") - mode := &serial.Mode{ - BaudRate: 115200, - } - port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode) - if err != nil { - logrus.Fatalln(err.Error()) - } - defer port.Close() + s.mu.Lock() + s.active = true + s.mu.Unlock() + go func() { + logrus.Println("start serial collector") + mode := &serial.Mode{ + BaudRate: 115200, + } + port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode) + if err != nil { + logrus.Fatalln("can't open serial port:", err.Error()) + } + defer port.Close() - decoder := ublox.NewDecoder(port) + decoder := ublox.NewDecoder(port) - for s.active { - meas, err := decoder.Decode() - if err != nil { - if err.Error() == "NMEA not implemented" { - continue - } - logrus.Println("serial read err:", err) - break - } - sd, err := ConvertUbxToSensorData(meas) - if err != nil { - logrus.Println("convert err:", err, meas, sd) - continue - } - // skip irrelevant messages - if sd == nil { - continue - } + for s.isSerialCollActive() { + meas, err := decoder.Decode() + if err != nil { + if err.Error() == "NMEA not implemented" { + continue + } + logrus.Println("serial read err:", err) + break + } + sd, err := ConvertUbxToSensorData(meas) + if err != nil { + logrus.Println("convert err:", err, meas, sd) + continue + } + // skip irrelevant messages + if sd == nil { + continue + } - err = s.proc.Push(sd) - if err != nil { - logrus.Println("process err:", err, *sd) - continue - } - } - logrus.Println("serial collector stopped") - }() + err = s.proc.Push(sd) + if err != nil { + logrus.Println("process err:", err, *sd) + continue + } + } + logrus.Println("serial collector stopped") + }() } -func (s *serialCollector) Stop() { - s.active = false + + +func (s *serialCollector) Close() { + s.mu.Lock() + s.active = false + s.mu.Unlock() } func newSerial(proc Pusher, config *Configuration) *serialCollector { - return &serialCollector{ - active: false, - proc: proc, - config: config, - } + return &serialCollector{ + active: false, + proc: proc, + config: config, + } } type tcpCollector struct { - active bool - processor Pusher - //config *Configuration + active bool + processor Pusher + //config *Configuration } func (t *tcpCollector) Collect() { - t.active = true + t.active = true } -func (t *tcpCollector) Stop() { - t.active = false +func (t *tcpCollector) Close() { + t.active = false } func (t *tcpCollector) 
SetProcessor(p Pusher) { - t.processor = p + t.processor = p } func newTcp(proc Pusher, config *Configuration) *tcpCollector { - logrus.Println("start tcp collector") + logrus.Println("start tcp collector") - listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort) - if err != nil { - fmt.Println("Error listening:", err.Error()) - //os.Exit(1) - } - coll := &tcpCollector{ - active: false, - processor: proc, - } - go func() { - for { - // Listen for an incoming connection. - conn, err := listener.Accept() - if err != nil { - fmt.Println("Error accepting: ", err.Error()) - os.Exit(1) - } - logrus.Println("...new incoming tcp connection...") + listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort) + if err != nil { + fmt.Println("Error listening:", err) + //os.Exit(1) + } + coll := &tcpCollector{ + active: false, + processor: proc, + } + go func() { + for { + // Listen for an incoming connection. + conn, err := listener.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + os.Exit(1) + } + logrus.Println("...new incoming tcp connection...") - // Handle connections in a new goroutine. - go coll.jsonHandler(conn) - } - }() - return coll + // Handle connections in a new goroutine. + go coll.jsonHandler(conn) + } + }() + return coll } // handles incoming tcp connections with json payload. func (c *tcpCollector) jsonHandler(conn net.Conn) { - defer conn.Close() + defer conn.Close() - // TRY reader := bufio.NewReader(conn) OR NewScanner(conn) - buf := make([]byte, 2048) - for { - // Read the incoming connection into the buffer. - n, err := conn.Read(buf) - if err != nil { - fmt.Println("TCP error - reading from connection:", n, err.Error()) - break - } - //json := pretty.Pretty(buf[:n]) - //fmt.Println(string(json)) - //fmt.Println(string(buf[:n])) - sd, err := ConvertSensorDataPhone(buf[:n]) - if err != nil { - logrus.Println(err) - continue - } - if !c.active { - //time.Sleep(50 * time.Millisecond) - continue - } - err = c.processor.Push(sd) - if err != nil { - logrus.Fatalln(err) - } - } + // TRY reader := bufio.NewReader(conn) OR NewScanner(conn) + buf := make([]byte, 2048) + for { + // Read the incoming connection into the buffer. 
+ n, err := conn.Read(buf) + if err != nil { + fmt.Println("TCP error - reading from connection:", n, err.Error()) + break + } + //json := pretty.Pretty(buf[:n]) + //fmt.Println(string(json)) + //fmt.Println(string(buf[:n])) + sd, err := ConvertSensorDataPhone(buf[:n]) + if err != nil { + logrus.Println(err) + continue + } + if !c.active { + //time.Sleep(50 * time.Millisecond) + continue + } + err = c.processor.Push(sd) + if err != nil { + logrus.Fatalln(err) + } + } } diff --git a/core/config.go b/core/config.go index 7d6255d..adabf79 100644 --- a/core/config.go +++ b/core/config.go @@ -1,16 +1,16 @@ package core type Configuration struct { - Collectors struct { - TcpCollectorPort string `mapstructure:"porttcp"` - SerialCollectorPort string `mapstructure:"portserial"` - } `mapstructure:"collectors"` - Webserver struct { - Port string `mapstructure:"port"` - } `mapstructure:"webserver"` - Pipeline struct { - PublishIntervalMs int `mapstructure:"publishintervalms"` - SyncUpdateIntervalMs int `mapstructure:"syncupdateintervalms"` - } `mapstructure:"pipeline"` - Debuglevel string `mapstructure:"debuglevel"` + Collectors struct { + TcpCollectorPort string `mapstructure:"porttcp"` + SerialCollectorPort string `mapstructure:"portserial"` + } `mapstructure:"Collectors"` + Webserver struct { + Port string `mapstructure:"port"` + } `mapstructure:"webserver"` + Pipeline struct { + PublishIntervalMs int `mapstructure:"publishintervalms"` + SyncUpdateIntervalMs int `mapstructure:"syncupdateintervalms"` + } `mapstructure:"pipeline"` + Debuglevel string `mapstructure:"debuglevel"` } diff --git a/core/dispatcher.go b/core/dispatcher.go index 94d2197..72e6429 100644 --- a/core/dispatcher.go +++ b/core/dispatcher.go @@ -1,43 +1,44 @@ package core import ( - "errors" - "github.com/sirupsen/logrus" + "errors" + "github.com/sirupsen/logrus" ) type dispatcher struct { - listeners map[int16]chan string - counter int16 + listeners map[int16]chan string + counter int16 } func NewDispatcher() *dispatcher { - return &dispatcher{ - listeners: make(map[int16]chan string), - counter: 0, - } + return &dispatcher{ + listeners: make(map[int16]chan string), + counter: 0, + } } func (d *dispatcher) Publish(message string) { - logrus.Debugf("publish to %v listeners:\n%v\n", len(d.listeners), message) - for _, ch := range d.listeners { - ch <- message - } + logrus.Debugf("publish to %v listeners:\n%v\n", len(d.listeners)) + logrus.Debug(message) + for _, ch := range d.listeners { + ch <- message + } } func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) { - key := d.counter - d.counter++ - rec := make(chan string) - d.listeners[key] = rec - return key, rec + key := d.counter + d.counter++ + rec := make(chan string) + d.listeners[key] = rec + return key, rec } func (d *dispatcher) Unsubscribe(id int16) error { - receiver, ok := d.listeners[id] - if !ok { - return errors.New("no subscription with id") - } - delete(d.listeners, id) - close(receiver) - return nil + receiver, ok := d.listeners[id] + if !ok { + return errors.New("no subscription with id") + } + delete(d.listeners, id) + close(receiver) + return nil } diff --git a/core/interfaces.go b/core/interfaces.go index d2ce9af..ce10b0d 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -3,44 +3,40 @@ package core import "github.com/google/uuid" type Subscriber interface { - Subscribe() (int16, <-chan string) - Unsubscribe(id int16) error + Subscribe() (int16, <-chan string) + Unsubscribe(id int16) error } type Publisher interface { - Publish(message 
string) + Publish(message string) } type Pusher interface { - Push(data *sensorData) error + Push(data *sensorData) error } - type Storer interface { - EnqueuePair(tcp sensorData, ser sensorData) - EnqueueRaw(data sensorData) + EnqueuePair(tcp sensorData, ser sensorData) + EnqueueRaw(data sensorData) } - - type Repo interface { - Save(tracking Tracking) error - LoadAll() ([]TrackingMetadata, error) - Load(id uuid.UUID) (Tracking, error) + Save(tracking Tracking) error + LoadAll() ([]TrackingMetadata, error) + Load(id uuid.UUID) (Tracking, error) } - type Service interface { - AllTrackings() - NewTracking(cols ...CollectorType) - StartRecord() - StopRecord() - Reset() + AllTrackings() + NewSetup(cols ...CollectorType) + StartRecord() + StopRecord() + StopAll() - LoadTracking(trackingId uuid.UUID) - DeleteTracking(trackingId uuid.UUID) + LoadTracking(trackingId uuid.UUID) + DeleteTracking(trackingId uuid.UUID) - StartReplay() - PauseReplay() - StopReplay() + StartReplay() + PauseReplay() + StopReplay() } diff --git a/core/pipeline.go b/core/pipeline.go index e77bbe3..00c7f4e 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -1,204 +1,225 @@ package core import ( - "encoding/json" - "errors" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/sirupsen/logrus" - "sync" - "time" + "context" + "encoding/json" + "errors" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" + "sync" + "time" ) type pipeline struct { - active bool - record bool - synchroniz synchronizer - buffer pipeBuffer - publisher Publisher - storer Storer - publishTicker *time.Ticker + active bool + record bool + synchroniz synchronizer + buffer pipeBuffer + publisher Publisher + storer Storer + publishTicker *time.Ticker + mu sync.RWMutex + sema *semaphore.Weighted } // pipe implements Runner & Pusher func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline { - return &pipeline{ - false, - false, - synchronizer{ - //bufferSize: 100, - mutex: &sync.RWMutex{}, - updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond), - }, - pipeBuffer{ - tcpMutex: &sync.Mutex{}, - serialMutex: &sync.Mutex{}, - }, - d, - s, - time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond), - } + return &pipeline{ + false, + false, + synchronizer{ + //bufferSize: 100, + mutex: &sync.RWMutex{}, + updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond), + }, + pipeBuffer{ + tcpMutex: &sync.Mutex{}, + serialMutex: &sync.Mutex{}, + }, + d, + s, + time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond), + sync.RWMutex{}, + semaphore.NewWeighted(2), + } +} + +func (p *pipeline) isPipeActive() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.active } func (p *pipeline) Run() { - p.active = true - logrus.Println("pipe: processing service started") - go func() { - for p.active { - <-p.synchroniz.updateTicker.C - err := p.refreshDelay() - if err != nil { - logrus.Debugln(err) - } - } - logrus.Println("pipe: updater stopped") - }() - go func() { - for p.active { - <-p.publishTicker.C - err := p.publish() - if err != nil && err.Error() != "no data available" { - logrus.Debug(err) - } - } - logrus.Println("pipe: publisher stopped") - }() + p.sema.Acquire(context.Background(), 2) + p.mu.Lock() + p.active = true + p.mu.Unlock() + logrus.Println("pipe: processing service started") + go 
func() { + for p.isPipeActive() { + <-p.synchroniz.updateTicker.C + err := p.refreshDelay() + if err != nil { + logrus.Debugln(err) + } + } + p.sema.Release(1) + logrus.Println("pipe: updater stopped") + }() + go func() { + for p.isPipeActive() { + <-p.publishTicker.C + err := p.publish() + if err != nil && err.Error() != "no data available" { + logrus.Debug(err) + } + } + p.sema.Release(1) + logrus.Println("pipe: publisher stopped") + }() } func (p *pipeline) Record() { - p.record = true + p.record = true } -func (p *pipeline) Stop() { - p.record = false +func (p *pipeline) StopRecord() { + p.record = false } func (p *pipeline) publish() error { - p.buffer.tcpMutex.Lock() - p.buffer.serialMutex.Lock() + p.buffer.tcpMutex.Lock() + p.buffer.serialMutex.Lock() - if (p.buffer.MeasTcp == sensorData{} && p.buffer.MeasSerial == sensorData{}) { - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - return errors.New("no data available") - } - if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, cmpopts.IgnoreUnexported(sensorData{})) && - cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, cmpopts.IgnoreUnexported(sensorData{})) { - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - return errors.New("same data") - } - logrus.Debug("––––––––––––––––––––––––––––––––––––") - logrus.Debugf("MEAS old: %v", p.buffer.LastMeasTcp) - logrus.Debugf("MEAS new: %v", p.buffer.MeasTcp) - logrus.Debug("––––––––––––––––––––––––––––––––––––") - p.buffer.LastMeasTcp = p.buffer.MeasTcp - p.buffer.LastMeasSerial = p.buffer.MeasSerial - p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial) + if (p.buffer.MeasTcp == sensorData{} && p.buffer.MeasSerial == sensorData{}) { + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() + return errors.New("no data available") + } + if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, cmpopts.IgnoreUnexported(sensorData{})) && + cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, cmpopts.IgnoreUnexported(sensorData{})) { + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() + return errors.New("same data") + } + logrus.Debug("––––––––––––––––––––––––––––––––––––") + logrus.Debugf("SER old: %v", p.buffer.LastMeasSerial) + logrus.Debugf("SER new: %v", p.buffer.MeasSerial) + logrus.Debugf("TCP old: %v", p.buffer.LastMeasTcp) + logrus.Debugf("TCP new: %v", p.buffer.MeasTcp) + logrus.Debug("––––––––––––––––––––––––––––––––––––") + p.buffer.LastMeasTcp = p.buffer.MeasTcp + p.buffer.LastMeasSerial = p.buffer.MeasSerial + p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial) - data := map[string]sensorData{ - string(SOURCE_TCP): p.buffer.MeasTcp, - string(SOURCE_SERIAL): p.buffer.MeasSerial, - } + data := map[string]sensorData{ + string(SOURCE_TCP): p.buffer.MeasTcp, + string(SOURCE_SERIAL): p.buffer.MeasSerial, + } - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() - jdata, err := json.Marshal(data) - //logrus.Println(string(pretty.Pretty(jdata))) - if err != nil { - return err - } - p.publisher.Publish(string(jdata)) - return nil + jdata, err := json.Marshal(data) + //logrus.Println(string(pretty.Pretty(jdata))) + if err != nil { + return err + } + p.publisher.Publish(string(jdata)) + return nil } type pipeBuffer struct { - MeasTcp sensorData - MeasSerial sensorData - LastMeasTcp sensorData - LastMeasSerial sensorData - tcpMutex *sync.Mutex - serialMutex *sync.Mutex + MeasTcp sensorData + MeasSerial sensorData + LastMeasTcp sensorData + LastMeasSerial sensorData + 
tcpMutex *sync.Mutex + serialMutex *sync.Mutex } type UnixNanoTime int64 type synchronizer struct { - tcpSerialDelayMs int64 - mutex *sync.RWMutex - updateTicker *time.Ticker + tcpSerialDelayMs int64 + mutex *sync.RWMutex + updateTicker *time.Ticker } func (p *pipeline) refreshDelay() error { - p.synchroniz.mutex.RLock() - if p.synchroniz.tcpSerialDelayMs != 0 { - logrus.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs) - } - p.synchroniz.mutex.RUnlock() - p.buffer.serialMutex.Lock() - p.buffer.tcpMutex.Lock() - tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp) - serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp) - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { - return errors.New("no sync possible. check if both collectors running. otherwise check GPS fix") - } - currentDelay := tcpTime.Sub(serTime).Milliseconds() - if currentDelay > 5000 || currentDelay < -5000 { - p.synchroniz.mutex.Lock() - p.synchroniz.tcpSerialDelayMs = 0 - p.synchroniz.mutex.Unlock() - return errors.New("skipping synchronisation! time not properly configured or facing network problems.") - } - logrus.Debug("TCP", tcpTime.String()) - logrus.Debug("SER", serTime.String()) - logrus.Debug("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms") - delay := tcpTime.Sub(serTime).Milliseconds() - p.synchroniz.mutex.Lock() - p.synchroniz.tcpSerialDelayMs += delay - p.synchroniz.mutex.Unlock() - return nil + p.synchroniz.mutex.RLock() + if p.synchroniz.tcpSerialDelayMs != 0 { + logrus.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs) + } + p.synchroniz.mutex.RUnlock() + p.buffer.serialMutex.Lock() + p.buffer.tcpMutex.Lock() + tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp) + serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp) + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() + if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { + return errors.New("no sync possible. check if both Collectors running. otherwise check GPS fix") + } + currentDelay := tcpTime.Sub(serTime).Milliseconds() + if currentDelay > 5000 || currentDelay < -5000 { + p.synchroniz.mutex.Lock() + p.synchroniz.tcpSerialDelayMs = 0 + p.synchroniz.mutex.Unlock() + return errors.New("skipping synchronisation! 
time not properly configured or facing network problems.") + } + logrus.Debug("TCP", tcpTime.String()) + logrus.Debug("SER", serTime.String()) + logrus.Debug("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms") + delay := tcpTime.Sub(serTime).Milliseconds() + p.synchroniz.mutex.Lock() + p.synchroniz.tcpSerialDelayMs += delay + p.synchroniz.mutex.Unlock() + return nil } func (p *pipeline) Push(data *sensorData) error { - if data == nil { - return errors.New("nil processing not allowed") - } - //logrus.Println("push data to pipe:", string(data.source)) - p.storer.EnqueueRaw(*data) - switch data.source { - case SOURCE_TCP: - go p.pushTcpDataToBuffer(*data) - case SOURCE_SERIAL: - go p.pushSerialDataToBuffer(*data) - default: - panic("pipe: invalid data source") - } - return nil + if data == nil { + return errors.New("nil processing not allowed") + } + //logrus.Println("push data to pipe:", string(data.source)) + p.storer.EnqueueRaw(*data) + switch data.source { + case SOURCE_TCP: + go p.pushTcpDataToBuffer(*data) + case SOURCE_SERIAL: + go p.pushSerialDataToBuffer(*data) + default: + panic("pipe: invalid data source") + } + return nil } func (p *pipeline) pushTcpDataToBuffer(data sensorData) { - p.synchroniz.mutex.RLock() - if p.synchroniz.tcpSerialDelayMs > 0 { - time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond) - } - p.synchroniz.mutex.RLock() - p.buffer.tcpMutex.Lock() - p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data) - p.buffer.tcpMutex.Unlock() + p.synchroniz.mutex.RLock() + if p.synchroniz.tcpSerialDelayMs > 0 { + time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond) + } + p.synchroniz.mutex.RLock() + p.buffer.tcpMutex.Lock() + p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data) + p.buffer.tcpMutex.Unlock() } func (p *pipeline) pushSerialDataToBuffer(data sensorData) { - p.synchroniz.mutex.RLock() - if p.synchroniz.tcpSerialDelayMs < 0 { - time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond) - } - p.synchroniz.mutex.RUnlock() - p.buffer.serialMutex.Lock() - p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data) - p.buffer.serialMutex.Unlock() + p.synchroniz.mutex.RLock() + if p.synchroniz.tcpSerialDelayMs < 0 { + time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond) + } + p.synchroniz.mutex.RUnlock() + p.buffer.serialMutex.Lock() + p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data) + p.buffer.serialMutex.Unlock() } func (p *pipeline) Close() { - p.active = false + p.mu.Lock() + p.active = false + p.mu.Unlock() } diff --git a/core/sensordata.go b/core/sensordata.go index e4d8ebf..afd1575 100644 --- a/core/sensordata.go +++ b/core/sensordata.go @@ -1,174 +1,174 @@ package core import ( - "errors" - "git.timovolkmann.de/gyrogpsc/ublox" - "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" - "math" - "time" + "errors" + "git.timovolkmann.de/gyrogpsc/ublox" + "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" + "math" + "time" ) type sourceId string const ( - SOURCE_TCP sourceId = "SOURCE_TCP" - SOURCE_SERIAL sourceId = "SOURCE_SERIAL" + SOURCE_TCP sourceId = "SOURCE_TCP" + SOURCE_SERIAL sourceId = "SOURCE_SERIAL" ) type sensorData struct { - itow uint32 - source sourceId - ServerTime time.Time - Timestamp int64 - Position [3]float64 - Orientation [3]float64 + itow uint32 + source sourceId + ServerTime time.Time + Timestamp int64 + Position [3]float64 + Orientation [3]float64 } type recordPair struct { - RecordTime time.Time - 
data map[sourceId]sensorData + RecordTime time.Time + data map[sourceId]sensorData } type rawRecord struct { - RecordTime time.Time - sensorData + RecordTime time.Time + sensorData } func (s sensorData) isSameEpoch(n sensorData) bool { - if n.itow == 0 { - return false - } - return s.itow == n.itow + if n.itow == 0 { + return false + } + return s.itow == n.itow } // Consolidates two sensordata elements if they are in the same epoch func (s sensorData) ConsolidateEpochsOnly(n sensorData) sensorData { - s.checkSources(&n) - if s.isSameEpoch(n) { - null := sensorData{} + s.checkSources(&n) + if s.isSameEpoch(n) { + null := sensorData{} - if n.Timestamp == null.Timestamp { - n.Timestamp = s.Timestamp - } - if n.Position == null.Position { - n.Position = s.Position - } - if n.Orientation == null.Orientation { - n.Orientation = s.Orientation - } - } - return n + if n.Timestamp == null.Timestamp { + n.Timestamp = s.Timestamp + } + if n.Position == null.Position { + n.Position = s.Position + } + if n.Orientation == null.Orientation { + n.Orientation = s.Orientation + } + } + return n } // Consolidates two sensordata elements but ignores timestamps func (s sensorData) ConsolidateExTime(n sensorData) sensorData { - s.checkSources(&n) - null := sensorData{} + s.checkSources(&n) + null := sensorData{} - if n.Position == null.Position { - n.Position = s.Position - } - if n.Orientation == null.Orientation { - n.Orientation = s.Orientation - } - return n + if n.Position == null.Position { + n.Position = s.Position + } + if n.Orientation == null.Orientation { + n.Orientation = s.Orientation + } + return n } func (s *sensorData) checkSources(n *sensorData) { - if (s.source != n.source && *s != sensorData{}) { - logrus.Println(s) - logrus.Println(n) - logrus.Fatalln("Do not consolidate sensorData from different Sources") - } + if (s.source != n.source && *s != sensorData{}) { + logrus.Println(s) + logrus.Println(n) + logrus.Fatalln("Do not consolidate sensorData from different Sources") + } } var ( - errNotImplemented = errors.New("message not implemented") - errRawMessage = errors.New("raw message") + errNotImplemented = errors.New("message not implemented") + errRawMessage = errors.New("raw message") ) func ConvertUbxToSensorData(msg interface{}) (*sensorData, error) { - sd := &sensorData{ - ServerTime: time.Now(), - source: SOURCE_SERIAL, - } - switch v := msg.(type) { - case *ublox.NavPvt: - //logrus.Println("NAV-PVT") - sd.itow = v.ITOW_ms - sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() - sd.Position[0] = float64(v.Lat_dege7) / 1e+7 - sd.Position[1] = float64(v.Lon_dege7) / 1e+7 - sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m - case *ublox.HnrPvt: - //logrus.Println("HNR-PVT") - sd.itow = v.ITOW_ms - sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() - sd.Position[0] = float64(v.Lat_dege7) / 1e+7 - sd.Position[1] = float64(v.Lon_dege7) / 1e+7 - sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m - case *ublox.NavAtt: - //logrus.Println("NAV-ATT") - sd.itow = v.ITOW_ms - sd.Orientation[0] = float64(v.Pitch_deg) * 1e-5 - sd.Orientation[1] = float64(v.Roll_deg) * 1e-5 - sd.Orientation[2] = float64(v.Heading_deg) * 1e-5 - case *ublox.RawMessage: - //class := make([]byte, 2) - //binary.LittleEndian.PutUint16(class, v.ClassID()) - //logrus.Printf("%#v, %#v", 
class[0],class[1]) - return nil, nil - default: - return nil, errNotImplemented - } - return sd, nil + sd := &sensorData{ + ServerTime: time.Now(), + source: SOURCE_SERIAL, + } + switch v := msg.(type) { + case *ublox.NavPvt: + //logrus.Println("NAV-PVT") + sd.itow = v.ITOW_ms + sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() + sd.Position[0] = float64(v.Lat_dege7) / 1e+7 + sd.Position[1] = float64(v.Lon_dege7) / 1e+7 + sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m + case *ublox.HnrPvt: + //logrus.Println("HNR-PVT") + sd.itow = v.ITOW_ms + sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() + sd.Position[0] = float64(v.Lat_dege7) / 1e+7 + sd.Position[1] = float64(v.Lon_dege7) / 1e+7 + sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m + case *ublox.NavAtt: + //logrus.Println("NAV-ATT") + sd.itow = v.ITOW_ms + sd.Orientation[0] = float64(v.Pitch_deg) * 1e-5 + sd.Orientation[1] = float64(v.Roll_deg) * 1e-5 + sd.Orientation[2] = float64(v.Heading_deg) * 1e-5 + case *ublox.RawMessage: + //class := make([]byte, 2) + //binary.LittleEndian.PutUint16(class, v.ClassID()) + //logrus.Printf("%#v, %#v", class[0],class[1]) + return nil, nil + default: + return nil, errNotImplemented + } + return sd, nil } func ConvertSensorDataPhone(jsonData []byte) (*sensorData, error) { - if gjson.Get(string(jsonData), "os").String() == "hyperimu" { - return convertAndroidHyperImu(jsonData) - } - return convertIPhoneSensorLog(jsonData) + if gjson.Get(string(jsonData), "os").String() == "hyperimu" { + return convertAndroidHyperImu(jsonData) + } + return convertIPhoneSensorLog(jsonData) } func convertIPhoneSensorLog(jsonData []byte) (*sensorData, error) { - timestamp := gjson.Get(string(jsonData), "locationTimestamp_since1970").Float() - lat := gjson.Get(string(jsonData), "locationLatitude").Float() - lon := gjson.Get(string(jsonData), "locationLongitude").Float() - alt := gjson.Get(string(jsonData), "locationAltitude").Float() - pitch := gjson.Get(string(jsonData), "motionPitch").Float() * 180 / math.Pi - roll := gjson.Get(string(jsonData), "motionRoll").Float() * 180 / math.Pi - yaw := gjson.Get(string(jsonData), "motionYaw").Float() * 180 / math.Pi - sd := &sensorData{ - ServerTime: time.Now(), - source: SOURCE_TCP, - Timestamp: int64(timestamp * float64(time.Second)), - Position: [3]float64{lat, lon, alt}, - Orientation: [3]float64{pitch, roll, yaw}, - //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), - } - //logrus.Println(string(pretty.Pretty(jsonData))) - //logrus.Println(sd) - return sd, nil + timestamp := gjson.Get(string(jsonData), "locationTimestamp_since1970").Float() + lat := gjson.Get(string(jsonData), "locationLatitude").Float() + lon := gjson.Get(string(jsonData), "locationLongitude").Float() + alt := gjson.Get(string(jsonData), "locationAltitude").Float() + pitch := gjson.Get(string(jsonData), "motionPitch").Float() * 180 / math.Pi + roll := gjson.Get(string(jsonData), "motionRoll").Float() * 180 / math.Pi + yaw := gjson.Get(string(jsonData), "motionYaw").Float() * 180 / math.Pi + sd := &sensorData{ + ServerTime: time.Now(), + source: SOURCE_TCP, + Timestamp: int64(timestamp * float64(time.Second)), + Position: [3]float64{lat, lon, alt}, + Orientation: [3]float64{pitch, roll, yaw}, + //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), 
+ } + //logrus.Println(string(pretty.Pretty(jsonData))) + //logrus.Println(sd) + return sd, nil } func convertAndroidHyperImu(jsonData []byte) (*sensorData, error) { - timestamp := gjson.Get(string(jsonData), "Timestamp").Int() - lat := gjson.Get(string(jsonData), "GPS.0").Float() - lon := gjson.Get(string(jsonData), "GPS.1").Float() - alt := gjson.Get(string(jsonData), "GPS.2").Float() - pitch := gjson.Get(string(jsonData), "orientation.0").Float() - roll := gjson.Get(string(jsonData), "orientation.1").Float() - yaw := gjson.Get(string(jsonData), "orientation.2").Float() + timestamp := gjson.Get(string(jsonData), "Timestamp").Int() + lat := gjson.Get(string(jsonData), "GPS.0").Float() + lon := gjson.Get(string(jsonData), "GPS.1").Float() + alt := gjson.Get(string(jsonData), "GPS.2").Float() + pitch := gjson.Get(string(jsonData), "orientation.0").Float() + roll := gjson.Get(string(jsonData), "orientation.1").Float() + yaw := gjson.Get(string(jsonData), "orientation.2").Float() - sd := &sensorData{ - ServerTime: time.Now(), - source: SOURCE_TCP, - Timestamp: timestamp * int64(time.Millisecond), - Position: [3]float64{lat, lon, alt}, - Orientation: [3]float64{pitch, roll, yaw}, - //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), - } - return sd, nil + sd := &sensorData{ + ServerTime: time.Now(), + source: SOURCE_TCP, + Timestamp: timestamp * int64(time.Millisecond), + Position: [3]float64{lat, lon, alt}, + Orientation: [3]float64{pitch, roll, yaw}, + //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), + } + return sd, nil } diff --git a/core/service.go b/core/service.go index cbb6bb1..d67c702 100644 --- a/core/service.go +++ b/core/service.go @@ -1,104 +1,149 @@ package core import ( - "github.com/google/uuid" - "github.com/sirupsen/logrus" - "time" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "time" ) type OpMode uint8 const ( - STOPPED OpMode = iota - LIVE - REPLAY + STOPPED OpMode = iota + LIVE + RECORDING + REPLAY ) type trackingService struct { - current *Tracking - config *Configuration - pipe *pipeline - repo Repo - opMode OpMode - collectors []Collector + current *Tracking + config *Configuration + pipe *pipeline + repo Repo + opMode OpMode + collectors []Collector } func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService { - t := &Tracking{} - return &trackingService{ - current: t, - opMode: STOPPED, - config: c, - repo: r, - pipe: NewPipeline(d, t, c), - collectors: nil, - } + t := &Tracking{} + ts := &trackingService{ + current: t, + opMode: STOPPED, + config: c, + repo: r, + pipe: NewPipeline(d, t, c), + collectors: nil, + } + //ts.pipe.Run() + return ts } +//const( +// errA error = errors.New("A") +//) + func (t *trackingService) AllTrackings() { - panic("implement me") + panic("implement me") } -func (t *trackingService) NewTracking(cols ...CollectorType) { - logrus.Debug("new tracking:", cols) - t.opMode = LIVE - t.collectors = nil - for _, col := range cols { - t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config)) - } - *t.current = emptyTracking() - t.current.collectors = cols - for _, e := range t.collectors { - e.Collect() - } - t.pipe.Run() - +func (t *trackingService) NewSetup(cols ...CollectorType) { + logrus.Info("SERVICE: NEW SETUP") + if t.opMode == RECORDING { + logrus.Println("trackingservice: no reset while recording") + return + } + if t.opMode == LIVE { + logrus.Println("trackingservice: stop currently running setup before creating new one") + t.StopAll() + } + 
logrus.Debug("new tracking:", cols) + t.opMode = LIVE + t.collectors = nil + for _, col := range cols { + t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config)) + } + t.safelyReplaceTracking(emptyTracking()) + t.current.Collectors = cols + for _, e := range t.collectors { + e.Collect() + } + t.pipe.Run() + //time.Sleep(3 * time.Second) } func (t *trackingService) StartRecord() { - if t.opMode != LIVE { - logrus.Println("trackingservice: wrong mode of operation") - } - t.current.TimeCreated = time.Now() - t.pipe.Record() + logrus.Info("SERVICE: START RECORD") + if t.opMode != LIVE { + logrus.Println("trackingservice: wrong mode of operation") + return + } + t.opMode = RECORDING + t.current.TimeCreated = time.Now() + t.pipe.Record() } func (t *trackingService) StopRecord() { - if t.opMode != LIVE { - logrus.Println("trackingservice: wrong mode of operation") - } - t.pipe.Stop() - for _, e := range t.collectors { - e.Stop() - } - err := t.repo.Save(*t.current) - if err != nil { - logrus.Println(err) - } - t.NewTracking(t.current.collectors...) + logrus.Info("SERVICE: STOP RECORD") + if t.opMode != RECORDING { + logrus.Println("trackingservice: couldn't stop. not recording") + return + } + t.opMode = LIVE + t.pipe.StopRecord() + + m1.Lock() + m2.Lock() + err := t.repo.Save(*t.current) + m2.Unlock() + m1.Unlock() + + if err != nil { + logrus.Println(err) + } + t.safelyReplaceTracking(emptyTracking()) } -func (t *trackingService) Reset() { - t.opMode = STOPPED - t.collectors = nil +func (t *trackingService) StopAll() { + logrus.Info("SERVICE: STOP ALL") + if t.opMode == RECORDING { + logrus.Println("trackingservice: stop recording gracefully") + t.StopRecord() + } + t.opMode = STOPPED + t.pipe.Close() + for _, e := range t.collectors { + e.Close() + } + t.collectors = nil + t.safelyReplaceTracking(emptyTracking()) } func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { - panic("implement me") + panic("implement me") } func (t *trackingService) StartReplay() { - panic("implement me") + panic("implement me") } func (t *trackingService) PauseReplay() { - panic("implement me") + panic("implement me") } func (t *trackingService) StopReplay() { - panic("implement me") + panic("implement me") } func (t *trackingService) LoadTracking(trackingId uuid.UUID) { } + + +func (t *trackingService) safelyReplaceTracking(tr Tracking) { + m1.Lock() + m2.Lock() + *t.current = tr + m2.Unlock() + m1.Unlock() +} + diff --git a/core/trackings.go b/core/trackings.go index beb1f98..12660de 100644 --- a/core/trackings.go +++ b/core/trackings.go @@ -1,60 +1,62 @@ package core import ( - "github.com/google/uuid" - "github.com/sirupsen/logrus" - "sync" - "time" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "sync" + "time" ) +var m1 sync.RWMutex +var m2 sync.RWMutex + type Tracking struct { - TrackingMetadata - Records []recordPair - Rawdata []rawRecord - mu sync.Mutex + TrackingMetadata + Records []recordPair + Rawdata []rawRecord } type TrackingMetadata struct { - UUID uuid.UUID - TimeCreated time.Time - collectors []CollectorType + UUID uuid.UUID + TimeCreated time.Time + Collectors []CollectorType } func (s *Tracking) EnqueuePair(tcp sensorData, ser sensorData) { - s.mu.Lock() - defer s.mu.Unlock() - rp := recordPair{ - RecordTime: time.Now(), - data: map[sourceId]sensorData{ - tcp.source: tcp, - ser.source: ser, - }, - } - s.Records = append(s.Records, rp) - logrus.Debugln("tracking Records: len->", len(s.Records)) + rp := recordPair{ + RecordTime: time.Now(), + data: 
map[sourceId]sensorData{ + tcp.source: tcp, + ser.source: ser, + }, + } + m1.Lock() + s.Records = append(s.Records, rp) + logrus.Debugln("tracking Records: len->", len(s.Records)) + m1.Unlock() } func (s *Tracking) EnqueueRaw(data sensorData) { - s.mu.Lock() - defer s.mu.Unlock() - sr := rawRecord{ - time.Now(), - data, - } - s.Rawdata = append(s.Rawdata, sr) - logrus.Debugln("raw data points: len->", len(s.Records)) + sr := rawRecord{ + time.Now(), + data, + } + m1.Lock() + s.Rawdata = append(s.Rawdata, sr) + logrus.Debugln("raw data points: len->", len(s.Rawdata)) + m1.Unlock() } func emptyTracking() Tracking { - return Tracking{ - TrackingMetadata: TrackingMetadata{ - UUID: uuid.New(), - }, - Records: []recordPair{}, - Rawdata: []rawRecord{}, - } + return Tracking{ + TrackingMetadata: TrackingMetadata{ + UUID: uuid.New(), + }, + Records: []recordPair{}, + Rawdata: []rawRecord{}, + } } func (s *Tracking) isEmpty() bool { - return len(s.Rawdata)+len(s.Records) == 0 + return len(s.Rawdata)+len(s.Records) == 0 } diff --git a/go.mod b/go.mod index 67522e3..4d61c0a 100644 --- a/go.mod +++ b/go.mod @@ -14,4 +14,5 @@ require ( github.com/tidwall/gjson v1.6.0 github.com/tidwall/pretty v1.0.2 // indirect go.bug.st/serial v1.1.1 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 ) diff --git a/go.sum b/go.sum index 9f012a5..ac10ab6 100644 --- a/go.sum +++ b/go.sum @@ -304,6 +304,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/storage/kvstore.go b/storage/kvstore.go index c1fcae8..4be90ae 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -1,33 +1,197 @@ package storage import ( - "git.timovolkmann.de/gyrogpsc/core" - "github.com/dgraph-io/badger/v2" - "github.com/google/uuid" - "github.com/sirupsen/logrus" + "encoding/binary" + "encoding/json" + "git.timovolkmann.de/gyrogpsc/core" + "github.com/dgraph-io/badger/v2" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "os" + "path/filepath" ) // Must implement Repo type badgerStore struct { - db *badger.DB + trackings *badger.DB + records *badger.DB + rawdata *badger.DB } func NewRepository(c *core.Configuration) *badgerStore { - db, err := badger.Open(badger.DefaultOptions(".")) - if err != nil { - logrus.Warn(err) - } - return &badgerStore{db} + dir, _ := os.Getwd() + logrus.Debug(dir) + if _, err := os.Stat(filepath.Join(dir,"_db")); os.IsNotExist(err) { + os.Mkdir(filepath.Join(dir,"_db"), os.ModePerm) + } + + tr, err := badger.Open(badger.DefaultOptions("_db/trackings")) + dp, err := badger.Open(badger.DefaultOptions("_db/records")) + rd, err := badger.Open(badger.DefaultOptions("_db/raw")) + + if err != nil { + logrus.Error(err) + } + return &badgerStore{trackings: tr, records: dp, rawdata: rd} } -func (r *badgerStore) Save(tracking 
core.Tracking) error { - panic("implement me") +func (r *badgerStore) isDbAvailable() bool { + return r.trackings.IsClosed() || r.records.IsClosed() || r.rawdata.IsClosed() } +func (r *badgerStore) Save(tr core.Tracking) error { + if ok := r.isDbAvailable(); ok { + logrus.Error("unable to write to database. database closed!") + return badger.ErrDBClosed + } + ts, err := tr.TimeCreated.MarshalText() + if err != nil { + logrus.Error(err, tr) + } + logrus.Info("save tracking:", tr.TimeCreated) + meta, err := json.Marshal(tr.TrackingMetadata) + if err != nil { + logrus.Error(err, tr) + return err + } + err = r.records.Update(func(txn *badger.Txn) error { + for _, v := range tr.Records { + k := createDataKey(tr.UUID, v.RecordTime.UnixNano()) + j, err := json.Marshal(v) + if err != nil { + return err + } + txn.Set(k, j) + } + return nil + }) + if err != nil { + logrus.Error(err, tr) + return err + } + err = r.records.Update(func(txn *badger.Txn) error { + for _, v := range tr.Rawdata { + k := createDataKey(tr.UUID, v.Timestamp) + j, err := json.Marshal(v) + if err != nil { + return err + } + txn.Set(k, j) + } + return nil + }) + if err != nil { + logrus.Error(err, tr) + return err + } + err = r.trackings.Update(func(txn *badger.Txn) error { + err := txn.Set(ts, meta) + return err + }) + if err != nil { + logrus.Error(err, tr) + return err + } + logrus.Info("sucessfully saved tracking") + return nil +} + +//func (r *badgerStore) Save(tracking *core.Tracking) error { +// ts, err := tracking.TimeCreated.MarshalText() +// if err != nil { +// logrus.Error(err, tracking) +// } +// logrus.Info("save tracking:", ts) +// meta, err := json.Marshal(tracking.TrackingMetadata) +// if err != nil { +// logrus.Error(err, tracking) +// return err +// } +// wg := sync.WaitGroup{} +// wg.Add(3) +// ch := make(chan error, 3) +// go func() { +// defer wg.Done() +// err = r.records.Update(func(txn *badger.Txn) error { +// for _, v := range tracking.Records { +// k := createDataKey(tracking.UUID, v.RecordTime.UnixNano()) +// j, err := json.Marshal(v) +// if err != nil { +// return err +// } +// txn.Set(k, j) +// } +// return nil +// }) +// ch <- err +// }() +// go func() { +// defer wg.Done() +// err = r.records.Update(func(txn *badger.Txn) error { +// for _, v := range tracking.Rawdata { +// k := createDataKey(tracking.UUID, v.Timestamp) +// j, err := json.Marshal(v) +// if err != nil { +// return err +// } +// txn.Set(k, j) +// } +// return nil +// }) +// ch <- err +// }() +// go func() { +// defer wg.Done() +// err = r.trackings.Update(func(txn *badger.Txn) error { +// err := txn.Set(ts, meta) +// return err +// }) +// ch <- err +// }() +// wg.Wait() +// for { +// select { +// case err := <-ch: +// if err != nil { +// logrus.Error(err, tracking) +// return err +// } +// default: +// close(ch) +// break +// } +// } +// return nil +//} + func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) { - panic("implement me") + panic("implement me") } func (r *badgerStore) Load(id uuid.UUID) (core.Tracking, error) { - panic("implement me") + panic("implement me") +} + +func createDataKey(uid uuid.UUID, timestamp int64) []byte { + prefix, err := uid.MarshalText() + if err != nil || timestamp < 0 { + logrus.Error("unable to create key", err) + } + suffix := make([]byte, 8) + binary.LittleEndian.PutUint64(suffix, uint64(timestamp)) + return append(prefix, suffix...) 
+} + +func unmarshalDataKey(key []byte) (uuid.UUID, int64) { + if len(key) != 24 { + panic("corrupted key") + } + prefix := key[0:15] + suffix := key[15:24] + uid, err := uuid.FromBytes(prefix) + if err != nil { + panic("corrupted key") + } + timestamp := int64(binary.LittleEndian.Uint64(suffix)) + return uid, timestamp } diff --git a/ublox/decode.go b/ublox/decode.go index 3e0ccab..5e98eba 100644 --- a/ublox/decode.go +++ b/ublox/decode.go @@ -6,152 +6,152 @@ package ublox import ( - "bufio" - "bytes" - "encoding/binary" - "errors" - "fmt" - "io" + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" ) // A decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames. // If you have an unmixed stream of NMEA-only data you can use nmea.Decode() on bufio.Scanner.Bytes() directly. type decoder struct { - s *bufio.Scanner + s *bufio.Scanner } // NewDecoder creates a new bufio Scanner with a splitfunc that can handle both UBX and NMEA frames. func NewDecoder(r io.Reader) *decoder { - d := bufio.NewScanner(r) - d.Split(splitFunc) - return &decoder{s: d} + d := bufio.NewScanner(r) + d.Split(splitFunc) + return &decoder{s: d} } // Assume we're either at the start of an NMEA sentence or at the start of a UBX message // if not, skip to the first $ or UBX SOM. func splitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) { - if len(data) == 0 { - return 0, nil, nil - } + if len(data) == 0 { + return 0, nil, nil + } - switch data[0] { - case '$': - return bufio.ScanLines(data, atEOF) + switch data[0] { + case '$': + return bufio.ScanLines(data, atEOF) - case 0xB5: - if len(data) < 8 { - if atEOF { - return len(data), nil, io.ErrUnexpectedEOF - } - return 0, nil, nil - } + case 0xB5: + if len(data) < 8 { + if atEOF { + return len(data), nil, io.ErrUnexpectedEOF + } + return 0, nil, nil + } - sz := 8 + int(data[4]) + int(data[5])*256 - if data[1] == 0x62 { - if sz <= len(data) { - return sz, data[:sz], nil - } - if sz <= bufio.MaxScanTokenSize { - return 0, nil, nil - } - } - } + sz := 8 + int(data[4]) + int(data[5])*256 + if data[1] == 0x62 { + if sz <= len(data) { + return sz, data[:sz], nil + } + if sz <= bufio.MaxScanTokenSize { + return 0, nil, nil + } + } + } - // resync to SOM or $ - data = data[1:] - i1 := bytes.IndexByte(data, '$') - if i1 < 0 { - i1 = len(data) - } + // resync to SOM or $ + data = data[1:] + i1 := bytes.IndexByte(data, '$') + if i1 < 0 { + i1 = len(data) + } - i2 := bytes.IndexByte(data, 0xB5) - if i2 < 0 { - i2 = len(data) - } - if i1 > i2 { - i1 = i2 - } - return 1 + i1, nil, nil + i2 := bytes.IndexByte(data, 0xB5) + if i2 < 0 { + i2 = len(data) + } + if i1 > i2 { + i1 = i2 + } + return 1 + i1, nil, nil } // Decode reads on NMEA or UBX frame and calls decodeUbx accordingly to parse the message, while skipping NMEA. 
func (d *decoder) Decode() (msg interface{}, err error) { - if !d.s.Scan() { - if err = d.s.Err(); err == nil { - err = io.EOF - } - return nil, err - } + if !d.s.Scan() { + if err = d.s.Err(); err == nil { + err = io.EOF + } + return nil, err + } - switch d.s.Bytes()[0] { - case '$': - return nil, errors.New("NMEA not implemented") - //return nmea.Decode(d.s.Bytes()) - case 0xB5: - return decodeUbx(d.s.Bytes()) - } - panic("impossible frame") + switch d.s.Bytes()[0] { + case '$': + return nil, errors.New("NMEA not implemented") + //return nmea.Decode(d.s.Bytes()) + case 0xB5: + return decodeUbx(d.s.Bytes()) + } + panic("impossible frame") } var ( - errInvalidFrame = errors.New("invalid UBX frame") - errInvalidChkSum = errors.New("invalid UBX checksum") + errInvalidFrame = errors.New("invalid UBX frame") + errInvalidChkSum = errors.New("invalid UBX checksum") ) func decodeUbx(frame []byte) (msg Message, err error) { - buf := bytes.NewReader(frame) + buf := bytes.NewReader(frame) - var header struct { - Preamble uint16 - ClassID uint16 - Length uint16 - } + var header struct { + Preamble uint16 + ClassID uint16 + Length uint16 + } - if err := binary.Read(buf, binary.LittleEndian, &header); err != nil { - return nil, err - } + if err := binary.Read(buf, binary.LittleEndian, &header); err != nil { + return nil, err + } - if header.Preamble != 0x62B5 { - return nil, errInvalidFrame - } + if header.Preamble != 0x62B5 { + return nil, errInvalidFrame + } - if buf.Len()+2 < int(header.Length) { - return nil, io.ErrShortBuffer - } + if buf.Len()+2 < int(header.Length) { + return nil, io.ErrShortBuffer + } - var a, b byte - for _, v := range frame[2 : header.Length+6] { - a += byte(v) - b += a - } + var a, b byte + for _, v := range frame[2 : header.Length+6] { + a += byte(v) + b += a + } - if frame[header.Length+6] != a || frame[header.Length+7] != b { - return nil, errInvalidChkSum - } + if frame[header.Length+6] != a || frame[header.Length+7] != b { + return nil, errInvalidChkSum + } - switch header.ClassID { - case 0x0105: // ACK-ACK - fmt.Println("ACK-ACK not implemented") - //msg = &AckAck{} - case 0x0005: // ACK-NAK - fmt.Println("ACK-NAK not implemented") - //msg = &AckNak{} - case 0x0701: // NAV-PVT - msg = &NavPvt{} - case 0x0028: // HNR-PVT - msg = &HnrPvt{} - case 0x0501: // NAV-ATT - msg = &NavAtt{} - default: - } - if msg != nil { - err = binary.Read(buf, binary.LittleEndian, msg) - } else { - msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)} - } - //fmt.Println(msg) + switch header.ClassID { + case 0x0105: // ACK-ACK + fmt.Println("ACK-ACK not implemented") + //msg = &AckAck{} + case 0x0005: // ACK-NAK + fmt.Println("ACK-NAK not implemented") + //msg = &AckNak{} + case 0x0701: // NAV-PVT + msg = &NavPvt{} + case 0x0028: // HNR-PVT + msg = &HnrPvt{} + case 0x0501: // NAV-ATT + msg = &NavAtt{} + default: + } + if msg != nil { + err = binary.Read(buf, binary.LittleEndian, msg) + } else { + msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)} + } + //fmt.Println(msg) - return msg, err + return msg, err } diff --git a/ublox/messages.go b/ublox/messages.go index 610f7b2..fbebb09 100644 --- a/ublox/messages.go +++ b/ublox/messages.go @@ -1,7 +1,7 @@ package ublox type Message interface { - ClassID() uint16 + ClassID() uint16 } //type UbxMessage interface { @@ -11,90 +11,90 @@ type Message interface { //} type RawMessage struct { - classID uint16 - Data []byte + classID uint16 + Data []byte } func (msg 
*RawMessage) ClassID() uint16 { return msg.classID } type NavPvt struct { - ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. - Year_y uint16 // - Year (UTC) - Month_month byte // - Month, range 1..12 (UTC) - Day_d byte // - Day of month, range 1..31 (UTC) - Hour_h byte // - Hour of day, range 0..23 (UTC) - Min_min byte // - Minute of hour, range 0..59 (UTC) - Sec_s byte // - Seconds of minute, range 0..60 (UTC) - Valid NavPVTValid // - Validity flags (see graphic below) - TAcc_ns uint32 // - Time accuracy estimate (UTC) - Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC) - FixType NavPVTFixType // - GNSSfix Type - Flags NavPVTFlags // - Fix status flags (see graphic below) - Flags2 NavPVTFlags2 // - Additional flags (see graphic below) - NumSV byte // - Number of satellites used in Nav Solution - Lon_dege7 int32 // 1e-7 Longitude - Lat_dege7 int32 // 1e-7 Latitude - Height_mm int32 // - Height above ellipsoid - HMSL_mm int32 // - Height above mean sea level - HAcc_mm uint32 // - Horizontal accuracy estimate - VAcc_mm uint32 // - Vertical accuracy estimate - VelN_mm_s int32 // - NED north velocity - VelE_mm_s int32 // - NED east velocity - VelD_mm_s int32 // - NED down velocity - GSpeed_mm_s int32 // - Ground Speed (2-D) - HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) - SAcc_mm_s uint32 // - Speed accuracy estimate - HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - PDOPe2 uint16 // 0.01 Position DOP - Flags3 NavPVTFlags3 // - Additional flags (see graphic below) - Reserved1 [5]byte // - Reserved - HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion - MagDec_dege2 int16 // 1e-2 Magnetic declination. Only supported in ADR 4.10 and later. - MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later. + ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. + Year_y uint16 // - Year (UTC) + Month_month byte // - Month, range 1..12 (UTC) + Day_d byte // - Day of month, range 1..31 (UTC) + Hour_h byte // - Hour of day, range 0..23 (UTC) + Min_min byte // - Minute of hour, range 0..59 (UTC) + Sec_s byte // - Seconds of minute, range 0..60 (UTC) + Valid NavPVTValid // - Validity flags (see graphic below) + TAcc_ns uint32 // - Time accuracy estimate (UTC) + Nano_ns int32 // - Fraction of second, range -1e9 .. 
1e9 (UTC) + FixType NavPVTFixType // - GNSSfix Type + Flags NavPVTFlags // - Fix status flags (see graphic below) + Flags2 NavPVTFlags2 // - Additional flags (see graphic below) + NumSV byte // - Number of satellites used in Nav Solution + Lon_dege7 int32 // 1e-7 Longitude + Lat_dege7 int32 // 1e-7 Latitude + Height_mm int32 // - Height above ellipsoid + HMSL_mm int32 // - Height above mean sea level + HAcc_mm uint32 // - Horizontal accuracy estimate + VAcc_mm uint32 // - Vertical accuracy estimate + VelN_mm_s int32 // - NED north velocity + VelE_mm_s int32 // - NED east velocity + VelD_mm_s int32 // - NED down velocity + GSpeed_mm_s int32 // - Ground Speed (2-D) + HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) + SAcc_mm_s uint32 // - Speed accuracy estimate + HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + PDOPe2 uint16 // 0.01 Position DOP + Flags3 NavPVTFlags3 // - Additional flags (see graphic below) + Reserved1 [5]byte // - Reserved + HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion + MagDec_dege2 int16 // 1e-2 Magnetic declination. Only supported in ADR 4.10 and later. + MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later. } func (NavPvt) ClassID() uint16 { return 0x0701 } type HnrPvt struct { - ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. - Year_y uint16 // - Year (UTC) - Month_month byte // - Month, range 1..12 (UTC) - Day_d byte // - Day of month, range 1..31 (UTC) - Hour_h byte // - Hour of day, range 0..23 (UTC) - Min_min byte // - Minute of hour, range 0..59 (UTC) - Sec_s byte // - Seconds of minute, range 0..60 (UTC) - Valid byte // - Validity flags (see graphic below) - Nano_ns int32 // - Fraction of second, range -1e9 .. 1e9 (UTC) - FixType byte // - GNSSfix Type - Flags byte // - Fix status flags (see graphic below) - Reserved [2]byte - Lon_dege7 int32 // 1e-7 Longitude - Lat_dege7 int32 // 1e-7 Latitude - Height_mm int32 // - Height above ellipsoid - HMSL_mm int32 // - Height above mean sea level - GSpeed_mm_s int32 // - Ground Speed (2-D) - Speed_mm_s int32 // Speed (3-D) - HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) - HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion - HAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - VAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - SAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) - Reserved1 [4]byte // - Reserved + ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. + Year_y uint16 // - Year (UTC) + Month_month byte // - Month, range 1..12 (UTC) + Day_d byte // - Day of month, range 1..31 (UTC) + Hour_h byte // - Hour of day, range 0..23 (UTC) + Min_min byte // - Minute of hour, range 0..59 (UTC) + Sec_s byte // - Seconds of minute, range 0..60 (UTC) + Valid byte // - Validity flags (see graphic below) + Nano_ns int32 // - Fraction of second, range -1e9 .. 
1e9 (UTC) + FixType byte // - GNSSfix Type + Flags byte // - Fix status flags (see graphic below) + Reserved [2]byte + Lon_dege7 int32 // 1e-7 Longitude + Lat_dege7 int32 // 1e-7 Latitude + Height_mm int32 // - Height above ellipsoid + HMSL_mm int32 // - Height above mean sea level + GSpeed_mm_s int32 // - Ground Speed (2-D) + Speed_mm_s int32 // Speed (3-D) + HeadMot_dege5 int32 // 1e-5 Heading of motion (2-D) + HeadVeh_dege5 int32 // 1e-5 Heading of vehicle (2-D), this is only valid when headVehValid is set, otherwise the output is set to the heading of motion + HAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + VAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + SAcc uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + HeadAcc_dege5 uint32 // 1e-5 Heading accuracy estimate (both motion and vehicle) + Reserved1 [4]byte // - Reserved } func (HnrPvt) ClassID() uint16 { return 0x0028 } type NavAtt struct { - ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. - Version byte - Reserved1 [3]byte - Roll_deg int32 - Pitch_deg int32 - Heading_deg int32 - AccRoll_deg uint32 - AccPitch_deg uint32 - AccHeading_deg uint32 + ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. + Version byte + Reserved1 [3]byte + Roll_deg int32 + Pitch_deg int32 + Heading_deg int32 + AccRoll_deg uint32 + AccPitch_deg uint32 + AccHeading_deg uint32 } func (NavAtt) ClassID() uint16 { return 0x0501 } @@ -104,43 +104,43 @@ func (NavAtt) ClassID() uint16 { return 0x0501 } type NavPVTFixType byte const ( - NavPVTNoFix NavPVTFixType = iota - NavPVTDeadReckoning - NavPVTFix2D - NavPVTFix3D - NavPVTGNSS - NavPVTTimeOnly + NavPVTNoFix NavPVTFixType = iota + NavPVTDeadReckoning + NavPVTFix2D + NavPVTFix3D + NavPVTGNSS + NavPVTTimeOnly ) type NavPVTValid byte const ( - NavPVTValidDate NavPVTValid = (1 << iota) // valid UTC Date (see Time Validity section for details) - NavPVTValidTime // valid UTC time of day (see Time Validity section for details) - NavPVTFullyResolved // UTC time of day has been fully resolved (no seconds uncertainty). Cannot be used to check if time is completely solved. - NavPVTValidMag // valid magnetic declination + NavPVTValidDate NavPVTValid = (1 << iota) // valid UTC Date (see Time Validity section for details) + NavPVTValidTime // valid UTC time of day (see Time Validity section for details) + NavPVTFullyResolved // UTC time of day has been fully resolved (no seconds uncertainty). Cannot be used to check if time is completely solved. 
+ NavPVTValidMag // valid magnetic declination ) type NavPVTFlags byte const ( - NavPVTGnssFixOK NavPVTFlags = 1 << 0 // valid fix (i.e within DOP & accuracy masks) - NavPVTDiffSoln NavPVTFlags = 1 << 1 // differential corrections were applied - NavPVTHeadVehValid NavPVTFlags = 1 << 5 // heading of vehicle is valid, only set if the receiver is in sensor fusion mode - NavPVTCarrSolnFloat NavPVTFlags = 1 << 6 // carrier phase range solution with floating ambiguities - NavPVTCarrSolnFixed NavPVTFlags = 1 << 7 // carrier phase range solution with fixed ambiguities + NavPVTGnssFixOK NavPVTFlags = 1 << 0 // valid fix (i.e within DOP & accuracy masks) + NavPVTDiffSoln NavPVTFlags = 1 << 1 // differential corrections were applied + NavPVTHeadVehValid NavPVTFlags = 1 << 5 // heading of vehicle is valid, only set if the receiver is in sensor fusion mode + NavPVTCarrSolnFloat NavPVTFlags = 1 << 6 // carrier phase range solution with floating ambiguities + NavPVTCarrSolnFixed NavPVTFlags = 1 << 7 // carrier phase range solution with fixed ambiguities ) type NavPVTFlags2 byte const ( - NavPVTConfirmedAvai NavPVTFlags2 = 1 << 5 // information about UTC Date and Time of Day validity confirmation is available (see Time Validity section for details) - NavPVTConfirmedDate NavPVTFlags2 = 1 << 6 // UTC Date validity could be confirmed (see Time Validity section for details) - NavPVTConfirmedTime NavPVTFlags2 = 1 << 7 // UTC Time of Day could be confirmed (see Time Validity section for details) + NavPVTConfirmedAvai NavPVTFlags2 = 1 << 5 // information about UTC Date and Time of Day validity confirmation is available (see Time Validity section for details) + NavPVTConfirmedDate NavPVTFlags2 = 1 << 6 // UTC Date validity could be confirmed (see Time Validity section for details) + NavPVTConfirmedTime NavPVTFlags2 = 1 << 7 // UTC Time of Day could be confirmed (see Time Validity section for details) ) type NavPVTFlags3 byte const ( - NavPVTInvalidLlh NavPVTFlags3 = (1 << iota) // 1 = Invalid lon, lat, height and hMSL + NavPVTInvalidLlh NavPVTFlags3 = (1 << iota) // 1 = Invalid lon, lat, height and hMSL ) diff --git a/web/http.go b/web/http.go index c68f38f..9478fe4 100644 --- a/web/http.go +++ b/web/http.go @@ -1,85 +1,85 @@ package web import ( - "git.timovolkmann.de/gyrogpsc/core" - "github.com/gofiber/fiber/v2" - "github.com/gofiber/websocket/v2" - "github.com/sirupsen/logrus" - "html/template" + "git.timovolkmann.de/gyrogpsc/core" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" + "github.com/sirupsen/logrus" + "html/template" ) func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) { - app := fiber.New() - app.Static("/static", "static") + app := fiber.New() + app.Static("/static", "static") - // Application Main Page - app.Get("/", fiberHomeHandler) + // Application Main Page + app.Get("/", fiberHomeHandler) - // Websocket - app.Get("/", websocket.New(createFiberWebsocketHandler(sub))) + // Websocket + app.Get("/", websocket.New(createFiberWebsocketHandler(sub))) - // TODO: Get all SerialPorts - // app.Get("/serialports") + // TODO: Get all SerialPorts + // app.Get("/serialports") - // Tracking persistence controls - trackings := app.Group("/trackings") - trackings.Get("/", stubhander()) // Get all trackings Metadata - trackings.Post("/", stubhander()) // Initialize new tracking, open websocket and prepare for automatic recording. Toggle ?serial=true and ?tcp=true. 
Returns trackingId - trackings.Patch("/", stubhander()) // Starts recording - trackings.Put("/", stubhander()) // Stops current recording. Returns trackingId if record was successful - trackings.Delete("/", stubhander()) // Stops websocket connection, pipelines and collectors + // Tracking persistence controls + trackings := app.Group("/trackings") + trackings.Get("/", stubhander()) // Get all trackings Metadata + trackings.Post("/", stubhander()) // Initialize new tracking, open websocket and prepare for automatic recording. Toggle ?serial=true and ?tcp=true. Returns trackingId + trackings.Patch("/", stubhander()) // Starts recording + trackings.Put("/", stubhander()) // Stops current recording. Returns trackingId if record was successful + trackings.Delete("/", stubhander()) // Stops websocket connection, pipelines and collectors - trackings.Get("/:trackingId", stubhander()) // Gets Tracking Metadata and loads sensorRecords from storage. - trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage + trackings.Get("/:trackingId", stubhander()) // Gets Tracking Metadata and loads sensorRecords from storage. + trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage - trackings.Post("/current", stubhander()) // Starts Replay. - trackings.Patch("/current", stubhander()) // Pauses Replay. - trackings.Put("/current", stubhander()) // Stops Replay. + trackings.Post("/current", stubhander()) // Starts Replay. + trackings.Patch("/current", stubhander()) // Pauses Replay. + trackings.Put("/current", stubhander()) // Stops Replay. - logrus.Fatal(app.Listen(c.Webserver.Port)) + logrus.Fatal(app.Listen(c.Webserver.Port)) } func stubhander() fiber.Handler { - return func(ctx *fiber.Ctx) error { - return nil - } + return func(ctx *fiber.Ctx) error { + return nil + } } func createFiberWebsocketHandler(s core.Subscriber) func(conn *websocket.Conn) { - return func(c *websocket.Conn) { - // Handle and discard inbound messages - go func() { - for { - if _, _, err := c.NextReader(); err != nil { - c.Close() - break - } - } - }() + return func(c *websocket.Conn) { + // Handle and discard inbound messages + go func() { + for { + if _, _, err := c.NextReader(); err != nil { + c.Close() + break + } + } + }() - dispatcherId, channel := s.Subscribe() - defer s.Unsubscribe(dispatcherId) - for { - cmsg := <-channel - err := c.WriteMessage(websocket.TextMessage, []byte(cmsg)) - if err != nil { - logrus.Println("write:", err) + dispatcherId, channel := s.Subscribe() + defer s.Unsubscribe(dispatcherId) + for { + cmsg := <-channel + err := c.WriteMessage(websocket.TextMessage, []byte(cmsg)) + if err != nil { + logrus.Println("write:", err) - break - } - } - } + break + } + } + } } func fiberHomeHandler(c *fiber.Ctx) error { - tpl, err := template.ParseFiles("static/index.html") - if err != nil { - return err - } - err = tpl.Execute(c, "://"+c.Hostname()+"/") - if err != nil { - return err - } - return nil + tpl, err := template.ParseFiles("static/index.html") + if err != nil { + return err + } + err = tpl.Execute(c, "://"+c.Hostname()+"/") + if err != nil { + return err + } + return nil }
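The REST routes above are still stubbed out, but the websocket handler is already wired to the dispatcher, so the outbound stream can be exercised end to end. A minimal client sketch (not part of this patch) follows; it assumes github.com/gorilla/websocket on the client side and a server listening on :3011, both purely illustrative choices:

	package main

	import (
		"log"

		"github.com/gorilla/websocket" // assumed client library, not a dependency of this repo
	)

	func main() {
		// Assumed endpoint: the websocket route registered by CreateServer,
		// with the webserver listening on :3011.
		conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:3011/", nil)
		if err != nil {
			log.Fatal("dial:", err)
		}
		defer conn.Close()

		// createFiberWebsocketHandler pushes every dispatcher message as a
		// text frame; print each one as it arrives.
		for {
			_, msg, err := conn.ReadMessage()
			if err != nil {
				log.Println("read:", err)
				return
			}
			log.Printf("%s", msg)
		}
	}

Any websocket client would do here; gorilla/websocket is used only because it exercises the server without requiring any server-side changes.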