181 lines
4.3 KiB
Go
181 lines
4.3 KiB
Go
package core
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type pipeline struct {
|
|
active bool
|
|
record bool
|
|
syn synchronizer
|
|
agr aggregator
|
|
pub Publisher
|
|
stor Storer
|
|
publishTicker *time.Ticker
|
|
}
|
|
|
|
// pipe implements Runner & Pusher
|
|
func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline {
|
|
return &pipeline{
|
|
false,
|
|
false,
|
|
synchronizer{
|
|
//bufferSize: 100,
|
|
mutex: &sync.Mutex{},
|
|
updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond),
|
|
},
|
|
aggregator{
|
|
tcpMutex: &sync.Mutex{},
|
|
serialMutex: &sync.Mutex{},
|
|
},
|
|
d,
|
|
s,
|
|
time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond),
|
|
}
|
|
}
|
|
|
|
func (p *pipeline) Run() {
|
|
p.active = true
|
|
log.Println("pipe: processing service started")
|
|
go func() {
|
|
for p.active {
|
|
<-p.syn.updateTicker.C
|
|
err := p.refreshDelay()
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
log.Println("pipe: updater stopped")
|
|
}()
|
|
go func() {
|
|
for p.active {
|
|
<-p.publishTicker.C
|
|
err := p.Publish()
|
|
if err != nil && err.Error() != "no data available" {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
log.Println("pipe: publisher stopped")
|
|
}()
|
|
}
|
|
|
|
func (p *pipeline) Record() {
|
|
p.record = true
|
|
}
|
|
func (p *pipeline) Stop() {
|
|
p.record = false
|
|
}
|
|
|
|
func (p *pipeline) Publish() error {
|
|
p.agr.tcpMutex.Lock()
|
|
p.agr.serialMutex.Lock()
|
|
|
|
if (p.agr.tcpSensorData == sensorData{} && p.agr.serialSensorData == sensorData{}) {
|
|
p.agr.tcpMutex.Unlock()
|
|
p.agr.serialMutex.Unlock()
|
|
return errors.New("no data available")
|
|
}
|
|
|
|
p.stor.EnqueuePair(p.agr.tcpSensorData, p.agr.serialSensorData)
|
|
|
|
data := map[string]sensorData{
|
|
string(SOURCE_TCP): p.agr.tcpSensorData,
|
|
string(SOURCE_SERIAL): p.agr.serialSensorData,
|
|
}
|
|
|
|
p.agr.tcpMutex.Unlock()
|
|
p.agr.serialMutex.Unlock()
|
|
|
|
jdata, err := json.Marshal(data)
|
|
//log.Println(string(pretty.Pretty(jdata)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
p.pub.Publish(string(jdata))
|
|
return nil
|
|
}
|
|
|
|
type aggregator struct {
|
|
tcpSensorData sensorData
|
|
serialSensorData sensorData
|
|
tcpMutex *sync.Mutex
|
|
serialMutex *sync.Mutex
|
|
}
|
|
|
|
// UnixNanoTime is an int64-based type that is not referenced anywhere in this
// file. NOTE(review): presumably a Unix timestamp in nanoseconds, going by the
// name — confirm against its users.
type UnixNanoTime int64
|
|
|
|
// synchronizer holds the state used to line up the TCP and serial sources.
type synchronizer struct {
	// tcpSerialDelayMs is the delay in milliseconds maintained by
	// refreshDelay. A positive value makes pushTcpDataToBuffer wait before
	// merging; a negative value makes pushSerialDataToBuffer wait.
	tcpSerialDelayMs int64

	// mutex is allocated by NewPipeline.
	mutex *sync.Mutex

	// updateTicker triggers the periodic refreshDelay call in Run.
	updateTicker *time.Ticker
}
|
|
|
|
func (p *pipeline) refreshDelay() error {
|
|
log.Println("refreshing delay....")
|
|
fmt.Println("Delay TCP/SERIAL", p.syn.tcpSerialDelayMs)
|
|
p.agr.serialMutex.Lock()
|
|
p.agr.tcpMutex.Lock()
|
|
tcpTime := time.Unix(0, p.agr.tcpSensorData.Timestamp)
|
|
serTime := time.Unix(0, p.agr.serialSensorData.Timestamp)
|
|
p.agr.serialMutex.Unlock()
|
|
p.agr.tcpMutex.Unlock()
|
|
if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
|
|
return errors.New("no sync possible. check if both collectors running. otherwise check GPS fix")
|
|
}
|
|
currentDelay := tcpTime.Sub(serTime).Milliseconds()
|
|
if currentDelay > 5000 || currentDelay < -5000 {
|
|
p.syn.tcpSerialDelayMs = 0
|
|
return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
|
|
}
|
|
log.Println("TCP", tcpTime.String())
|
|
log.Println("SER", serTime.String())
|
|
log.Println("Difference", tcpTime.Sub(serTime).Milliseconds(), "ms")
|
|
delay := tcpTime.Sub(serTime).Milliseconds()
|
|
p.syn.tcpSerialDelayMs += delay
|
|
return nil
|
|
}
|
|
|
|
func (p *pipeline) Push(data *sensorData) error {
|
|
if data == nil {
|
|
return errors.New("nil processing not allowed")
|
|
}
|
|
//log.Println(string(data.source))
|
|
// TODO: persist data here with current timestamp
|
|
p.stor.EnqueueRaw(*data)
|
|
switch data.source {
|
|
case SOURCE_TCP:
|
|
go p.pushTcpDataToBuffer(*data)
|
|
case SOURCE_SERIAL:
|
|
go p.pushSerialDataToBuffer(*data)
|
|
default:
|
|
panic("pipe: invalid data source")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *pipeline) pushTcpDataToBuffer(data sensorData) {
|
|
if p.syn.tcpSerialDelayMs > 0 {
|
|
time.Sleep(time.Duration(p.syn.tcpSerialDelayMs) * time.Millisecond)
|
|
}
|
|
p.agr.tcpMutex.Lock()
|
|
p.agr.tcpSensorData = p.agr.tcpSensorData.ConsolidateExTime(data)
|
|
p.agr.tcpMutex.Unlock()
|
|
}
|
|
func (p *pipeline) pushSerialDataToBuffer(data sensorData) {
|
|
if p.syn.tcpSerialDelayMs < 0 {
|
|
time.Sleep(time.Duration(-p.syn.tcpSerialDelayMs) * time.Millisecond)
|
|
}
|
|
p.agr.serialMutex.Lock()
|
|
p.agr.serialSensorData = p.agr.serialSensorData.ConsolidateEpochsOnly(data)
|
|
p.agr.serialMutex.Unlock()
|
|
}
|
|
|
|
func (p *pipeline) Close() {
|
|
p.active = false
|
|
}
|