package core

import (
	"context"
	"encoding/json"
	"errors"
	"sync"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
)

// Sentinel errors returned by publish. The message texts are kept identical
// to the previously inlined errors.New values so existing log output and any
// string-based checks keep working.
var (
	errPipelineNoDataAvailable = errors.New("no data available")
	errPipelineSameData        = errors.New("same data")
)

// pipeline buffers the most recent TCP and serial measurement and hands them
// to a Publisher on every tick of publishTicker.
//
// mu guards the active and record flags. The measurement buffers and the
// synchronizer state carry their own mutexes.
type pipeline struct {
	active        bool
	record        bool
	synchroniz    synchronizer
	buffer        pipeBuffer
	publisher     Publisher
	storer        Tracker
	publishTicker *time.Ticker
	mu            sync.RWMutex
	sema          *semaphore.Weighted
}

// pipeline implements Runner & Processor

// NewPipeline builds an inactive, non-recording pipeline that publishes via d
// and records via s.
//
// conf must be non-nil. Both conf.Pipeline.SyncUpdateIntervalMs and
// conf.Pipeline.PublishIntervalMs must be positive, because time.NewTicker
// panics on a non-positive interval.
func NewPipeline(d Publisher, s Tracker, conf *Configuration) *pipeline {
	syncInterval := time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond
	publishInterval := time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond

	return &pipeline{
		synchroniz: synchronizer{
			//bufferSize: 100,
			mutex:        &sync.RWMutex{},
			updateTicker: time.NewTicker(syncInterval),
		},
		buffer: pipeBuffer{
			tcpMutex:    &sync.Mutex{},
			serialMutex: &sync.Mutex{},
		},
		publisher:     d,
		storer:        s,
		publishTicker: time.NewTicker(publishInterval),
		sema:          semaphore.NewWeighted(2),
	}
}

// isPipeActive reports whether the pipeline is currently marked as running.
func (p *pipeline) isPipeActive() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.active
}

// isPipeRecording reports whether incoming samples are handed to the storer.
func (p *pipeline) isPipeRecording() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.record
}

// Run marks the pipeline as active and starts the background publisher
// goroutine, which calls publish on every tick of publishTicker until Close
// clears the active flag.
//
// NOTE(review): the semaphore has a capacity of 2 while only one unit is
// acquired here, so it does not prevent a second Run from starting a second
// publisher goroutine. The original remark says the weight must be 2 once the
// synchronizer goroutine (commented out below) runs as well.
func (p *pipeline) Run() {
	// !!! n=2 when the synchronizer goroutine is running too
	if err := p.sema.Acquire(context.Background(), 1); err != nil {
		// Unreachable with context.Background, but never start a goroutine
		// that would release a unit it does not hold.
		logrus.Errorf("pipeline: acquiring run slot: %v", err)
		return
	}

	p.mu.Lock()
	p.active = true
	p.mu.Unlock()
	logrus.Println("pipeline: processing service started")

	//go func() {
	//	for p.isPipeActive() {
	//		<-p.synchroniz.updateTicker.C
	//		err := p.refreshDelay()
	//		if err != nil {
	//			logrus.Debugln(err)
	//		}
	//	}
	//	p.sema.Release(1)
	//	logrus.Println("pipeline: updater stopped")
	//}()

	go func() {
		for p.isPipeActive() {
			<-p.publishTicker.C
			// Close may have been called while waiting for the tick; do not
			// publish on behalf of a stopped pipeline.
			if !p.isPipeActive() {
				break
			}
			if err := p.publish(); err != nil && !errors.Is(err, errPipelineNoDataAvailable) {
				logrus.Trace(err)
			}
		}
		p.sema.Release(1)
		logrus.Println("pipeline: publisher stopped")
	}()
}

// Record enables handing incoming samples to the storer.
func (p *pipeline) Record() {
	// The flag is read under mu by isPipeRecording from the push goroutines,
	// so the write has to hold the lock as well.
	p.mu.Lock()
	p.record = true
	p.mu.Unlock()
}

// StopRecord disables handing incoming samples to the storer.
func (p *pipeline) StopRecord() {
	p.mu.Lock()
	p.record = false
	p.mu.Unlock()
}

