package core import ( "encoding/json" "errors" "log" "sync" "time" ) // TODO: adapt HNR-INS data to continue orientation stream type Processor interface { Process(data *Sensordata) error } type pipeline struct { syn synchronizer agr aggregator pub Publisher publishTicker *time.Ticker } func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline { return &pipeline{ synchronizer{ bufferSize: 100, mutex: &sync.Mutex{}, updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond), }, aggregator{ tcpMutex: &sync.Mutex{}, serialMutex: &sync.Mutex{}, }, d, time.NewTicker(time.Duration(publishIntervalMs) * time.Millisecond), } } func (p *pipeline) Run() { go p.syn.schedule() go func() { for { <-p.publishTicker.C err := p.Publish() if err != nil && err.Error() != "no data available" { log.Println(err) } } }() log.Println("pipeline: processing service started") } func (p *pipeline) Publish() error { p.agr.tcpMutex.Lock() p.agr.serialMutex.Lock() //log.Println(pub.tcpSensorData) //log.Println(pub.serialSensorData) if (p.agr.tcpSensorData == Sensordata{} && p.agr.serialSensorData == Sensordata{}) { p.agr.tcpMutex.Unlock() p.agr.serialMutex.Unlock() return errors.New("no data available") } data := map[string]Sensordata{ string(SOURCE_TCP): p.agr.tcpSensorData, string(SOURCE_SERIAL): p.agr.serialSensorData, } p.agr.tcpSensorData = Sensordata{} p.agr.serialSensorData = Sensordata{} p.agr.tcpMutex.Unlock() p.agr.serialMutex.Unlock() jdata, err := json.Marshal(data) //log.Println(string(pretty.Pretty(jdata))) if err != nil { return err } p.pub.Publish(string(jdata)) return nil } type aggregator struct { tcpSensorData Sensordata serialSensorData Sensordata tcpMutex *sync.Mutex serialMutex *sync.Mutex } type UnixNanoTime int64 type synchronizer struct { tcpDelayMs int serialDelayMs int tcpBuffer map[UnixNanoTime]Sensordata serialBuffer map[UnixNanoTime]Sensordata bufferSize int mutex *sync.Mutex updateTicker *time.Ticker // should run concurrently // // Methods: // pushSensordata(Sensordata), remove oldest if larger than bufferSize // refreshDelay() // Schedule() } func (s *synchronizer) schedule() { log.Println("synchronizer: started") for { <-s.updateTicker.C err := s.refreshDelay() if err != nil { log.Println(err) } } } func (s *synchronizer) refreshDelay() error { // TODO: implement return nil } func (p *pipeline) Process(data *Sensordata) error { if data == nil { return errors.New("nil processing not allowed") } //log.Println(string(data.SourceId)) switch data.SourceId { case SOURCE_TCP: go p.pushTcpDataToBuffer(*data) case SOURCE_SERIAL: go p.pushSerialDataToBuffer(*data) default: return errors.New("invalid data source") } return nil } func (p *pipeline) pushTcpDataToBuffer(data Sensordata) { time.Sleep(time.Duration(p.syn.tcpDelayMs)) p.agr.tcpMutex.Lock() p.agr.tcpSensorData = data p.agr.tcpMutex.Unlock() } func (p *pipeline) pushSerialDataToBuffer(data Sensordata) { time.Sleep(time.Duration(p.syn.serialDelayMs)) p.agr.serialMutex.Lock() p.agr.serialSensorData = p.agr.serialSensorData.Consolidate(data) p.agr.serialMutex.Unlock() }