package core

import (
	"context"
	"encoding/json"
	"errors"
	"sync"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
)

// Sentinel errors returned by publish so that callers can tell them apart
// with errors.Is instead of comparing message strings.
var (
	// errPipeNoData is returned when both buffered measurements are still
	// the zero value.
	errPipeNoData = errors.New("no data available")
	// errPipeSameData is returned when neither buffered measurement differs
	// from the pair that was published last.
	errPipeSameData = errors.New("same data")
)

// pipeline buffers the latest TCP and serial measurements, periodically
// re-estimates the delay between both sources and periodically publishes the
// buffered pair.
type pipeline struct {
	active        bool // guarded by mu
	record        bool // guarded by mu
	synchroniz    synchronizer
	buffer        pipeBuffer
	publisher     Publisher
	storer        Storer
	publishTicker *time.Ticker
	mu            sync.RWMutex
	// sema has weight 2: Run acquires both units and each of its two worker
	// goroutines releases one when it terminates.
	sema *semaphore.Weighted
}

// pipe implements Runner & Pusher

// NewPipeline builds an inactive pipeline that publishes through d and stores
// through s. The tick intervals of the delay updater and of the publisher are
// taken from conf.Pipeline.SyncUpdateIntervalMs and
// conf.Pipeline.PublishIntervalMs (both interpreted as milliseconds).
func NewPipeline(d Publisher, s Storer, conf *Configuration) *pipeline {
	syncInterval := time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond
	publishInterval := time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond

	return &pipeline{
		synchroniz: synchronizer{
			//bufferSize: 100,
			mutex:        &sync.RWMutex{},
			updateTicker: time.NewTicker(syncInterval),
		},
		buffer: pipeBuffer{
			tcpMutex:    &sync.Mutex{},
			serialMutex: &sync.Mutex{},
		},
		publisher:     d,
		storer:        s,
		publishTicker: time.NewTicker(publishInterval),
		sema:          semaphore.NewWeighted(2),
	}
}

// isPipeActive reports whether the pipeline is currently marked active.
func (p *pipeline) isPipeActive() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.active
}

// Run marks the pipeline active and starts two goroutines: one that calls
// refreshDelay on every tick of the synchronizer ticker and one that calls
// publish on every tick of the publish ticker. Each goroutine re-checks the
// active flag after handling a tick and releases one semaphore unit when it
// stops. Because Run acquires the full semaphore weight first, a second call
// blocks until both goroutines of a previous run have terminated.
func (p *pipeline) Run() {
	if err := p.sema.Acquire(context.Background(), 2); err != nil {
		// Without the semaphore the workers must not be started, otherwise
		// their Release calls would not be balanced by an Acquire.
		logrus.Debugln(err)
		return
	}

	p.mu.Lock()
	p.active = true
	p.mu.Unlock()
	logrus.Println("pipe: processing service started")

	go func() {
		for p.isPipeActive() {
			<-p.synchroniz.updateTicker.C
			if err := p.refreshDelay(); err != nil {
				logrus.Debugln(err)
			}
		}
		p.sema.Release(1)
		logrus.Println("pipe: updater stopped")
	}()

	go func() {
		for p.isPipeActive() {
			<-p.publishTicker.C
			// An empty buffer is the normal idle state and is not logged.
			if err := p.publish(); err != nil && !errors.Is(err, errPipeNoData) {
				logrus.Debug(err)
			}
		}
		p.sema.Release(1)
		logrus.Println("pipe: publisher stopped")
	}()
}

// Record sets the record flag.
func (p *pipeline) Record() {
	p.mu.Lock()
	p.record = true
	p.mu.Unlock()
}

// StopRecord clears the record flag.
func (p *pipeline) StopRecord() {
	p.mu.Lock()
	p.record = false
	p.mu.Unlock()
}

// publish serialises the buffered measurement pair to JSON and hands it to
// the publisher. It returns errPipeNoData when both measurements are empty,
// errPipeSameData when nothing changed since the last publication, or the
// error reported by json.Marshal.
func (p *pipeline) publish() error {
	data, err := p.nextPublishData()
	if err != nil {
		return err
	}

	// Marshalling and publishing happen outside the buffer locks.
	jdata, err := json.Marshal(data)
	//logrus.Println(string(pretty.Pretty(jdata)))
	if err != nil {
		return err
	}
	p.publisher.Publish(string(jdata))
	return nil
}

// nextPublishData checks, under both buffer locks, whether the buffered pair
// should be published. If so it records the pair as last published, enqueues
// it in the storer and returns it keyed by source name.
//
// Lock order is always tcpMutex before serialMutex; every function that
// needs both locks must use the same order to avoid deadlocks.
func (p *pipeline) nextPublishData() (map[string]sensorData, error) {
	p.buffer.tcpMutex.Lock()
	defer p.buffer.tcpMutex.Unlock()
	p.buffer.serialMutex.Lock()
	defer p.buffer.serialMutex.Unlock()

	var empty sensorData
	if p.buffer.MeasTcp == empty && p.buffer.MeasSerial == empty {
		return nil, errPipeNoData
	}

	ignoreUnexported := cmpopts.IgnoreUnexported(sensorData{})
	if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, ignoreUnexported) &&
		cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, ignoreUnexported) {
		return nil, errPipeSameData
	}

	logrus.Debug("––––––––––––––––––––––––––––––––––––")
	logrus.Debugf("SER old: %v", p.buffer.LastMeasSerial)
	logrus.Debugf("SER new: %v", p.buffer.MeasSerial)
	logrus.Debugf("TCP old: %v", p.buffer.LastMeasTcp)
	logrus.Debugf("TCP new: %v", p.buffer.MeasTcp)
	logrus.Debug("––––––––––––––––––––––––––––––––––––")

	p.buffer.LastMeasTcp = p.buffer.MeasTcp
	p.buffer.LastMeasSerial = p.buffer.MeasSerial
	p.storer.EnqueuePair(p.buffer.MeasTcp, p.buffer.MeasSerial)

	return map[string]sensorData{
		string(SOURCE_TCP):    p.buffer.MeasTcp,
		string(SOURCE_SERIAL): p.buffer.MeasSerial,
	}, nil
}