// Push accepts one sample and stores a copy of it in the buffer matching its
// source. The buffering runs in its own goroutine, so Push returns without
// waiting for it.
//
// It returns an error when data is nil or equal to the zero SensorData, and
// panics when the sample carries a source other than SOURCE_TCP or
// SOURCE_SERIAL.
func (p *pipeline) Push(data *SensorData) error {
	if data == nil || *data == (SensorData{}) {
		return errors.New("no data")
	}
	//logrus.Println("push data to pipeline:", string(data.source))
	switch data.source {
	case SOURCE_TCP:
		go p.pushTcpDataToBuffer(*data)
	case SOURCE_SERIAL:
		go p.pushSerialDataToBuffer(*data)
	default:
		panic("pipeline: invalid data source")
	}
	return nil
}

// publish serialises the buffered measurements to JSON and passes the result
// to the publisher.
//
// It returns errPipelineNoDataAvailable while both buffers are still empty,
// errPipelineSameData when nothing changed since the previous publish, and
// the json.Marshal error if encoding fails.
func (p *pipeline) publish() error {
	data, err := p.collectPublishData()
	if err != nil {
		return err
	}

	jdata, err := json.Marshal(data)
	//logrus.Println(string(pretty.Pretty(jdata)))
	if err != nil {
		return err
	}
	p.publisher.Publish(string(jdata))
	return nil
}

// collectPublishData snapshots the buffered measurements under both buffer
// locks, remembers them as the last published values and returns the payload
// keyed by source name. A measurement is only included when its source field
// matches the buffer it sits in.
func (p *pipeline) collectPublishData() (map[string]interface{}, error) {
	// Lock order is serial, then TCP; refreshDelay uses the same order.
	p.buffer.serialMutex.Lock()
	defer p.buffer.serialMutex.Unlock()
	p.buffer.tcpMutex.Lock()
	defer p.buffer.tcpMutex.Unlock()

	buf := &p.buffer
	if buf.MeasTcp == (SensorData{}) && buf.MeasSerial == (SensorData{}) {
		return nil, errPipelineNoDataAvailable
	}

	ignoreUnexported := cmpopts.IgnoreUnexported(SensorData{})
	if cmp.Equal(buf.MeasTcp, buf.LastMeasTcp, ignoreUnexported) &&
		cmp.Equal(buf.MeasSerial, buf.LastMeasSerial, ignoreUnexported) {
		return nil, errPipelineSameData
	}

	logrus.Debugf("–––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––")
	logrus.Tracef("SER old: %-40s %-40s %v %v",
		buf.LastMeasSerial.Timestamp.Format(time.RFC3339Nano),
		buf.LastMeasSerial.Servertime.Format(time.RFC3339Nano),
		buf.LastMeasSerial.Position, buf.LastMeasSerial.Orientation)
	logrus.Debugf("SER new: %-40s %-40s %v %v",
		buf.MeasSerial.Timestamp.Format(time.RFC3339Nano),
		buf.MeasSerial.Servertime.Format(time.RFC3339Nano),
		buf.MeasSerial.Position, buf.MeasSerial.Orientation)
	logrus.Tracef("TCP old: %-40s %-40s %v %v",
		buf.LastMeasTcp.Timestamp.Format(time.RFC3339Nano),
		buf.LastMeasTcp.Servertime.Format(time.RFC3339Nano),
		buf.LastMeasTcp.Position, buf.LastMeasTcp.Orientation)
	logrus.Debugf("TCP new: %-40s %-40s %v %v",
		buf.MeasTcp.Timestamp.Format(time.RFC3339Nano),
		buf.MeasTcp.Servertime.Format(time.RFC3339Nano),
		buf.MeasTcp.Position, buf.MeasTcp.Orientation)

	buf.LastMeasTcp = buf.MeasTcp
	buf.LastMeasSerial = buf.MeasSerial

	data := map[string]interface{}{}
	if buf.MeasTcp.source == SOURCE_TCP {
		data[string(SOURCE_TCP)] = buf.MeasTcp
	}
	if buf.MeasSerial.source == SOURCE_SERIAL {
		data[string(SOURCE_SERIAL)] = buf.MeasSerial
	}
	return data, nil
}

// pipeBuffer holds the latest and the previously published measurement per
// source. tcpMutex guards the TCP fields and serialMutex the serial fields;
// code needing both takes serialMutex first.
type pipeBuffer struct {
	MeasTcp        SensorData
	MeasSerial     SensorData
	LastMeasTcp    SensorData
	LastMeasSerial SensorData
	tcpMutex       *sync.Mutex
	serialMutex    *sync.Mutex
}

