From c3f6a9ff5b7dd918732d7ac3cafdafbf1c1500c2 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Fri, 11 Dec 2020 18:38:39 +0100 Subject: [PATCH] [WIP] bugfixes --- cmd/autostart/autostart.go | 4 +- core/collectors.go | 15 +- core/pipeline.go | 282 +++++++++++++++++++------------------ go.mod | 1 + go.sum | 1 + 5 files changed, 160 insertions(+), 143 deletions(-) diff --git a/cmd/autostart/autostart.go b/cmd/autostart/autostart.go index ad55990..0f4438a 100644 --- a/cmd/autostart/autostart.go +++ b/cmd/autostart/autostart.go @@ -18,11 +18,11 @@ func main() { service := core.TrackingService(repo, disp, conf) go func() { - service.NewTracking(core.TCP, core.SERIAL) + service.NewTracking(core.TCP) time.Sleep(5 * time.Second) service.StartRecord() time.Sleep(15 * time.Second) - service.StopRecord() + //service.StopRecord() }() web.CreateServer(service, disp, conf) diff --git a/core/collectors.go b/core/collectors.go index d4bc595..fb3ed1a 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -2,13 +2,11 @@ package core import ( "fmt" + "git.timovolkmann.de/gyrogpsc/ublox" + "go.bug.st/serial" "log" "net" "os" - "time" - - "git.timovolkmann.de/gyrogpsc/ublox" - "go.bug.st/serial" ) type Collector interface { @@ -74,11 +72,14 @@ func (s *serialCollector) Collect() { break } sd, err := ConvertUbxToSensorData(meas) - if err != nil || sd == nil { + if err != nil { log.Println("convert err:", err, meas, sd) continue } - + // skip irrelevant messages + if sd == nil { + continue + } err = s.proc.Push(sd) if err != nil { @@ -172,7 +173,7 @@ func (c *tcpCollector) jsonHandler(conn net.Conn) { continue } if !c.active { - time.Sleep(50 * time.Millisecond) + //time.Sleep(50 * time.Millisecond) continue } err = c.processor.Push(sd) diff --git a/core/pipeline.go b/core/pipeline.go index 23a9a5e..faf2032 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -1,180 +1,194 @@ package core import ( - "encoding/json" - "errors" - "fmt" - "log" - "sync" - "time" + "encoding/json" + "errors" + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "log" + "sync" + "time" ) type pipeline struct { - active bool - record bool - syn synchronizer - agr aggregator - pub Publisher - stor Storer - publishTicker *time.Ticker + active bool + record bool + synchroniz synchronizer + buffer pipeBuffer + publisher Publisher + storer Storer + publishTicker *time.Ticker } // pipe implements Runner & Pusher func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline { - return &pipeline{ - false, - false, - synchronizer{ - //bufferSize: 100, - mutex: &sync.Mutex{}, - updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond), - }, - aggregator{ - tcpMutex: &sync.Mutex{}, - serialMutex: &sync.Mutex{}, - }, - d, - s, - time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond), - } + return &pipeline{ + false, + false, + synchronizer{ + //bufferSize: 100, + mutex: &sync.Mutex{}, + updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond), + }, + pipeBuffer{ + tcpMutex: &sync.Mutex{}, + serialMutex: &sync.Mutex{}, + }, + d, + s, + time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond), + } } func (p *pipeline) Run() { - p.active = true - log.Println("pipe: processing service started") - go func() { - for p.active { - <-p.syn.updateTicker.C - err := p.refreshDelay() - if err != nil { - log.Println(err) - } - } - log.Println("pipe: updater stopped") - }() - go func() { - for p.active { - <-p.publishTicker.C - err := p.Publish() - if err != nil && err.Error() != "no data available" { - log.Println(err) - } - } - log.Println("pipe: publisher stopped") - }() + p.active = true + log.Println("pipe: processing service started") + go func() { + for p.active { + <-p.synchroniz.updateTicker.C + err := p.refreshDelay() + if err != nil { + log.Println(err) + } + } + log.Println("pipe: updater stopped") + }() + go func() { + for p.active { + <-p.publishTicker.C + err := p.publish() + if err != nil && err.Error() != "no data available" { + log.Println(err) + } + } + log.Println("pipe: publisher stopped") + }() } func (p *pipeline) Record() { - p.record = true + p.record = true } func (p *pipeline) Stop() { - p.record = false + p.record = false } -func (p *pipeline) Publish() error { - p.agr.tcpMutex.Lock() - p.agr.serialMutex.Lock() +func (p *pipeline) publish() error { + p.buffer.tcpMutex.Lock() + p.buffer.serialMutex.Lock() - if (p.agr.tcpSensorData == sensorData{} && p.agr.serialSensorData == sensorData{}) { - p.agr.tcpMutex.Unlock() - p.agr.serialMutex.Unlock() - return errors.New("no data available") - } + if (p.buffer.MeasTcp == sensorData{} && p.buffer.MeasSerial == sensorData{}) { + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() + return errors.New("no data available") + } + if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, cmpopts.IgnoreUnexported(sensorData{})) && + cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, cmpopts.IgnoreUnexported(sensorData{})) { + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() + return errors.New("same data") + } + log.Println("––––––––––––––––––––––––––––––––––––") + log.Printf("MEAS old: %v", p.buffer.LastMeasTcp) + log.Printf("MEAS new: %v", p.buffer.MeasTcp) + log.Println("––––––––––––––––––––––––––––––––––––") + p.buffer.LastMeasTcp = p.buffer.MeasTcp + p.buffer.LastMeasSerial = p.buffer.MeasSerial + p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial) - p.stor.EnqueuePair(p.agr.tcpSensorData, p.agr.serialSensorData) + data := map[string]sensorData{ + string(SOURCE_TCP): p.buffer.MeasTcp, + string(SOURCE_SERIAL): p.buffer.MeasSerial, + } - data := map[string]sensorData{ - string(SOURCE_TCP): p.agr.tcpSensorData, - string(SOURCE_SERIAL): p.agr.serialSensorData, - } + p.buffer.tcpMutex.Unlock() + p.buffer.serialMutex.Unlock() - p.agr.tcpMutex.Unlock() - p.agr.serialMutex.Unlock() - - jdata, err := json.Marshal(data) - //log.Println(string(pretty.Pretty(jdata))) - if err != nil { - return err - } - p.pub.Publish(string(jdata)) - return nil + jdata, err := json.Marshal(data) + //log.Println(string(pretty.Pretty(jdata))) + if err != nil { + return err + } + p.publisher.Publish(string(jdata)) + return nil } -type aggregator struct { - tcpSensorData sensorData - serialSensorData sensorData - tcpMutex *sync.Mutex - serialMutex *sync.Mutex +type pipeBuffer struct { + MeasTcp sensorData + MeasSerial sensorData + LastMeasTcp sensorData + LastMeasSerial sensorData + tcpMutex *sync.Mutex + serialMutex *sync.Mutex } type UnixNanoTime int64 type synchronizer struct { - tcpSerialDelayMs int64 - mutex *sync.Mutex - updateTicker *time.Ticker + tcpSerialDelayMs int64 + mutex *sync.Mutex + updateTicker *time.Ticker } func (p *pipeline) refreshDelay() error { - log.Println("refreshing delay....") - fmt.Println("Delay TCP/SERIAL", p.syn.tcpSerialDelayMs) - p.agr.serialMutex.Lock() - p.agr.tcpMutex.Lock() - tcpTime := time.Unix(0, p.agr.tcpSensorData.Timestamp) - serTime := time.Unix(0, p.agr.serialSensorData.Timestamp) - p.agr.serialMutex.Unlock() - p.agr.tcpMutex.Unlock() - if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { - return errors.New("no sync possible. check if both collectors running. otherwise check GPS fix") - } - currentDelay := tcpTime.Sub(serTime).Milliseconds() - if currentDelay > 5000 || currentDelay < -5000 { - p.syn.tcpSerialDelayMs = 0 - return errors.New("skipping synchronisation! time not properly configured or facing network problems.") - } - log.Println("TCP", tcpTime.String()) - log.Println("SER", serTime.String()) - log.Println("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms") - delay := tcpTime.Sub(serTime).Milliseconds() - p.syn.tcpSerialDelayMs += delay - return nil + log.Println("refreshing delay....") + fmt.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs) + p.buffer.serialMutex.Lock() + p.buffer.tcpMutex.Lock() + tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp) + serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp) + p.buffer.serialMutex.Unlock() + p.buffer.tcpMutex.Unlock() + if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { + return errors.New("no sync possible. check if both collectors running. otherwise check GPS fix") + } + currentDelay := tcpTime.Sub(serTime).Milliseconds() + if currentDelay > 5000 || currentDelay < -5000 { + p.synchroniz.tcpSerialDelayMs = 0 + return errors.New("skipping synchronisation! time not properly configured or facing network problems.") + } + log.Println("TCP", tcpTime.String()) + log.Println("SER", serTime.String()) + log.Println("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms") + delay := tcpTime.Sub(serTime).Milliseconds() + p.synchroniz.tcpSerialDelayMs += delay + return nil } func (p *pipeline) Push(data *sensorData) error { - if data == nil { - return errors.New("nil processing not allowed") - } - //log.Println(string(data.source)) - // TODO: persist data here with current timestamp - p.stor.EnqueueRaw(*data) - switch data.source { - case SOURCE_TCP: - go p.pushTcpDataToBuffer(*data) - case SOURCE_SERIAL: - go p.pushSerialDataToBuffer(*data) - default: - panic("pipe: invalid data source") - } - return nil + if data == nil { + return errors.New("nil processing not allowed") + } + //log.Println("push data to pipe:", string(data.source)) + p.storer.EnqueueRaw(*data) + switch data.source { + case SOURCE_TCP: + go p.pushTcpDataToBuffer(*data) + case SOURCE_SERIAL: + go p.pushSerialDataToBuffer(*data) + default: + panic("pipe: invalid data source") + } + return nil } func (p *pipeline) pushTcpDataToBuffer(data sensorData) { - if p.syn.tcpSerialDelayMs > 0 { - time.Sleep(time.Duration(p.syn.tcpSerialDelayMs) * time.Millisecond) - } - p.agr.tcpMutex.Lock() - p.agr.tcpSensorData = p.agr.tcpSensorData.ConsolidateExTime(data) - p.agr.tcpMutex.Unlock() + if p.synchroniz.tcpSerialDelayMs > 0 { + time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond) + } + p.buffer.tcpMutex.Lock() + p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data) + p.buffer.tcpMutex.Unlock() } func (p *pipeline) pushSerialDataToBuffer(data sensorData) { - if p.syn.tcpSerialDelayMs < 0 { - time.Sleep(time.Duration(-p.syn.tcpSerialDelayMs) * time.Millisecond) - } - p.agr.serialMutex.Lock() - p.agr.serialSensorData = p.agr.serialSensorData.ConsolidateEpochsOnly(data) - p.agr.serialMutex.Unlock() + if p.synchroniz.tcpSerialDelayMs < 0 { + time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond) + } + p.buffer.serialMutex.Lock() + p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data) + p.buffer.serialMutex.Unlock() } func (p *pipeline) Close() { - p.active = false + p.active = false } diff --git a/go.mod b/go.mod index a034228..201ab81 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/gofiber/fiber/v2 v2.2.4 github.com/gofiber/websocket/v2 v2.0.2 + github.com/google/go-cmp v0.3.0 github.com/google/uuid v1.1.2 github.com/gorilla/websocket v1.4.2 github.com/spf13/viper v1.7.1 diff --git a/go.sum b/go.sum index 321eef8..de176c1 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,7 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=