// pipeBuffer holds the current and the last published measurement of each
// source. tcpMutex is used around accesses to the TCP measurements and
// serialMutex around accesses to the serial measurements.
type pipeBuffer struct {
	MeasTcp        sensorData
	MeasSerial     sensorData
	LastMeasTcp    sensorData
	LastMeasSerial sensorData
	tcpMutex       *sync.Mutex
	serialMutex    *sync.Mutex
}

// UnixNanoTime is an int64-based type.
// NOTE(review): presumably nanoseconds since the Unix epoch, judging by the
// name only — confirm against its users.
type UnixNanoTime int64

// synchronizer holds the delay that is applied to pushes to line up both
// sources. A positive tcpSerialDelayMs delays TCP pushes, a negative value
// delays serial pushes. mutex guards tcpSerialDelayMs.
type synchronizer struct {
	tcpSerialDelayMs int64
	mutex            *sync.RWMutex
	updateTicker     *time.Ticker
}

// syncDelayMs returns the current TCP/serial delay in milliseconds.
func (p *pipeline) syncDelayMs() int64 {
	p.synchroniz.mutex.RLock()
	defer p.synchroniz.mutex.RUnlock()
	return p.synchroniz.tcpSerialDelayMs
}

// refreshDelay compares the timestamps of the buffered TCP and serial
// measurements and adds their difference (in milliseconds) to the stored
// delay. It returns an error, leaving the delay untouched, when either
// timestamp is zero; when the difference exceeds five seconds in either
// direction it resets the delay to zero and returns an error.
func (p *pipeline) refreshDelay() error {
	if delay := p.syncDelayMs(); delay != 0 {
		logrus.Println("Delay TCP/SERIAL", delay)
	}

	// Same lock order as nextPublishData (tcpMutex, then serialMutex): both
	// run on separate goroutines, so an inverted order could deadlock.
	p.buffer.tcpMutex.Lock()
	p.buffer.serialMutex.Lock()
	tcpTime := time.Unix(0, p.buffer.MeasTcp.Timestamp)
	serTime := time.Unix(0, p.buffer.MeasSerial.Timestamp)
	p.buffer.serialMutex.Unlock()
	p.buffer.tcpMutex.Unlock()

	if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
		return errors.New("no sync possible. check if both Collectors running. otherwise check GPS fix")
	}

	currentDelay := tcpTime.Sub(serTime).Milliseconds()
	if currentDelay > 5000 || currentDelay < -5000 {
		p.synchroniz.mutex.Lock()
		p.synchroniz.tcpSerialDelayMs = 0
		p.synchroniz.mutex.Unlock()
		return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
	}

	logrus.Debug("TCP", tcpTime.String())
	logrus.Debug("SER", serTime.String())
	logrus.Debug("Difference", currentDelay, "ms")

	p.synchroniz.mutex.Lock()
	p.synchroniz.tcpSerialDelayMs += currentDelay
	p.synchroniz.mutex.Unlock()
	return nil
}

// Push enqueues the raw measurement in the storer and then merges it into the
// buffer of its source on a new goroutine. It returns an error when data is
// nil and panics when data.source is neither SOURCE_TCP nor SOURCE_SERIAL.
func (p *pipeline) Push(data *sensorData) error {
	if data == nil {
		return errors.New("nil processing not allowed")
	}
	//logrus.Println("push data to pipe:", string(data.source))
	p.storer.EnqueueRaw(*data)

	switch data.source {
	case SOURCE_TCP:
		go p.pushTcpDataToBuffer(*data)
	case SOURCE_SERIAL:
		go p.pushSerialDataToBuffer(*data)
	default:
		panic("pipe: invalid data source")
	}
	return nil
}

// pushTcpDataToBuffer waits for the current delay if it is positive and then
// merges data into the buffered TCP measurement via ConsolidateExTime.
func (p *pipeline) pushTcpDataToBuffer(data sensorData) {
	// The delay is read and the lock released before sleeping, so that
	// refreshDelay is never blocked for the duration of the sleep.
	if delay := p.syncDelayMs(); delay > 0 {
		time.Sleep(time.Duration(delay) * time.Millisecond)
	}

	p.buffer.tcpMutex.Lock()
	p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
	p.buffer.tcpMutex.Unlock()
}

// pushSerialDataToBuffer waits for the absolute value of the current delay if
// it is negative and then merges data into the buffered serial measurement
// via ConsolidateEpochsOnly.
func (p *pipeline) pushSerialDataToBuffer(data sensorData) {
	// The delay is read and the lock released before sleeping, so that
	// refreshDelay is never blocked for the duration of the sleep.
	if delay := p.syncDelayMs(); delay < 0 {
		time.Sleep(time.Duration(-delay) * time.Millisecond)
	}

	p.buffer.serialMutex.Lock()
	p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
	p.buffer.serialMutex.Unlock()
}

// Close marks the pipeline inactive. The goroutines started by Run notice the
// change only after handling their next tick, so they stop with a delay of up
// to one tick interval.
func (p *pipeline) Close() {
	p.mu.Lock()
	p.active = false
	p.mu.Unlock()
}