pipeline ordering
This commit is contained in:
parent
f42e2d29b0
commit
53351c7349
@ -7,10 +7,15 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
"time"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
)
|
||||
|
||||
func main() {
|
||||
go func() {
|
||||
logrus.Println(http.ListenAndServe("localhost:6060", nil))
|
||||
}()
|
||||
|
||||
conf := configurationFromFile()
|
||||
|
||||
logrus.Debug(conf)
|
||||
@ -21,8 +26,10 @@ func main() {
|
||||
service := core.TrackingService(conf, repo, disp)
|
||||
|
||||
go func() {
|
||||
service.LoadTracking(uuid.MustParse("06b05aa3-6a13-4ffb-8ac7-cd35dfc0f949"))
|
||||
time.Sleep(120 * time.Second)
|
||||
// Long Run
|
||||
//service.LoadTracking(uuid.MustParse("06b05aa3-6a13-4ffb-8ac7-cd35dfc0f949"))
|
||||
// Tunnel
|
||||
service.LoadTracking(uuid.MustParse("c3dbee7c-512a-4cc8-9804-21f0f2cf3c22"))
|
||||
|
||||
//pprof.StopCPUProfile()
|
||||
//os.Exit(0)
|
||||
|
||||
@ -6,11 +6,15 @@ import (
|
||||
"git.timovolkmann.de/gyrogpsc/web"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
"os"
|
||||
"time"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
)
|
||||
|
||||
func main() {
|
||||
go func() {
|
||||
logrus.Println(http.ListenAndServe("localhost:6060", nil))
|
||||
}()
|
||||
|
||||
conf := configurationFromFile()
|
||||
|
||||
logrus.Debug(conf)
|
||||
@ -20,27 +24,6 @@ func main() {
|
||||
|
||||
service := core.TrackingService(conf, repo, disp)
|
||||
|
||||
go func() {
|
||||
service.StartPipeline(core.TCP, core.SERIAL)
|
||||
time.Sleep(5 * time.Second)
|
||||
service.StartRecord()
|
||||
time.Sleep(10 * time.Second)
|
||||
service.StopRecord()
|
||||
time.Sleep(5 * time.Second)
|
||||
service.StartPipeline(core.TCP, core.SERIAL)
|
||||
time.Sleep(5 * time.Second)
|
||||
service.StartRecord()
|
||||
time.Sleep(60 * time.Second)
|
||||
service.StopRecord()
|
||||
time.Sleep(2 * time.Second)
|
||||
service.StopAll()
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
//pprof.StopCPUProfile()
|
||||
os.Exit(0)
|
||||
}()
|
||||
|
||||
web.CreateServer(service, disp, conf)
|
||||
}
|
||||
|
||||
|
||||
@ -1,83 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"git.timovolkmann.de/gyrogpsc/core"
|
||||
"git.timovolkmann.de/gyrogpsc/storage"
|
||||
"git.timovolkmann.de/gyrogpsc/web"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
//f, err := os.Create("_profile.pprof")
|
||||
//if err != nil {
|
||||
// logrus.Fatal(err)
|
||||
//}
|
||||
//pprof.StartCPUProfile(f)
|
||||
go func() {
|
||||
logrus.Println(http.ListenAndServe("localhost:6060", nil))
|
||||
}()
|
||||
//defer pprof.StopCPUProfile()
|
||||
|
||||
conf := configurationFromFile()
|
||||
|
||||
repo := storage.NewRepository(conf)
|
||||
disp := core.NewDispatcher()
|
||||
|
||||
service := core.TrackingService(repo, disp, conf)
|
||||
|
||||
go func() {
|
||||
service.StartPipeline(core.TCP, core.SERIAL)
|
||||
time.Sleep(5 * time.Second)
|
||||
service.StartRecord()
|
||||
time.Sleep(10 * time.Second)
|
||||
service.StopRecord()
|
||||
time.Sleep(5 * time.Second)
|
||||
service.StartPipeline(core.TCP, core.SERIAL)
|
||||
time.Sleep(5 * time.Second)
|
||||
service.StartRecord()
|
||||
time.Sleep(60 * time.Second)
|
||||
service.StopRecord()
|
||||
time.Sleep(2 * time.Second)
|
||||
service.StopAll()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
//pprof.StopCPUProfile()
|
||||
os.Exit(0)
|
||||
}()
|
||||
|
||||
web.CreateServer(service, disp, conf)
|
||||
}
|
||||
|
||||
func configurationFromFile() *core.Configuration {
|
||||
viper.SetDefault("collectors.porttcp", ":3010")
|
||||
viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201")
|
||||
viper.SetDefault("webserver.port", ":3011")
|
||||
viper.SetDefault("pipeline.publishIntervalMs", 50)
|
||||
viper.SetDefault("pipeline.syncUpdateIntervalMs", 494)
|
||||
viper.SetDefault("debuglevel", "INFO")
|
||||
|
||||
viper.SetConfigName("gpsconfig") // name of config file (without extension)
|
||||
viper.SetConfigType("yaml")
|
||||
viper.AddConfigPath(".")
|
||||
viper.AddConfigPath("./../../")
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
logrus.Warn("couldn't find config file. using standard configuration")
|
||||
}
|
||||
|
||||
c := core.Configuration{}
|
||||
if err := viper.Unmarshal(&c); err != nil {
|
||||
logrus.Debug("couldn't load config...")
|
||||
logrus.Error(err)
|
||||
}
|
||||
lvl, err := logrus.ParseLevel(c.Debuglevel)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
logrus.SetLevel(lvl)
|
||||
return &c
|
||||
}
|
||||
@ -19,13 +19,17 @@ func NewRecordPipeline(p Publisher, s Tracker, netChan chan interface{}, serialC
|
||||
transSer := flow.NewFlatMap(transformSerFunc, 1)
|
||||
//flowDelay := flow.NewMap(delayFunc(), 1)
|
||||
flowStore := flow.NewMap(storeFunc(s), 1)
|
||||
|
||||
dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1)
|
||||
flowReorder := NewRearranger()
|
||||
|
||||
flowJson := flow.NewMap(jsonFunc, 1)
|
||||
sinkPub := newPublishSink(p)
|
||||
|
||||
// wire up and execute
|
||||
demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer))
|
||||
//go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub)
|
||||
go demux.Via(flowStore).Via(flowJson).To(sinkPub)
|
||||
go demux.Via(flowStore).Via(dataSanitizer).Via(flowReorder).Via(flowJson).To(sinkPub)
|
||||
return &pipelineRecord{}
|
||||
}
|
||||
|
||||
@ -45,7 +49,7 @@ func storeFunc(s Tracker) flow.MapFunc {
|
||||
}
|
||||
|
||||
logrus.Debugf("%-14v %-40s %-40s %v %v", sd.Source(), sd.Timestamp.Format(time.RFC3339Nano), sd.Servertime.Format(time.RFC3339Nano), sd.Position, sd.Orientation)
|
||||
|
||||
//runtime.Gosched()
|
||||
return sd
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
ext "github.com/reugn/go-streams/extension"
|
||||
"github.com/reugn/go-streams/flow"
|
||||
"github.com/sirupsen/logrus"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@ -16,7 +17,7 @@ type pipelineReplay struct{}
|
||||
func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay {
|
||||
// set pipeline up and wire it together
|
||||
collNet := ext.NewChanSource(channelFromTracking(t))
|
||||
dataSanitizer := flow.NewMap(replaySanitizeFunc(), 8)
|
||||
dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1)
|
||||
flowReorder := NewRearranger()
|
||||
|
||||
flowJson := flow.NewMap(jsonFunc, 1)
|
||||
@ -40,17 +41,21 @@ func channelFromTracking(t *Tracking) chan interface{} {
|
||||
ch := make(chan interface{})
|
||||
sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) })
|
||||
go func() {
|
||||
lastTimestamp := t.Data[0].Servertime
|
||||
//lastTimestamp := t.Data[len(t.Data)-1].Servertime
|
||||
//for i := len(t.Data)-1; i >= 0; i-- {
|
||||
for i := 0; i <= len(t.Data)-1; i++ {
|
||||
sleeps := t.Data[i].Servertime.Sub(lastTimestamp)
|
||||
lastTimestamp = t.Data[i].Servertime
|
||||
logrus.Traceln("simulation original stream: waiting ->", sleeps)
|
||||
time.Sleep(sleeps)
|
||||
ch <- t.Data[i]
|
||||
}
|
||||
logrus.Infoln("replay: pushed all tracking data to pipeline")
|
||||
lastTs := t.Data[0].Servertime.UnixNano()
|
||||
lastTsNow := time.Now().UTC().UnixNano()
|
||||
i := 0
|
||||
for i <= len(t.Data)-1 {
|
||||
durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs
|
||||
timeCounter := time.Now().UTC().UnixNano() - lastTsNow
|
||||
if timeCounter >= durationSinceLastEvent {
|
||||
ch <- &(t.Data[i])
|
||||
lastTs = t.Data[i].Servertime.UnixNano()
|
||||
lastTsNow = time.Now().UTC().UnixNano()
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
logrus.Infoln("replay: pushed all tracking data to pipeline")
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
@ -60,7 +65,7 @@ func replaySanitizeFunc() flow.MapFunc {
|
||||
var lastTimeOffsetUblox int64
|
||||
|
||||
return func(i interface{}) interface{} {
|
||||
sd := i.(SensorData)
|
||||
sd := i.(*SensorData)
|
||||
|
||||
if !(sd.Timestamp.IsZero() || sd.Timestamp.Nanosecond() == 0) {
|
||||
lastOffset := sd.Servertime.UnixNano() - sd.Timestamp.UnixNano()
|
||||
@ -100,13 +105,13 @@ func NewRearranger() *rearranger {
|
||||
}
|
||||
|
||||
type rearranger struct {
|
||||
sync.Mutex
|
||||
queue *flow.PriorityQueue
|
||||
in chan interface{}
|
||||
out chan interface{}
|
||||
done chan struct{}
|
||||
startTimeNano int64
|
||||
startTimeNanoNow int64
|
||||
sync.RWMutex
|
||||
queue *flow.PriorityQueue
|
||||
in chan interface{}
|
||||
out chan interface{}
|
||||
done chan struct{}
|
||||
lastTimestamp int64
|
||||
lastTimeNanoNow int64
|
||||
}
|
||||
|
||||
// Verify rearranger satisfies the Flow interface.
|
||||
@ -140,14 +145,16 @@ func (r *rearranger) transmit(inlet streams.Inlet) {
|
||||
func (r *rearranger) receive() {
|
||||
for elem := range r.in {
|
||||
ts := r.timestamp(elem)
|
||||
if r.startTimeNano == 0 {
|
||||
r.startTimeNano = ts //- (500 * 1e+6) // Delay
|
||||
r.startTimeNanoNow = time.Now().UTC().UnixNano()
|
||||
}
|
||||
//if r.lastTimestamp == 0 {
|
||||
// r.lastTimestamp = ts //- (500 * 1e+6) // Delay
|
||||
// r.lastTimeNanoNow = time.Now().UTC().UnixNano()
|
||||
//}
|
||||
item := flow.NewItem(elem, ts, 0)
|
||||
r.Lock()
|
||||
heap.Push(r.queue, item)
|
||||
r.Unlock()
|
||||
logrus.Debugln("item recieved.")
|
||||
runtime.Gosched()
|
||||
}
|
||||
close(r.done)
|
||||
close(r.out)
|
||||
@ -156,29 +163,19 @@ func (r *rearranger) receive() {
|
||||
// emit pops data from ordered priority queue
|
||||
func (r *rearranger) emit() {
|
||||
for {
|
||||
if r.startTimeNano == 0 {
|
||||
if r.queue.Len() <= 10 {
|
||||
continue
|
||||
}
|
||||
tnow := time.Now().UTC().UnixNano()
|
||||
logrus.Debugln("popping item: ")
|
||||
|
||||
if r.queue.Len() <= 0 {
|
||||
continue
|
||||
}
|
||||
r.Lock()
|
||||
durationSinceStartItem := r.queue.Head().Msg.(SensorData).Timestamp.UnixNano() - r.startTimeNano
|
||||
durationSinceStartNow := time.Now().UTC().UnixNano() - r.startTimeNanoNow - (100 * 1e+6)
|
||||
if durationSinceStartNow >= durationSinceStartItem {
|
||||
logrus.Debugln("pqueue size: ", r.queue.Len())
|
||||
logrus.Debugln("time: ",
|
||||
time.Duration(durationSinceStartNow),
|
||||
time.Duration(durationSinceStartItem),
|
||||
time.Duration(durationSinceStartNow)-time.Duration(durationSinceStartItem))
|
||||
|
||||
item := heap.Pop(r.queue).(*flow.Item)
|
||||
v := item.Msg.(SensorData)
|
||||
r.out <- v
|
||||
}
|
||||
item := heap.Pop(r.queue).(*flow.Item)
|
||||
r.Unlock()
|
||||
|
||||
v := item.Msg.(*SensorData)
|
||||
r.out <- v
|
||||
r.lastTimestamp = v.Timestamp.UnixNano()
|
||||
r.lastTimeNanoNow = tnow
|
||||
select {
|
||||
case <-r.done:
|
||||
return
|
||||
@ -188,6 +185,6 @@ func (r *rearranger) emit() {
|
||||
}
|
||||
|
||||
func (r *rearranger) timestamp(elem interface{}) int64 {
|
||||
v := elem.(SensorData)
|
||||
v := elem.(*SensorData)
|
||||
return v.Timestamp.UnixNano()
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user