gyrogpsc/core/pipeline.go

173 lines
4.3 KiB
Go

package core
import (
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
)
type Processor interface {
Process(data *Sensordata) error
}
type pipeline struct {
syn synchronizer
agr aggregator
pub Publisher
publishTicker *time.Ticker
}
func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline {
return &pipeline{
synchronizer{
//bufferSize: 100,
mutex: &sync.Mutex{},
updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond),
},
aggregator{
tcpMutex: &sync.Mutex{},
serialMutex: &sync.Mutex{},
},
d,
time.NewTicker(time.Duration(publishIntervalMs) * time.Millisecond),
}
}
func (p *pipeline) Run() {
go p.scheduleSynchronizer()
go func() {
for {
<-p.publishTicker.C
err := p.Publish()
if err != nil && err.Error() != "no data available" {
log.Println(err)
}
}
}()
log.Println("pipeline: processing service started")
}
func (p *pipeline) Publish() error {
p.agr.tcpMutex.Lock()
p.agr.serialMutex.Lock()
//log.Println(pub.tcpSensorData)
//log.Println(pub.serialSensorData)
if (p.agr.tcpSensorData == Sensordata{} && p.agr.serialSensorData == Sensordata{}) {
p.agr.tcpMutex.Unlock()
p.agr.serialMutex.Unlock()
return errors.New("no data available")
}
data := map[string]Sensordata{
string(SOURCE_TCP): p.agr.tcpSensorData,
string(SOURCE_SERIAL): p.agr.serialSensorData,
}
//p.agr.tcpSensorData = Sensordata{}
//p.agr.serialSensorData = Sensordata{}
p.agr.tcpMutex.Unlock()
p.agr.serialMutex.Unlock()
jdata, err := json.Marshal(data)
//log.Println(string(pretty.Pretty(jdata)))
if err != nil {
return err
}
p.pub.Publish(string(jdata))
return nil
}
type aggregator struct {
tcpSensorData Sensordata
serialSensorData Sensordata
tcpMutex *sync.Mutex
serialMutex *sync.Mutex
}
type UnixNanoTime int64
type synchronizer struct {
tcpSerialDelayMs int64
//tcpBuffer map[UnixNanoTime]Sensordata
//serialBuffer map[UnixNanoTime]Sensordata
//bufferSize int
mutex *sync.Mutex
updateTicker *time.Ticker
// should run concurrently
//
// Methods:
// pushSensordata(Sensordata), remove oldest if larger than bufferSize
// refreshDelay()
// Schedule()
}
func (p *pipeline) scheduleSynchronizer() {
log.Println("synchronizer: started")
for {
<-p.syn.updateTicker.C
err := p.refreshDelay()
if err != nil {
log.Println(err)
}
}
}
func (p *pipeline) refreshDelay() error {
log.Println("refreshing delay....")
fmt.Println("Delay TCP/SERIAL", p.syn.tcpSerialDelayMs)
p.agr.serialMutex.Lock()
p.agr.tcpMutex.Lock()
tcpTime := time.Unix(0, p.agr.tcpSensorData.Timestamp)
serTime := time.Unix(0, p.agr.serialSensorData.Timestamp)
p.agr.serialMutex.Unlock()
p.agr.tcpMutex.Unlock()
if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
return errors.New("no sync possible. check if both collectors running. otherwise check GPS fix")
}
currentDelay := tcpTime.Sub(serTime).Milliseconds()
if currentDelay > 5000 || currentDelay < -5000 {
p.syn.tcpSerialDelayMs = 0
return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
}
log.Println("TCP", tcpTime.String())
log.Println("SER", serTime.String())
log.Println("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms")
delay := tcpTime.Sub(serTime).Milliseconds()
p.syn.tcpSerialDelayMs += delay
return nil
}
func (p *pipeline) Process(data *Sensordata) error {
if data == nil {
return errors.New("nil processing not allowed")
}
//log.Println(string(data.SourceId))
switch data.SourceId {
case SOURCE_TCP:
go p.pushTcpDataToBuffer(*data)
case SOURCE_SERIAL:
go p.pushSerialDataToBuffer(*data)
default:
panic("pipeline: invalid data source")
}
return nil
}
func (p *pipeline) pushTcpDataToBuffer(data Sensordata) {
if p.syn.tcpSerialDelayMs > 0 {
time.Sleep(time.Duration(p.syn.tcpSerialDelayMs) * time.Millisecond)
}
p.agr.tcpMutex.Lock()
p.agr.tcpSensorData = p.agr.tcpSensorData.ConsolidateExTime(data)
p.agr.tcpMutex.Unlock()
}
func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
if p.syn.tcpSerialDelayMs < 0 {
time.Sleep(time.Duration(-p.syn.tcpSerialDelayMs) * time.Millisecond)
}
p.agr.serialMutex.Lock()
p.agr.serialSensorData = p.agr.serialSensorData.ConsolidateEpochsOnly(data)
p.agr.serialMutex.Unlock()
}