gyrogpsc/core/pipeline.go

128 lines
2.5 KiB
Go

package core
import (
"encoding/json"
"errors"
"log"
"sync"
"time"
)
type Processor interface {
Process(data *Sensordata) error
}
type pipeline struct {
synchronizer
aggregator
Publisher
publishTicker *time.Ticker
}
func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline {
return &pipeline{
synchronizer{
bufferSize: 100,
updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Second),
},
aggregator{},
d,
time.NewTicker(time.Duration(publishIntervalMs) * time.Second),
}
}
func (p *pipeline) Run() {
go p.schedule()
go func() {
for {
<-p.publishTicker.C
err := p.Publish()
if err != nil {
log.Println(err)
}
}
}()
}
func (p *pipeline) Publish() error {
if (p.tcpSensorData == Sensordata{} && p.serialSensorData == Sensordata{}) {
return errors.New("No Data available")
}
data := map[string]Sensordata{
string(p.tcpSensorData.SourceId): p.tcpSensorData,
string(p.serialSensorData.SourceId): p.serialSensorData,
}
jdata, err := json.Marshal(data)
if err != nil {
return err
}
p.Publisher.Publish(string(jdata))
return nil
}
type aggregator struct {
tcpSensorData Sensordata
serialSensorData Sensordata
tcpMutex *sync.Mutex
serialMutex *sync.Mutex
}
type UnixNanoTime int64
type synchronizer struct {
tcpDelayMs int
serialDelayMs int
tcpBuffer map[UnixNanoTime]Sensordata
serialBuffer map[UnixNanoTime]Sensordata
bufferSize int
mutex *sync.Mutex
updateTicker *time.Ticker
// should run concurrently
//
// Methods:
// pushSensordata(Sensordata), remove oldest if larger than bufferSize
// refreshDelay()
// Schedule()
}
func (s *synchronizer) schedule() {
for {
<-s.updateTicker.C
err := s.refreshDelay()
if err != nil {
log.Println(err)
}
}
}
func (s *synchronizer) refreshDelay() error {
// TODO: implement
return nil
}
func (p *pipeline) Process(data *Sensordata) error {
switch data.SourceId {
case SOURCE_TCP:
go p.pushTcpDataToBuffer(*data)
return nil
case SOURCE_SERIAL:
go p.pushSerialDataToBuffer(*data)
return nil
default:
return errors.New("invalid data source")
}
}
func (p *pipeline) pushTcpDataToBuffer(data Sensordata) {
time.Sleep(time.Duration(p.tcpDelayMs))
p.tcpMutex.Lock()
p.tcpSensorData = data
p.tcpMutex.Unlock()
}
func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
time.Sleep(time.Duration(p.serialDelayMs))
p.serialMutex.Lock()
p.serialSensorData = data
p.serialMutex.Unlock()
}