From b03d7fcfa26e34a6380f31436099e3a1b42e7858 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Thu, 10 Dec 2020 09:43:44 +0100 Subject: [PATCH] small changes in sensordata logic --- core/pipeline.go | 12 +++++------ core/sensordata.go | 50 ++++++++++++++++++++++------------------------ 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/core/pipeline.go b/core/pipeline.go index 4be7863..c677c5f 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -90,12 +90,12 @@ type aggregator struct { type UnixNanoTime int64 type synchronizer struct { - tcpSerialDelayMs int64 + tcpSerialDelayMs int64 //tcpBuffer map[UnixNanoTime]Sensordata //serialBuffer map[UnixNanoTime]Sensordata //bufferSize int - mutex *sync.Mutex - updateTicker *time.Ticker + mutex *sync.Mutex + updateTicker *time.Ticker // should run concurrently // // Methods: @@ -146,7 +146,7 @@ func (p *pipeline) Process(data *Sensordata) error { case SOURCE_SERIAL: go p.pushSerialDataToBuffer(*data) default: - return errors.New("invalid data source") + panic("pipeline: invalid data source") } return nil } @@ -156,7 +156,7 @@ func (p *pipeline) pushTcpDataToBuffer(data Sensordata) { time.Sleep(time.Duration(p.syn.tcpSerialDelayMs) * time.Millisecond) } p.agr.tcpMutex.Lock() - p.agr.tcpSensorData = p.agr.tcpSensorData.Consolidate2(data) + p.agr.tcpSensorData = p.agr.tcpSensorData.ConsolidateExTime(data) p.agr.tcpMutex.Unlock() } func (p *pipeline) pushSerialDataToBuffer(data Sensordata) { @@ -164,6 +164,6 @@ func (p *pipeline) pushSerialDataToBuffer(data Sensordata) { time.Sleep(time.Duration(-p.syn.tcpSerialDelayMs) * time.Millisecond) } p.agr.serialMutex.Lock() - p.agr.serialSensorData = p.agr.serialSensorData.Consolidate2(data) + p.agr.serialSensorData = p.agr.serialSensorData.ConsolidateEpochsOnly(data) p.agr.serialMutex.Unlock() } diff --git a/core/sensordata.go b/core/sensordata.go index a2cb8cc..c9a9d85 100644 --- a/core/sensordata.go +++ b/core/sensordata.go @@ -31,40 +31,30 @@ func (s Sensordata) isSameEpoch(n Sensordata) bool { return s.itow == n.itow } -func (s Sensordata) Consolidate(n Sensordata) Sensordata { - if (s.SourceId != n.SourceId && s != Sensordata{}) { - log.Println(s) - log.Println(n) - log.Fatalln("Do not consolidate Sensordata from different Sources") - } +// Consolidates two sensordata elements if they are in the same epoch +func (s Sensordata) ConsolidateEpochsOnly(n Sensordata) Sensordata { + s.checkSources(&n) if s.isSameEpoch(n) { null := Sensordata{} - //if s.Timestamp == null.Timestamp { s.Timestamp = n.Timestamp } - //if s.Position == null.Position { s.Position = n.Position } - //if s.Orientation == null.Orientation { s.Orientation = n.Orientation } - if n.Timestamp != null.Timestamp && s.Timestamp != n.Timestamp { - s.Timestamp = n.Timestamp - } - if n.Position != null.Position && s.Position != n.Position { - s.Position = n.Position - } - if n.Orientation != null.Orientation && s.Orientation != n.Orientation { - s.Orientation = n.Orientation - } - return s + if n.Timestamp == null.Timestamp { + n.Timestamp = s.Timestamp + } + if n.Position == null.Position { + n.Position = s.Position + } + if n.Orientation == null.Orientation { + n.Orientation = s.Orientation + } } return n } -func (s Sensordata) Consolidate2(n Sensordata) Sensordata { +// Consolidates two sensordata elements but ignores timestamps +func (s Sensordata) ConsolidateExTime(n Sensordata) Sensordata { + s.checkSources(&n) null := Sensordata{} - //if s.Timestamp == null.Timestamp { s.Timestamp = n.Timestamp } - //if s.Position == null.Position { s.Position = n.Position } - //if s.Orientation == null.Orientation { s.Orientation = n.Orientation } - //if n.Timestamp == null.Timestamp { - // n.Timestamp = s.Timestamp - //} + if n.Position == null.Position { n.Position = s.Position } @@ -74,6 +64,14 @@ func (s Sensordata) Consolidate2(n Sensordata) Sensordata { return n } +func (s *Sensordata) checkSources(n *Sensordata) { + if (s.SourceId != n.SourceId && *s != Sensordata{}) { + log.Println(s) + log.Println(n) + log.Fatalln("Do not consolidate Sensordata from different Sources") + } +} + var ( errNotImplemented = errors.New("message not implemented") errRawMessage = errors.New("raw message")