package core

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// errNoDataAvailable is returned by publish while neither buffer slot has
// received a measurement yet. Run suppresses logging for exactly this error.
var errNoDataAvailable = errors.New("no data available")

// pipeline buffers the latest TCP and serial measurements, periodically
// re-estimates the time offset between the two sources and periodically
// publishes the buffered pair.
type pipeline struct {
	// active keeps the two background loops started by Run alive.
	// NOTE(review): it is written by Run/Close and read by the loop
	// goroutines without synchronization.
	active bool
	// record is toggled by Record/Stop; it is not read anywhere in this file.
	record        bool
	synchroniz    synchronizer
	buffer        pipeBuffer
	publisher     Publisher
	storer        Storer
	publishTicker *time.Ticker
}

// NewPipeline builds a pipeline that publishes through d and stores through s.
// The synchronisation and publish tickers are created immediately, using
// conf.Pipeline.SyncUpdateIntervalMs and conf.Pipeline.PublishIntervalMs;
// time.NewTicker panics if an interval is not positive.
//
// pipe implements Runner & Pusher
func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline {
	return &pipeline{
		false,
		false,
		synchronizer{
			//bufferSize: 100,
			mutex:        &sync.Mutex{},
			updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond),
		},
		pipeBuffer{
			tcpMutex:    &sync.Mutex{},
			serialMutex: &sync.Mutex{},
		},
		d,
		s,
		time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond),
	}
}

// Run marks the pipeline active and starts two goroutines: one calls
// refreshDelay on every tick of the synchronizer's update ticker, the other
// calls publish on every tick of the publish ticker. Errors are logged, except
// errNoDataAvailable which is silently skipped. Run returns immediately.
func (p *pipeline) Run() {
	p.active = true
	log.Println("pipe: processing service started")

	go func() {
		for p.active {
			<-p.synchroniz.updateTicker.C
			if err := p.refreshDelay(); err != nil {
				log.Println(err)
			}
		}
		log.Println("pipe: updater stopped")
	}()

	go func() {
		for p.active {
			<-p.publishTicker.C
			// Compare against the sentinel instead of the message text.
			if err := p.publish(); err != nil && !errors.Is(err, errNoDataAvailable) {
				log.Println(err)
			}
		}
		log.Println("pipe: publisher stopped")
	}()
}

// Record sets the record flag.
func (p *pipeline) Record() {
	p.record = true
}

// Stop clears the record flag. It does not stop the background loops; see Close.
func (p *pipeline) Stop() {
	p.record = false
}

// publish sends the current TCP/serial measurement pair to the publisher as a
// JSON object keyed by source name.
//
// It returns errNoDataAvailable when both buffer slots are still empty, an
// error with the text "same data" when nothing changed since the last publish,
// or the error reported by json.Marshal.
func (p *pipeline) publish() error {
	data, err := p.collectChanged()
	if err != nil {
		return err
	}

	// Marshalling and publishing happen outside the buffer locks.
	jdata, err := json.Marshal(data)
	//log.Println(string(pretty.Pretty(jdata)))
	if err != nil {
		return err
	}
	p.publisher.Publish(string(jdata))
	return nil
}

// collectChanged is the critical section of publish: while holding both buffer
// locks it checks that there is new data, logs the TCP change, remembers the
// pair as "last published", enqueues it on the storer and returns a copy keyed
// by source.
func (p *pipeline) collectChanged() (map[string]sensorData, error) {
	// Lock order is always tcp -> serial; refreshDelay uses the same order so
	// the two ticker goroutines cannot deadlock against each other.
	p.buffer.tcpMutex.Lock()
	defer p.buffer.tcpMutex.Unlock()
	p.buffer.serialMutex.Lock()
	defer p.buffer.serialMutex.Unlock()

	var empty sensorData
	if p.buffer.MeasTcp == empty && p.buffer.MeasSerial == empty {
		return nil, errNoDataAvailable
	}

	ignoreUnexported := cmpopts.IgnoreUnexported(sensorData{})
	if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, ignoreUnexported) &&
		cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, ignoreUnexported) {
		return nil, errors.New("same data")
	}

	log.Println("––––––––––––––––––––––––––––––––––––")
	log.Printf("MEAS old: %v", p.buffer.LastMeasTcp)
	log.Printf("MEAS new: %v", p.buffer.MeasTcp)
	log.Println("––––––––––––––––––––––––––––––––––––")

	p.buffer.LastMeasTcp = p.buffer.MeasTcp
	p.buffer.LastMeasSerial = p.buffer.MeasSerial
	p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial)

	return map[string]sensorData{
		string(SOURCE_TCP):    p.buffer.MeasTcp,
		string(SOURCE_SERIAL): p.buffer.MeasSerial,
	}, nil
}

