gyrogpsc/core/pipeline.go

240 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package core
import (
"context"
"encoding/json"
"errors"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"sync"
"time"
)
type pipeline struct {
active bool
record bool
synchroniz synchronizer
buffer pipeBuffer
publisher Publisher
storer Tracker
publishTicker *time.Ticker
mu sync.RWMutex
sema *semaphore.Weighted
}
// pipeline implements Runner & Processor
func NewPipeline(d Publisher, s Tracker, conf *Configuration) *pipeline {
return &pipeline{
false,
false,
synchronizer{
//bufferSize: 100,
mutex: &sync.RWMutex{},
updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond),
},
pipeBuffer{
tcpMutex: &sync.Mutex{},
serialMutex: &sync.Mutex{},
},
d,
s,
time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond),
sync.RWMutex{},
semaphore.NewWeighted(2),
}
}
func (p *pipeline) isPipeActive() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.active
}
func (p *pipeline) isPipeRecording() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.record
}
func (p *pipeline) Run() {
p.sema.Acquire(context.Background(), 1) // !!! n=2 wenn synchronizer mitläuft
p.mu.Lock()
p.active = true
p.mu.Unlock()
logrus.Println("pipeline: processing service started")
//go func() {
// for p.isPipeActive() {
// <-p.synchroniz.updateTicker.C
// err := p.refreshDelay()
// if err != nil {
// logrus.Debugln(err)
// }
// }
// p.sema.Release(1)
// logrus.Println("pipeline: updater stopped")
//}()
go func() {
for p.isPipeActive() {
<-p.publishTicker.C
err := p.publish()
if err != nil && err.Error() != "no data available" {
logrus.Trace(err)
}
}
p.sema.Release(1)
logrus.Println("pipeline: publisher stopped")
}()
}
func (p *pipeline) Record() {
p.record = true
}
func (p *pipeline) StopRecord() {
p.record = false
}
func (p *pipeline) Push(data *SensorData) error {
if (data == nil || *data == SensorData{}) {
return errors.New("no data")
}
//logrus.Println("push data to pipeline:", string(data.source))
switch data.source {
case SOURCE_TCP:
go p.pushTcpDataToBuffer(*data)
case SOURCE_SERIAL:
go p.pushSerialDataToBuffer(*data)
default:
panic("pipeline: invalid data source")
}
return nil
}
func (p *pipeline) publish() error {
p.buffer.serialMutex.Lock()
p.buffer.tcpMutex.Lock()
if (p.buffer.MeasTcp == SensorData{} && p.buffer.MeasSerial == SensorData{}) {
p.buffer.tcpMutex.Unlock()
p.buffer.serialMutex.Unlock()
return errors.New("no data available")
}
if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, cmpopts.IgnoreUnexported(SensorData{})) &&
cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, cmpopts.IgnoreUnexported(SensorData{})) {
p.buffer.tcpMutex.Unlock()
p.buffer.serialMutex.Unlock()
return errors.New("same data")
}
logrus.Debugf("")
logrus.Tracef("SER old: %-40s %-40s %v %v", p.buffer.LastMeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Servertime.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Position, p.buffer.LastMeasSerial.Orientation)
logrus.Debugf("SER new: %-40s %-40s %v %v", p.buffer.MeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasSerial.Servertime.Format(time.RFC3339Nano), p.buffer.MeasSerial.Position, p.buffer.MeasSerial.Orientation)
logrus.Tracef("TCP old: %-40s %-40s %v %v", p.buffer.LastMeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Servertime.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Position, p.buffer.LastMeasTcp.Orientation)
logrus.Debugf("TCP new: %-40s %-40s %v %v", p.buffer.MeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasTcp.Servertime.Format(time.RFC3339Nano), p.buffer.MeasTcp.Position, p.buffer.MeasTcp.Orientation)
p.buffer.LastMeasTcp = p.buffer.MeasTcp
p.buffer.LastMeasSerial = p.buffer.MeasSerial
data := map[string]interface{}{}
if p.buffer.MeasTcp.source == SOURCE_TCP {
data[string(SOURCE_TCP)] = p.buffer.MeasTcp
}
if p.buffer.MeasSerial.source == SOURCE_SERIAL {
data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial
}
p.buffer.tcpMutex.Unlock()
p.buffer.serialMutex.Unlock()
jdata, err := json.Marshal(data)
//logrus.Println(string(pretty.Pretty(jdata)))
if err != nil {
return err
}
p.publisher.Publish(string(jdata))
return nil
}
type pipeBuffer struct {
MeasTcp SensorData
MeasSerial SensorData
LastMeasTcp SensorData
LastMeasSerial SensorData
tcpMutex *sync.Mutex
serialMutex *sync.Mutex
}
type UnixNanoTime int64
type synchronizer struct {
tcpSerialDelayMs int64
mutex *sync.RWMutex
updateTicker *time.Ticker
}
func (p *pipeline) refreshDelay() error {
logrus.Debugf("refreshing delay...")
p.buffer.serialMutex.Lock()
p.buffer.tcpMutex.Lock()
tcpTime := p.buffer.MeasTcp.Timestamp
serTime := p.buffer.MeasSerial.Timestamp
p.buffer.tcpMutex.Unlock()
p.buffer.serialMutex.Unlock()
if tcpTime.IsZero() || serTime.IsZero() || tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
return errors.New("sync not possible. zero time value detected")
}
logrus.Debug("TCP", tcpTime.Format(time.RFC3339Nano))
logrus.Debug("SER", serTime.Format(time.RFC3339Nano))
currentDelay := tcpTime.Sub(serTime).Milliseconds()
p.synchroniz.mutex.Lock()
defer p.synchroniz.mutex.Unlock()
logrus.Debugf("old delay-> %vms...", p.synchroniz.tcpSerialDelayMs)
if currentDelay > 5000 || currentDelay < -5000 {
p.synchroniz.tcpSerialDelayMs = 0
return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
}
p.synchroniz.tcpSerialDelayMs += currentDelay
logrus.Infof("new delay-> %vms", p.synchroniz.tcpSerialDelayMs)
return nil
}
func (p *pipeline) pushTcpDataToBuffer(data SensorData) {
data.Servertime = time.Now().UTC()
if p.isPipeRecording() {
p.storer.Put(data)
}
p.synchroniz.mutex.RLock()
if p.synchroniz.tcpSerialDelayMs > 0 {
time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond)
}
p.synchroniz.mutex.RUnlock()
p.buffer.tcpMutex.Lock()
p.buffer.MeasTcp = data
//p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
p.buffer.tcpMutex.Unlock()
}
func (p *pipeline) pushSerialDataToBuffer(data SensorData) {
data.Servertime = time.Now().UTC()
if p.isPipeRecording() {
p.storer.Put(data)
}
p.synchroniz.mutex.RLock()
if p.synchroniz.tcpSerialDelayMs < 0 {
time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond)
}
p.synchroniz.mutex.RUnlock()
p.buffer.serialMutex.Lock()
p.buffer.MeasSerial = data
//p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
p.buffer.serialMutex.Unlock()
}
func (p *pipeline) Close() {
p.mu.Lock()
p.active = false
p.mu.Unlock()
}