From 53351c73493a8506f39ccfcc1546141501f0f0bc Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Mon, 28 Dec 2020 11:06:41 +0100 Subject: [PATCH] pipeline ordering --- cmd/replay/replay.go | 13 +++++-- cmd/server/server.go | 29 +++----------- cmd/test/autorun.go | 83 ----------------------------------------- core/pipeline_record.go | 8 +++- core/pipeline_replay.go | 83 ++++++++++++++++++++--------------------- 5 files changed, 62 insertions(+), 154 deletions(-) delete mode 100644 cmd/test/autorun.go diff --git a/cmd/replay/replay.go b/cmd/replay/replay.go index 8ed8404..710bdd1 100644 --- a/cmd/replay/replay.go +++ b/cmd/replay/replay.go @@ -7,10 +7,15 @@ import ( "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/spf13/viper" - "time" + "net/http" + _ "net/http/pprof" ) func main() { + go func() { + logrus.Println(http.ListenAndServe("localhost:6060", nil)) + }() + conf := configurationFromFile() logrus.Debug(conf) @@ -21,8 +26,10 @@ func main() { service := core.TrackingService(conf, repo, disp) go func() { - service.LoadTracking(uuid.MustParse("06b05aa3-6a13-4ffb-8ac7-cd35dfc0f949")) - time.Sleep(120 * time.Second) + // Long Run + //service.LoadTracking(uuid.MustParse("06b05aa3-6a13-4ffb-8ac7-cd35dfc0f949")) + // Tunnel + service.LoadTracking(uuid.MustParse("c3dbee7c-512a-4cc8-9804-21f0f2cf3c22")) //pprof.StopCPUProfile() //os.Exit(0) diff --git a/cmd/server/server.go b/cmd/server/server.go index 4e46c1e..c3e0148 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -6,11 +6,15 @@ import ( "git.timovolkmann.de/gyrogpsc/web" "github.com/sirupsen/logrus" "github.com/spf13/viper" - "os" - "time" + "net/http" + _ "net/http/pprof" ) func main() { + go func() { + logrus.Println(http.ListenAndServe("localhost:6060", nil)) + }() + conf := configurationFromFile() logrus.Debug(conf) @@ -20,27 +24,6 @@ func main() { service := core.TrackingService(conf, repo, disp) - go func() { - service.StartPipeline(core.TCP, core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(10 * time.Second) - service.StopRecord() - time.Sleep(5 * time.Second) - service.StartPipeline(core.TCP, core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(60 * time.Second) - service.StopRecord() - time.Sleep(2 * time.Second) - service.StopAll() - - time.Sleep(5 * time.Second) - - //pprof.StopCPUProfile() - os.Exit(0) - }() - web.CreateServer(service, disp, conf) } diff --git a/cmd/test/autorun.go b/cmd/test/autorun.go deleted file mode 100644 index ea54994..0000000 --- a/cmd/test/autorun.go +++ /dev/null @@ -1,83 +0,0 @@ -package main - -import ( - "git.timovolkmann.de/gyrogpsc/core" - "git.timovolkmann.de/gyrogpsc/storage" - "git.timovolkmann.de/gyrogpsc/web" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" - "net/http" - _ "net/http/pprof" - "os" - "time" -) - -func main() { - //f, err := os.Create("_profile.pprof") - //if err != nil { - // logrus.Fatal(err) - //} - //pprof.StartCPUProfile(f) - go func() { - logrus.Println(http.ListenAndServe("localhost:6060", nil)) - }() - //defer pprof.StopCPUProfile() - - conf := configurationFromFile() - - repo := storage.NewRepository(conf) - disp := core.NewDispatcher() - - service := core.TrackingService(repo, disp, conf) - - go func() { - service.StartPipeline(core.TCP, core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(10 * time.Second) - service.StopRecord() - time.Sleep(5 * time.Second) - service.StartPipeline(core.TCP, core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(60 * time.Second) - service.StopRecord() - time.Sleep(2 * time.Second) - service.StopAll() - time.Sleep(1 * time.Second) - - //pprof.StopCPUProfile() - os.Exit(0) - }() - - web.CreateServer(service, disp, conf) -} - -func configurationFromFile() *core.Configuration { - viper.SetDefault("collectors.porttcp", ":3010") - viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") - viper.SetDefault("webserver.port", ":3011") - viper.SetDefault("pipeline.publishIntervalMs", 50) - viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) - viper.SetDefault("debuglevel", "INFO") - - viper.SetConfigName("gpsconfig") // name of config file (without extension) - viper.SetConfigType("yaml") - viper.AddConfigPath(".") - viper.AddConfigPath("./../../") - if err := viper.ReadInConfig(); err != nil { - logrus.Warn("couldn't find config file. using standard configuration") - } - - c := core.Configuration{} - if err := viper.Unmarshal(&c); err != nil { - logrus.Debug("couldn't load config...") - logrus.Error(err) - } - lvl, err := logrus.ParseLevel(c.Debuglevel) - if err != nil { - logrus.Error(err) - } - logrus.SetLevel(lvl) - return &c -} diff --git a/core/pipeline_record.go b/core/pipeline_record.go index f0adee8..748b725 100644 --- a/core/pipeline_record.go +++ b/core/pipeline_record.go @@ -19,13 +19,17 @@ func NewRecordPipeline(p Publisher, s Tracker, netChan chan interface{}, serialC transSer := flow.NewFlatMap(transformSerFunc, 1) //flowDelay := flow.NewMap(delayFunc(), 1) flowStore := flow.NewMap(storeFunc(s), 1) + + dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) + flowReorder := NewRearranger() + flowJson := flow.NewMap(jsonFunc, 1) sinkPub := newPublishSink(p) // wire up and execute demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) //go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) - go demux.Via(flowStore).Via(flowJson).To(sinkPub) + go demux.Via(flowStore).Via(dataSanitizer).Via(flowReorder).Via(flowJson).To(sinkPub) return &pipelineRecord{} } @@ -45,7 +49,7 @@ func storeFunc(s Tracker) flow.MapFunc { } logrus.Debugf("%-14v %-40s %-40s %v %v", sd.Source(), sd.Timestamp.Format(time.RFC3339Nano), sd.Servertime.Format(time.RFC3339Nano), sd.Position, sd.Orientation) - + //runtime.Gosched() return sd } } diff --git a/core/pipeline_replay.go b/core/pipeline_replay.go index 83759e0..e534643 100644 --- a/core/pipeline_replay.go +++ b/core/pipeline_replay.go @@ -6,6 +6,7 @@ import ( ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" + "runtime" "sort" "sync" "time" @@ -16,7 +17,7 @@ type pipelineReplay struct{} func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { // set pipeline up and wire it together collNet := ext.NewChanSource(channelFromTracking(t)) - dataSanitizer := flow.NewMap(replaySanitizeFunc(), 8) + dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) flowReorder := NewRearranger() flowJson := flow.NewMap(jsonFunc, 1) @@ -40,17 +41,21 @@ func channelFromTracking(t *Tracking) chan interface{} { ch := make(chan interface{}) sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) }) go func() { - lastTimestamp := t.Data[0].Servertime - //lastTimestamp := t.Data[len(t.Data)-1].Servertime - //for i := len(t.Data)-1; i >= 0; i-- { - for i := 0; i <= len(t.Data)-1; i++ { - sleeps := t.Data[i].Servertime.Sub(lastTimestamp) - lastTimestamp = t.Data[i].Servertime - logrus.Traceln("simulation original stream: waiting ->", sleeps) - time.Sleep(sleeps) - ch <- t.Data[i] - } - logrus.Infoln("replay: pushed all tracking data to pipeline") + lastTs := t.Data[0].Servertime.UnixNano() + lastTsNow := time.Now().UTC().UnixNano() + i := 0 + for i <= len(t.Data)-1 { + durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs + timeCounter := time.Now().UTC().UnixNano() - lastTsNow + if timeCounter >= durationSinceLastEvent { + ch <- &(t.Data[i]) + lastTs = t.Data[i].Servertime.UnixNano() + lastTsNow = time.Now().UTC().UnixNano() + i++ + } + } + + logrus.Infoln("replay: pushed all tracking data to pipeline") }() return ch } @@ -60,7 +65,7 @@ func replaySanitizeFunc() flow.MapFunc { var lastTimeOffsetUblox int64 return func(i interface{}) interface{} { - sd := i.(SensorData) + sd := i.(*SensorData) if !(sd.Timestamp.IsZero() || sd.Timestamp.Nanosecond() == 0) { lastOffset := sd.Servertime.UnixNano() - sd.Timestamp.UnixNano() @@ -100,13 +105,13 @@ func NewRearranger() *rearranger { } type rearranger struct { - sync.Mutex - queue *flow.PriorityQueue - in chan interface{} - out chan interface{} - done chan struct{} - startTimeNano int64 - startTimeNanoNow int64 + sync.RWMutex + queue *flow.PriorityQueue + in chan interface{} + out chan interface{} + done chan struct{} + lastTimestamp int64 + lastTimeNanoNow int64 } // Verify rearranger satisfies the Flow interface. @@ -140,14 +145,16 @@ func (r *rearranger) transmit(inlet streams.Inlet) { func (r *rearranger) receive() { for elem := range r.in { ts := r.timestamp(elem) - if r.startTimeNano == 0 { - r.startTimeNano = ts //- (500 * 1e+6) // Delay - r.startTimeNanoNow = time.Now().UTC().UnixNano() - } + //if r.lastTimestamp == 0 { + // r.lastTimestamp = ts //- (500 * 1e+6) // Delay + // r.lastTimeNanoNow = time.Now().UTC().UnixNano() + //} item := flow.NewItem(elem, ts, 0) r.Lock() heap.Push(r.queue, item) r.Unlock() + logrus.Debugln("item recieved.") + runtime.Gosched() } close(r.done) close(r.out) @@ -156,29 +163,19 @@ func (r *rearranger) receive() { // emit pops data from ordered priority queue func (r *rearranger) emit() { for { - if r.startTimeNano == 0 { + if r.queue.Len() <= 10 { continue } + tnow := time.Now().UTC().UnixNano() + logrus.Debugln("popping item: ") - if r.queue.Len() <= 0 { - continue - } r.Lock() - durationSinceStartItem := r.queue.Head().Msg.(SensorData).Timestamp.UnixNano() - r.startTimeNano - durationSinceStartNow := time.Now().UTC().UnixNano() - r.startTimeNanoNow - (100 * 1e+6) - if durationSinceStartNow >= durationSinceStartItem { - logrus.Debugln("pqueue size: ", r.queue.Len()) - logrus.Debugln("time: ", - time.Duration(durationSinceStartNow), - time.Duration(durationSinceStartItem), - time.Duration(durationSinceStartNow)-time.Duration(durationSinceStartItem)) - - item := heap.Pop(r.queue).(*flow.Item) - v := item.Msg.(SensorData) - r.out <- v - } + item := heap.Pop(r.queue).(*flow.Item) r.Unlock() - + v := item.Msg.(*SensorData) + r.out <- v + r.lastTimestamp = v.Timestamp.UnixNano() + r.lastTimeNanoNow = tnow select { case <-r.done: return @@ -188,6 +185,6 @@ func (r *rearranger) emit() { } func (r *rearranger) timestamp(elem interface{}) int64 { - v := elem.(SensorData) + v := elem.(*SensorData) return v.Timestamp.UnixNano() }