// pipeBuffer holds the most recent and the last published measurement of each
// source. tcpMutex guards the TCP slot and serialMutex the serial slot; code
// that needs both must lock tcpMutex first.
type pipeBuffer struct {
	MeasTcp        sensorData
	MeasSerial     sensorData
	LastMeasTcp    sensorData
	LastMeasSerial sensorData
	tcpMutex       *sync.Mutex
	serialMutex    *sync.Mutex
}

// UnixNanoTime is an int64-based type; it is not used within this file.
type UnixNanoTime int64

// synchronizer tracks the delay applied to align the TCP and serial streams.
// A positive tcpSerialDelayMs delays TCP pushes, a negative one delays serial
// pushes. mutex guards tcpSerialDelayMs.
type synchronizer struct {
	tcpSerialDelayMs int64
	mutex            *sync.Mutex
	updateTicker     *time.Ticker
}

// delayMs returns the current delay in milliseconds.
func (s *synchronizer) delayMs() int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.tcpSerialDelayMs
}

// setDelayMs replaces the current delay.
func (s *synchronizer) setDelayMs(ms int64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.tcpSerialDelayMs = ms
}

// addDelayMs adds ms to the current delay.
func (s *synchronizer) addDelayMs(ms int64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.tcpSerialDelayMs += ms
}

// refreshDelay compares the timestamps of the buffered TCP and serial
// measurements and adds their difference (in ms) to the synchronizer delay.
//
// It returns an error when either timestamp is zero, or when the difference
// exceeds +/-5000 ms; in the latter case the delay is reset to 0.
func (p *pipeline) refreshDelay() error {
	const maxDelayMs = 5000

	log.Println("refreshing delay....")
	fmt.Println("Delay TCP/SERIAL", p.synchroniz.delayMs())

	// Same lock order as publish (tcp -> serial) to rule out a deadlock.
	p.buffer.tcpMutex.Lock()
	p.buffer.serialMutex.Lock()
	tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp)
	serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp)
	p.buffer.serialMutex.Unlock()
	p.buffer.tcpMutex.Unlock()

	if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
		return errors.New("no sync possible. check if both collectors running. otherwise check GPS fix")
	}

	currentDelay := tcpTime.Sub(serTime).Milliseconds()
	if currentDelay > maxDelayMs || currentDelay < -maxDelayMs {
		p.synchroniz.setDelayMs(0)
		return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
	}

	log.Println("TCP", tcpTime.String())
	log.Println("SER", serTime.String())
	log.Println("Difference", currentDelay, "ms")

	p.synchroniz.addDelayMs(currentDelay)
	return nil
}

// Push hands a measurement to the pipeline. The raw value is enqueued on the
// storer, then it is merged into the buffer slot of its source in a new
// goroutine (which may first sleep for the current synchronisation delay).
//
// It returns an error when data is nil and panics when data.source is neither
// SOURCE_TCP nor SOURCE_SERIAL.
func (p *pipeline) Push(data *sensorData) error {
	if data == nil {
		return errors.New("nil processing not allowed")
	}
	//log.Println("push data to pipe:", string(data.source))
	p.storer.EnqueueRaw(*data)
	switch data.source {
	case SOURCE_TCP:
		go p.pushTcpDataToBuffer(*data)
	case SOURCE_SERIAL:
		go p.pushSerialDataToBuffer(*data)
	default:
		panic("pipe: invalid data source")
	}
	return nil
}

// pushTcpDataToBuffer merges data into the TCP slot via ConsolidateExTime,
// after sleeping for the current delay when that delay is positive.
func (p *pipeline) pushTcpDataToBuffer(data sensorData) {
	// Read the delay once under its mutex; never sleep while holding a lock.
	if delay := p.synchroniz.delayMs(); delay > 0 {
		time.Sleep(time.Duration(delay) * time.Millisecond)
	}
	p.buffer.tcpMutex.Lock()
	p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
	p.buffer.tcpMutex.Unlock()
}

// pushSerialDataToBuffer merges data into the serial slot via
// ConsolidateEpochsOnly, after sleeping for the absolute delay when the
// current delay is negative.
func (p *pipeline) pushSerialDataToBuffer(data sensorData) {
	// Read the delay once under its mutex; never sleep while holding a lock.
	if delay := p.synchroniz.delayMs(); delay < 0 {
		time.Sleep(time.Duration(-delay) * time.Millisecond)
	}
	p.buffer.serialMutex.Lock()
	p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
	p.buffer.serialMutex.Unlock()
}

// Close clears the active flag. Each background loop re-checks the flag only
// after its next tick, so the loops end with that delay.
// NOTE(review): the tickers are not stopped here.
func (p *pipeline) Close() {
	p.active = false
}