small changes in sensordata logic

This commit is contained in:
Timo Volkmann 2020-12-10 09:43:44 +01:00
parent f5f98ced5b
commit b03d7fcfa2
2 changed files with 30 additions and 32 deletions

View File

@ -90,12 +90,12 @@ type aggregator struct {
type UnixNanoTime int64
type synchronizer struct {
tcpSerialDelayMs int64
tcpSerialDelayMs int64
//tcpBuffer map[UnixNanoTime]Sensordata
//serialBuffer map[UnixNanoTime]Sensordata
//bufferSize int
mutex *sync.Mutex
updateTicker *time.Ticker
mutex *sync.Mutex
updateTicker *time.Ticker
// should run concurrently
//
// Methods:
@ -146,7 +146,7 @@ func (p *pipeline) Process(data *Sensordata) error {
case SOURCE_SERIAL:
go p.pushSerialDataToBuffer(*data)
default:
return errors.New("invalid data source")
panic("pipeline: invalid data source")
}
return nil
}
@ -156,7 +156,7 @@ func (p *pipeline) pushTcpDataToBuffer(data Sensordata) {
time.Sleep(time.Duration(p.syn.tcpSerialDelayMs) * time.Millisecond)
}
p.agr.tcpMutex.Lock()
p.agr.tcpSensorData = p.agr.tcpSensorData.Consolidate2(data)
p.agr.tcpSensorData = p.agr.tcpSensorData.ConsolidateExTime(data)
p.agr.tcpMutex.Unlock()
}
func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
@ -164,6 +164,6 @@ func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
time.Sleep(time.Duration(-p.syn.tcpSerialDelayMs) * time.Millisecond)
}
p.agr.serialMutex.Lock()
p.agr.serialSensorData = p.agr.serialSensorData.Consolidate2(data)
p.agr.serialSensorData = p.agr.serialSensorData.ConsolidateEpochsOnly(data)
p.agr.serialMutex.Unlock()
}

View File

@ -31,40 +31,30 @@ func (s Sensordata) isSameEpoch(n Sensordata) bool {
return s.itow == n.itow
}
func (s Sensordata) Consolidate(n Sensordata) Sensordata {
if (s.SourceId != n.SourceId && s != Sensordata{}) {
log.Println(s)
log.Println(n)
log.Fatalln("Do not consolidate Sensordata from different Sources")
}
// Consolidates two sensordata elements if they are in the same epoch
func (s Sensordata) ConsolidateEpochsOnly(n Sensordata) Sensordata {
s.checkSources(&n)
if s.isSameEpoch(n) {
null := Sensordata{}
//if s.Timestamp == null.Timestamp { s.Timestamp = n.Timestamp }
//if s.Position == null.Position { s.Position = n.Position }
//if s.Orientation == null.Orientation { s.Orientation = n.Orientation }
if n.Timestamp != null.Timestamp && s.Timestamp != n.Timestamp {
s.Timestamp = n.Timestamp
}
if n.Position != null.Position && s.Position != n.Position {
s.Position = n.Position
}
if n.Orientation != null.Orientation && s.Orientation != n.Orientation {
s.Orientation = n.Orientation
}
return s
if n.Timestamp == null.Timestamp {
n.Timestamp = s.Timestamp
}
if n.Position == null.Position {
n.Position = s.Position
}
if n.Orientation == null.Orientation {
n.Orientation = s.Orientation
}
}
return n
}
func (s Sensordata) Consolidate2(n Sensordata) Sensordata {
// Consolidates two sensordata elements but ignores timestamps
func (s Sensordata) ConsolidateExTime(n Sensordata) Sensordata {
s.checkSources(&n)
null := Sensordata{}
//if s.Timestamp == null.Timestamp { s.Timestamp = n.Timestamp }
//if s.Position == null.Position { s.Position = n.Position }
//if s.Orientation == null.Orientation { s.Orientation = n.Orientation }
//if n.Timestamp == null.Timestamp {
// n.Timestamp = s.Timestamp
//}
if n.Position == null.Position {
n.Position = s.Position
}
@ -74,6 +64,14 @@ func (s Sensordata) Consolidate2(n Sensordata) Sensordata {
return n
}
func (s *Sensordata) checkSources(n *Sensordata) {
if (s.SourceId != n.SourceId && *s != Sensordata{}) {
log.Println(s)
log.Println(n)
log.Fatalln("Do not consolidate Sensordata from different Sources")
}
}
var (
errNotImplemented = errors.New("message not implemented")
errRawMessage = errors.New("raw message")