package core

import (
	"encoding/json"
	"errors"
	"log"
	"sync"
	"time"
)

// TODO: adapt HNR-INS data to continue orientation stream

// errNoData signals that neither source has produced data yet.
var errNoData = errors.New("no data available")

// Processor consumes raw sensor readings and feeds them into the pipeline.
type Processor interface {
	Process(data *Sensordata) error
}

// pipeline synchronizes, aggregates and periodically publishes sensor data
// arriving from the TCP and serial sources.
type pipeline struct {
	syn           synchronizer
	agr           aggregator
	pub           Publisher
	publishTicker *time.Ticker
}

// NewPipeline wires up a pipeline that publishes every publishIntervalMs
// milliseconds and re-estimates the TCP/serial delay every
// delayUpdateIntervalMs milliseconds.
func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline {
	return &pipeline{
		syn: synchronizer{
			//bufferSize: 100,
			mutex:        &sync.Mutex{},
			updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond),
		},
		agr: aggregator{
			tcpMutex:    &sync.Mutex{},
			serialMutex: &sync.Mutex{},
		},
		pub:           d,
		publishTicker: time.NewTicker(time.Duration(publishIntervalMs) * time.Millisecond),
	}
}

// Run starts the delay synchronizer and the periodic publisher in their own
// goroutines and returns immediately.
func (p *pipeline) Run() {
	go p.scheduleSynchronizer()
	go func() {
		for {
			<-p.publishTicker.C
			err := p.Publish()
			if err != nil && !errors.Is(err, errNoData) {
				log.Println(err)
			}
		}
	}()
	log.Println("pipeline: processing service started")
}

// Publish marshals the latest aggregated TCP and serial readings and hands
// them to the configured Publisher. It returns errNoData when neither source
// has delivered anything yet.
func (p *pipeline) Publish() error {
	p.agr.tcpMutex.Lock()
	p.agr.serialMutex.Lock()
	//log.Println(pub.tcpSensorData)
	//log.Println(pub.serialSensorData)
	if (p.agr.tcpSensorData == Sensordata{} && p.agr.serialSensorData == Sensordata{}) {
		p.agr.tcpMutex.Unlock()
		p.agr.serialMutex.Unlock()
		return errNoData
	}
	data := map[string]Sensordata{
		string(SOURCE_TCP):    p.agr.tcpSensorData,
		string(SOURCE_SERIAL): p.agr.serialSensorData,
	}
	//p.agr.tcpSensorData = Sensordata{}
	//p.agr.serialSensorData = Sensordata{}
	p.agr.tcpMutex.Unlock()
	p.agr.serialMutex.Unlock()
	jdata, err := json.Marshal(data)
	//log.Println(string(pretty.Pretty(jdata)))
	if err != nil {
		return err
	}
	p.pub.Publish(string(jdata))
	return nil
}

// aggregator holds the most recent consolidated reading per source.
type aggregator struct {
	tcpSensorData    Sensordata
	serialSensorData Sensordata
	tcpMutex         *sync.Mutex
	serialMutex      *sync.Mutex
}

type UnixNanoTime int64

// synchronizer estimates and stores the delay between the TCP and serial
// streams so that the faster source can be held back before aggregation.
type synchronizer struct {
	tcpSerialDelayMs int64
	//tcpBuffer    map[UnixNanoTime]Sensordata
	//serialBuffer map[UnixNanoTime]Sensordata
	//bufferSize   int
	mutex        *sync.Mutex
	updateTicker *time.Ticker
	// should run concurrently
	//
	// Methods:
	// pushSensordata(Sensordata), remove oldest if larger than bufferSize
	// refreshDelay()
	// Schedule()
}

// scheduleSynchronizer re-estimates the TCP/serial delay on every tick.
func (p *pipeline) scheduleSynchronizer() {
	log.Println("synchronizer: started")
	for {
		<-p.syn.updateTicker.C
		if err := p.refreshDelay(); err != nil {
			log.Println(err)
		}
	}
}

// refreshDelay compares the latest TCP and serial timestamps and accumulates
// the measured difference into the synchronizer's delay estimate.
func (p *pipeline) refreshDelay() error {
	p.syn.mutex.Lock()
	log.Println("synchronizer: refreshing delay, current TCP/serial delay (ms):", p.syn.tcpSerialDelayMs)
	p.syn.mutex.Unlock()
	// Acquire the aggregator mutexes in the same order as Publish to avoid a deadlock.
	p.agr.tcpMutex.Lock()
	p.agr.serialMutex.Lock()
	tcpTime := time.Unix(0, p.agr.tcpSensorData.Timestamp)
	serTime := time.Unix(0, p.agr.serialSensorData.Timestamp)
	p.agr.serialMutex.Unlock()
	p.agr.tcpMutex.Unlock()
	if tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 {
		return errors.New("no sync possible: no data to compare")
	}
	log.Println("TCP", tcpTime.String())
	log.Println("SER", serTime.String())
	log.Println("Difference", tcpTime.Sub(serTime).Milliseconds())
	delay := tcpTime.Sub(serTime).Milliseconds()
	p.syn.mutex.Lock()
	p.syn.tcpSerialDelayMs += delay
	p.syn.mutex.Unlock()
	return nil
}

// Process routes an incoming reading to the buffer of its source. The actual
// work happens asynchronously so callers are never blocked by the
// synchronization delay.
func (p *pipeline) Process(data *Sensordata) error {
	if data == nil {
		return errors.New("nil processing not allowed")
	}
	//log.Println(string(data.SourceId))
	switch data.SourceId {
	case SOURCE_TCP:
		go p.pushTcpDataToBuffer(*data)
	case SOURCE_SERIAL:
		go p.pushSerialDataToBuffer(*data)
	default:
		return errors.New("invalid data source")
	}
	return nil
}

// pushTcpDataToBuffer holds back the TCP reading by the current delay
// estimate (when TCP runs ahead of serial) and consolidates it into the
// aggregator.
func (p *pipeline) pushTcpDataToBuffer(data Sensordata) {
	// Snapshot the delay under the synchronizer mutex to avoid a data race.
	p.syn.mutex.Lock()
	delayMs := p.syn.tcpSerialDelayMs
	p.syn.mutex.Unlock()
	if delayMs > 0 {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
	}
	p.agr.tcpMutex.Lock()
	p.agr.tcpSensorData = p.agr.tcpSensorData.Consolidate2(data)
	p.agr.tcpMutex.Unlock()
}

// pushSerialDataToBuffer holds back the serial reading by the current delay
// estimate (when serial runs ahead of TCP) and consolidates it into the
// aggregator.
func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
	// Snapshot the delay under the synchronizer mutex to avoid a data race.
	p.syn.mutex.Lock()
	delayMs := p.syn.tcpSerialDelayMs
	p.syn.mutex.Unlock()
	if delayMs < 0 {
		time.Sleep(time.Duration(-delayMs) * time.Millisecond)
	}
	p.agr.serialMutex.Lock()
	p.agr.serialSensorData = p.agr.serialSensorData.Consolidate2(data)
	p.agr.serialMutex.Unlock()
}
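
// Usage sketch (assumption, not part of this package's tested code): the
// Publisher passed to NewPipeline is defined elsewhere in this package, and
// stdoutPublisher below is a hypothetical implementation used only to show
// the expected wiring of NewPipeline, Run and Process.
//
//	type stdoutPublisher struct{}
//
//	func (stdoutPublisher) Publish(msg string) { fmt.Println(msg) }
//
//	func wireUpExample() {
//		// Publish every 100 ms, re-estimate the TCP/serial delay every 1000 ms.
//		p := NewPipeline(stdoutPublisher{}, 100, 1000)
//		p.Run()
//		// Feed readings from both sources; Process returns immediately.
//		_ = p.Process(&Sensordata{SourceId: SOURCE_TCP, Timestamp: time.Now().UnixNano()})
//		_ = p.Process(&Sensordata{SourceId: SOURCE_SERIAL, Timestamp: time.Now().UnixNano()})
//	}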