gyrogpsc/core/pipeline.go

226 lines
6.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package core
import (
"context"
"encoding/json"
"errors"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"sync"
"time"
)
// pipeline bundles the state used by the worker goroutines started in Run and
// by the Push / publish / refreshDelay methods defined on it.
// NOTE: NewPipeline initialises this struct with a positional literal, so the
// field order below must not change without updating that constructor.
type pipeline struct {
// active is set to true by Run and to false by Close; the worker loops in Run
// keep going while it is true. Accessed under mu.
active bool
// record is toggled by Record and StopRecord. It is not read anywhere in the
// visible part of this file.
record bool
// synchroniz holds the TCP/serial delay and the ticker that paces refreshDelay.
synchroniz synchronizer
// buffer holds the current and the last published measurement of each source.
buffer pipeBuffer
// publisher receives the JSON-encoded measurement pair from publish.
publisher Publisher
// storer receives every pushed sample (EnqueueRaw) and every published pair (EnqueuePair).
storer Storer
// publishTicker paces the publish loop started in Run.
publishTicker *time.Ticker
// mu guards active.
mu sync.RWMutex
// sema is created with weight 2 in NewPipeline; Run acquires 2 units and each
// worker goroutine releases 1 unit when its loop ends.
sema *semaphore.Weighted
}
// NewPipeline builds a pipeline that hands published data to d and stored data
// to s. The periods of the two internal tickers are read from conf.Pipeline:
// SyncUpdateIntervalMs for the delay updater and PublishIntervalMs for the
// publisher, both interpreted as milliseconds.
//
// The returned pipeline is neither active nor recording; its semaphore is
// created with a weight of 2. As noted in the original comment, the pipeline
// implements Runner & Pusher.
func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline {
	syncEvery := time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond
	publishEvery := time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond

	// active, record and mu are intentionally left at their zero values.
	return &pipeline{
		synchroniz: synchronizer{
			mutex:        &sync.RWMutex{},
			updateTicker: time.NewTicker(syncEvery),
		},
		buffer: pipeBuffer{
			tcpMutex:    &sync.Mutex{},
			serialMutex: &sync.Mutex{},
		},
		publisher:     d,
		storer:        s,
		publishTicker: time.NewTicker(publishEvery),
		sema:          semaphore.NewWeighted(2),
	}
}
// isPipeActive reports whether the pipeline is currently marked active.
// The flag is read while holding the read lock of p.mu.
func (p *pipeline) isPipeActive() bool {
	p.mu.RLock()
	running := p.active
	p.mu.RUnlock()
	return running
}
// Run marks the pipeline as active and starts its two background workers:
//
//   - the updater, which calls refreshDelay on every tick of the synchronizer's
//     updateTicker, and
//   - the publisher, which calls publish on every tick of publishTicker.
//
// Before starting, Run acquires 2 units of p.sema; each worker gives 1 unit back
// when its loop ends. Both loops re-check isPipeActive only after handling a
// tick, so after Close a worker stops once its next tick has been processed.
// Errors from the workers are only logged at debug level; the publisher does
// not log the "no data available" error.
func (p *pipeline) Run() {
	p.sema.Acquire(context.Background(), 2)

	p.mu.Lock()
	p.active = true
	p.mu.Unlock()
	logrus.Println("pipe: processing service started")

	// Updater worker: keeps the TCP/serial delay estimate fresh.
	go func() {
		for {
			if !p.isPipeActive() {
				break
			}
			<-p.synchroniz.updateTicker.C
			if err := p.refreshDelay(); err != nil {
				logrus.Debugln(err)
			}
		}
		p.sema.Release(1)
		logrus.Println("pipe: updater stopped")
	}()

	// Publisher worker: forwards buffered measurements.
	go func() {
		for {
			if !p.isPipeActive() {
				break
			}
			<-p.publishTicker.C
			err := p.publish()
			if err == nil || err.Error() == "no data available" {
				continue
			}
			logrus.Debug(err)
		}
		p.sema.Release(1)
		logrus.Println("pipe: publisher stopped")
	}()
}
// Record sets the pipeline's record flag to true.
// NOTE(review): the write is not protected by p.mu, and the flag is not read
// anywhere in the visible code — confirm how and from which goroutine it is used.
func (p *pipeline) Record() {
p.record = true
}
// StopRecord sets the pipeline's record flag to false.
// NOTE(review): the write is not protected by p.mu, and the flag is not read
// anywhere in the visible code — confirm how and from which goroutine it is used.
func (p *pipeline) StopRecord() {
p.record = false
}
// publish forwards the buffered TCP and serial measurements as one pair.
//
// While holding both buffer mutexes (tcpMutex first, then serialMutex) it
//   - returns "no data available" if both measurements are still zero values,
//   - returns "same data" if both equal the previously published pair
//     (compared with cmp.Equal, ignoring unexported sensorData fields),
//   - otherwise remembers the pair as the last published one and passes it to
//     the storer via EnqueuePair.
//
// The locks are released before the pair is JSON-encoded (keyed by the source
// names) and handed to the publisher as a string. A marshalling error is
// returned as is.
func (p *pipeline) publish() error {
	buf := &p.buffer
	buf.tcpMutex.Lock()
	buf.serialMutex.Lock()
	// unlock releases both buffer mutexes in the same order on every exit path.
	unlock := func() {
		buf.tcpMutex.Unlock()
		buf.serialMutex.Unlock()
	}

	var zero sensorData
	if buf.MeasTcp == zero && buf.MeasSerial == zero {
		unlock()
		return errors.New("no data available")
	}

	tcpUnchanged := cmp.Equal(buf.MeasTcp, buf.LastMeasTcp, cmpopts.IgnoreUnexported(sensorData{}))
	if tcpUnchanged && cmp.Equal(buf.MeasSerial, buf.LastMeasSerial, cmpopts.IgnoreUnexported(sensorData{})) {
		unlock()
		return errors.New("same data")
	}

	logrus.Debug("")
	logrus.Debugf("SER old: %v", buf.LastMeasSerial)
	logrus.Debugf("SER new: %v", buf.MeasSerial)
	logrus.Debugf("TCP old: %v", buf.LastMeasTcp)
	logrus.Debugf("TCP new: %v", buf.MeasTcp)
	logrus.Debug("")

	buf.LastMeasTcp, buf.LastMeasSerial = buf.MeasTcp, buf.MeasSerial
	p.storer.EnqueuePair(buf.MeasTcp, buf.MeasSerial)
	payload := map[string]sensorData{
		string(SOURCE_TCP):    buf.MeasTcp,
		string(SOURCE_SERIAL): buf.MeasSerial,
	}
	unlock()

	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	p.publisher.Publish(string(encoded))
	return nil
}
// pipeBuffer keeps the current measurement of each source together with the
// measurement that was last handed to the publisher.
type pipeBuffer struct {
// MeasTcp is the current TCP measurement; pushTcpDataToBuffer replaces it under tcpMutex.
MeasTcp sensorData
// MeasSerial is the current serial measurement; pushSerialDataToBuffer replaces it under serialMutex.
MeasSerial sensorData
// LastMeasTcp is the TCP measurement recorded by the last successful publish.
LastMeasTcp sensorData
// LastMeasSerial is the serial measurement recorded by the last successful publish.
LastMeasSerial sensorData
// tcpMutex is locked around accesses to the TCP measurement.
tcpMutex *sync.Mutex
// serialMutex is locked around accesses to the serial measurement.
serialMutex *sync.Mutex
}
// UnixNanoTime is an int64-based type. Judging by its name it presumably holds a
// Unix timestamp in nanoseconds; it is not referenced in the visible part of
// this file — TODO confirm usage.
type UnixNanoTime int64
// synchronizer holds the delay that is applied to pushed samples to align the
// TCP and serial sources.
type synchronizer struct {
// tcpSerialDelayMs is the accumulated delay in milliseconds maintained by
// refreshDelay. A positive value makes pushTcpDataToBuffer sleep that long, a
// negative value makes pushSerialDataToBuffer sleep for its absolute value.
tcpSerialDelayMs int64
// mutex is locked around accesses to tcpSerialDelayMs.
mutex *sync.RWMutex
// updateTicker paces the refreshDelay loop started in Run.
updateTicker *time.Ticker
}
// refreshDelay updates the TCP/serial delay from the timestamps of the two
// buffered measurements.
//
// It logs the current delay when it is non-zero, then reads both timestamps
// (interpreted as Unix nanoseconds) and
//   - returns an error if either timestamp is zero, leaving the delay untouched,
//   - resets the delay to 0 and returns an error if the two timestamps are more
//     than 5000 ms apart,
//   - otherwise adds the measured difference (TCP minus serial, in ms) to the
//     stored delay and returns nil.
func (p *pipeline) refreshDelay() error {
	p.synchroniz.mutex.RLock()
	if p.synchroniz.tcpSerialDelayMs != 0 {
		logrus.Println("Delay TCP/SERIAL", p.synchroniz.tcpSerialDelayMs)
	}
	p.synchroniz.mutex.RUnlock()

	// Take the buffer locks in the same order as publish (tcpMutex first, then
	// serialMutex). Both methods run concurrently in separate goroutines; taking
	// the locks in opposite orders can deadlock them against each other.
	p.buffer.tcpMutex.Lock()
	p.buffer.serialMutex.Lock()
	tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp)
	serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp)
	p.buffer.serialMutex.Unlock()
	p.buffer.tcpMutex.Unlock()

	// A zero timestamp means that source has not delivered a usable sample yet.
	if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
		return errors.New("no sync possible. check if both Collectors running. otherwise check GPS fix")
	}

	currentDelay := tcpTime.Sub(serTime).Milliseconds()
	if currentDelay > 5000 || currentDelay < -5000 {
		// Implausibly large offset: drop any accumulated delay instead of applying it.
		p.synchroniz.mutex.Lock()
		p.synchroniz.tcpSerialDelayMs = 0
		p.synchroniz.mutex.Unlock()
		return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
	}

	logrus.Debug("TCP", tcpTime.String())
	logrus.Debug("SER", serTime.String())
	logrus.Debug("Difference", currentDelay, "ms")

	p.synchroniz.mutex.Lock()
	p.synchroniz.tcpSerialDelayMs += currentDelay
	p.synchroniz.mutex.Unlock()
	return nil
}
// Push accepts one sample for processing.
//
// A nil sample is rejected with an error. Otherwise a copy of the sample is
// handed to the storer via EnqueueRaw and then, depending on its source,
// forwarded to pushTcpDataToBuffer or pushSerialDataToBuffer on a new
// goroutine. Push panics if the sample carries any other source; note that the
// raw sample has already been enqueued at that point.
func (p *pipeline) Push(data *sensorData) error {
	if data == nil {
		return errors.New("nil processing not allowed")
	}
	p.storer.EnqueueRaw(*data)

	// Pick the buffer writer that belongs to the sample's source.
	var sink func(sensorData)
	switch data.source {
	case SOURCE_TCP:
		sink = p.pushTcpDataToBuffer
	case SOURCE_SERIAL:
		sink = p.pushSerialDataToBuffer
	default:
		panic("pipe: invalid data source")
	}
	go sink(*data)
	return nil
}
// pushTcpDataToBuffer merges a TCP sample into the buffered TCP measurement.
//
// If the synchronizer's delay is positive, the sample is held back for that
// many milliseconds before it is merged via ConsolidateExTime under tcpMutex.
func (p *pipeline) pushTcpDataToBuffer(data sensorData) {
	// Copy the delay and release the read lock right away. The read lock must
	// be released exactly once per acquisition, and it must not be held while
	// sleeping: a held read lock blocks the writer in refreshDelay.
	p.synchroniz.mutex.RLock()
	delayMs := p.synchroniz.tcpSerialDelayMs
	p.synchroniz.mutex.RUnlock()

	if delayMs > 0 {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
	}

	p.buffer.tcpMutex.Lock()
	p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
	p.buffer.tcpMutex.Unlock()
}
// pushSerialDataToBuffer merges a serial sample into the buffered serial
// measurement.
//
// If the synchronizer's delay is negative, the sample is held back for the
// absolute value of that delay (in milliseconds) before it is merged via
// ConsolidateEpochsOnly under serialMutex.
func (p *pipeline) pushSerialDataToBuffer(data sensorData) {
	// Copy the delay and release the read lock before sleeping; holding it for
	// the whole sleep would block the writer in refreshDelay for that long.
	p.synchroniz.mutex.RLock()
	delayMs := p.synchroniz.tcpSerialDelayMs
	p.synchroniz.mutex.RUnlock()

	if delayMs < 0 {
		time.Sleep(time.Duration(-delayMs) * time.Millisecond)
	}

	p.buffer.serialMutex.Lock()
	p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
	p.buffer.serialMutex.Unlock()
}
// Close marks the pipeline as inactive. The flag is cleared under p.mu; the
// worker loops started by Run observe it through isPipeActive and end on their
// own afterwards.
func (p *pipeline) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.active = false
}