// UnixNanoTime is an int64-based time value.
// NOTE(review): presumably nanoseconds since the Unix epoch, judging by the
// name only; it is not used in this file.
type UnixNanoTime int64

// synchronizer stores the delay applied to align the TCP and serial streams.
// A positive tcpSerialDelayMs delays TCP samples, a negative one delays
// serial samples. mutex guards tcpSerialDelayMs.
type synchronizer struct {
	tcpSerialDelayMs int64
	mutex            *sync.RWMutex
	updateTicker     *time.Ticker
}

// refreshDelay adds the current offset between the buffered TCP and serial
// timestamps to the accumulated delay.
//
// It returns an error when either timestamp is unset. When the offset exceeds
// 5 seconds in either direction it resets the delay to zero and returns an
// error.
func (p *pipeline) refreshDelay() error {
	logrus.Debugf("refreshing delay...")

	p.buffer.serialMutex.Lock()
	p.buffer.tcpMutex.Lock()
	tcpTime := p.buffer.MeasTcp.Timestamp
	serTime := p.buffer.MeasSerial.Timestamp
	p.buffer.tcpMutex.Unlock()
	p.buffer.serialMutex.Unlock()

	if tcpTime.IsZero() || serTime.IsZero() || tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
		return errors.New("sync not possible. zero time value detected")
	}
	logrus.Debug("TCP", tcpTime.Format(time.RFC3339Nano))
	logrus.Debug("SER", serTime.Format(time.RFC3339Nano))

	currentDelay := tcpTime.Sub(serTime).Milliseconds()

	p.synchroniz.mutex.Lock()
	defer p.synchroniz.mutex.Unlock()
	logrus.Debugf("old delay-> %vms...", p.synchroniz.tcpSerialDelayMs)
	if currentDelay > 5000 || currentDelay < -5000 {
		p.synchroniz.tcpSerialDelayMs = 0
		return errors.New("skipping synchronisation! time not properly configured or facing network problems.")
	}
	p.synchroniz.tcpSerialDelayMs += currentDelay
	logrus.Infof("new delay-> %vms", p.synchroniz.tcpSerialDelayMs)
	return nil
}

// currentDelayMs returns the accumulated TCP/serial delay in milliseconds.
func (p *pipeline) currentDelayMs() int64 {
	p.synchroniz.mutex.RLock()
	defer p.synchroniz.mutex.RUnlock()
	return p.synchroniz.tcpSerialDelayMs
}

// pushTcpDataToBuffer stamps data with the current UTC server time, records
// it if recording is enabled, waits for a positive delay and then stores it
// as the latest TCP measurement.
func (p *pipeline) pushTcpDataToBuffer(data SensorData) {
	data.Servertime = time.Now().UTC()
	if p.isPipeRecording() {
		p.storer.Put(data)
	}

	// Sleep only after the synchronizer lock has been released; holding it
	// during the sleep would block refreshDelay for the whole delay.
	if delayMs := p.currentDelayMs(); delayMs > 0 {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
	}

	p.buffer.tcpMutex.Lock()
	p.buffer.MeasTcp = data
	//p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data)
	p.buffer.tcpMutex.Unlock()
}

// pushSerialDataToBuffer stamps data with the current UTC server time,
// records it if recording is enabled, waits for the magnitude of a negative
// delay and then stores it as the latest serial measurement.
func (p *pipeline) pushSerialDataToBuffer(data SensorData) {
	data.Servertime = time.Now().UTC()
	if p.isPipeRecording() {
		p.storer.Put(data)
	}

	// Sleep only after the synchronizer lock has been released; holding it
	// during the sleep would block refreshDelay for the whole delay.
	if delayMs := p.currentDelayMs(); delayMs < 0 {
		time.Sleep(time.Duration(-delayMs) * time.Millisecond)
	}

	p.buffer.serialMutex.Lock()
	p.buffer.MeasSerial = data
	//p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data)
	p.buffer.serialMutex.Unlock()
}

// Close clears the active flag. The publisher goroutine stops once it
// observes the flag after its next tick. The tickers are deliberately not
// stopped here: a stopped ticker never fires again, which would leave the
// goroutine blocked on the tick channel forever.
func (p *pipeline) Close() {
	p.mu.Lock()
	p.active = false
	p.mu.Unlock()
}