151 lines
3.2 KiB
Go
151 lines
3.2 KiB
Go
package core
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// TODO: adapt HNR-INS data to continue orientation stream
|
|
|
|
type Processor interface {
|
|
Process(data *Sensordata) error
|
|
}
|
|
|
|
type pipeline struct {
|
|
synchronizer
|
|
aggregator
|
|
Publisher
|
|
publishTicker *time.Ticker
|
|
}
|
|
|
|
func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline {
|
|
return &pipeline{
|
|
synchronizer{
|
|
bufferSize: 100,
|
|
mutex: &sync.Mutex{},
|
|
updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond),
|
|
},
|
|
aggregator{
|
|
tcpMutex: &sync.Mutex{},
|
|
serialMutex: &sync.Mutex{},
|
|
},
|
|
d,
|
|
time.NewTicker(time.Duration(publishIntervalMs) * time.Millisecond),
|
|
}
|
|
}
|
|
|
|
func (p *pipeline) Run() {
|
|
go p.schedule()
|
|
go func() {
|
|
for {
|
|
<-p.publishTicker.C
|
|
err := p.Publish()
|
|
if err != nil /*&& err.Error() != "no data available"*/ {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
}()
|
|
log.Println("pipeline: processing service started")
|
|
}
|
|
|
|
func (p *pipeline) Publish() error {
|
|
p.tcpMutex.Lock()
|
|
p.serialMutex.Lock()
|
|
//log.Println(p.tcpSensorData)
|
|
//log.Println(p.serialSensorData)
|
|
if (p.tcpSensorData == Sensordata{} && p.serialSensorData == Sensordata{}) {
|
|
p.tcpMutex.Unlock()
|
|
p.serialMutex.Unlock()
|
|
return errors.New("no data available")
|
|
}
|
|
data := map[string]Sensordata{
|
|
string(SOURCE_TCP): p.tcpSensorData,
|
|
string(SOURCE_SERIAL): p.serialSensorData,
|
|
}
|
|
p.tcpSensorData = Sensordata{}
|
|
p.serialSensorData = Sensordata{}
|
|
p.tcpMutex.Unlock()
|
|
p.serialMutex.Unlock()
|
|
|
|
jdata, err := json.Marshal(data)
|
|
log.Println(string(jdata))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
p.Publisher.Publish(string(jdata))
|
|
return nil
|
|
}
|
|
|
|
type aggregator struct {
|
|
tcpSensorData Sensordata
|
|
serialSensorData Sensordata
|
|
tcpMutex *sync.Mutex
|
|
serialMutex *sync.Mutex
|
|
}
|
|
|
|
// UnixNanoTime is an int64-backed timestamp type used as the key of the
// synchronizer's buffers; the name indicates Unix time in nanoseconds.
type UnixNanoTime int64
|
|
|
|
type synchronizer struct {
|
|
tcpDelayMs int
|
|
serialDelayMs int
|
|
tcpBuffer map[UnixNanoTime]Sensordata
|
|
serialBuffer map[UnixNanoTime]Sensordata
|
|
bufferSize int
|
|
mutex *sync.Mutex
|
|
updateTicker *time.Ticker
|
|
// should run concurrently
|
|
//
|
|
// Methods:
|
|
// pushSensordata(Sensordata), remove oldest if larger than bufferSize
|
|
// refreshDelay()
|
|
// Schedule()
|
|
}
|
|
|
|
func (s *synchronizer) schedule() {
|
|
log.Println("synchronizer: started")
|
|
for {
|
|
<-s.updateTicker.C
|
|
err := s.refreshDelay()
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *synchronizer) refreshDelay() error {
|
|
// TODO: implement
|
|
return nil
|
|
}
|
|
|
|
func (p *pipeline) Process(data *Sensordata) error {
|
|
if data == nil {
|
|
return errors.New("nil processing not allowed")
|
|
}
|
|
//log.Println(string(data.SourceId))
|
|
switch data.SourceId {
|
|
case SOURCE_TCP:
|
|
go p.pushTcpDataToBuffer(*data)
|
|
case SOURCE_SERIAL:
|
|
go p.pushSerialDataToBuffer(*data)
|
|
default:
|
|
return errors.New("invalid data source")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *pipeline) pushTcpDataToBuffer(data Sensordata) {
|
|
time.Sleep(time.Duration(p.tcpDelayMs))
|
|
p.tcpMutex.Lock()
|
|
p.tcpSensorData = data
|
|
p.tcpMutex.Unlock()
|
|
}
|
|
func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
|
|
time.Sleep(time.Duration(p.serialDelayMs))
|
|
p.serialMutex.Lock()
|
|
p.serialSensorData = p.serialSensorData.Consolidate(data)
|
|
p.serialMutex.Unlock()
|
|
}
|