From ca73b7c14ef0461ab36b5650666960fefa3a3c17 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Sat, 2 Jan 2021 16:44:34 +0100 Subject: [PATCH] some backend fixes --- core/pipeline_replay.go | 28 ++++++++++++++++++++++------ core/service.go | 15 ++++++++++----- web/http.go | 6 ++++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/core/pipeline_replay.go b/core/pipeline_replay.go index 449e7d8..5dcebbc 100644 --- a/core/pipeline_replay.go +++ b/core/pipeline_replay.go @@ -12,11 +12,21 @@ import ( "time" ) -type pipelineReplay struct{} +type pipelineReplay struct { + stopChan chan struct{} + replayChan chan interface{} +} + +func (r pipelineReplay) Stop() { + r.stopChan <- struct{}{} + close(r.replayChan) +} func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { + r := &pipelineReplay{make(chan struct{}), nil} // set pipeline up and wire it together - collNet := ext.NewChanSource(channelFromTracking(t)) + r.replayChan = r.channelFromTracking(t) + collNet := ext.NewChanSource(r.replayChan) dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) //flowReorder := NewRearranger() @@ -26,7 +36,7 @@ func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { // wire up and execute //go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) - return &pipelineReplay{} + return r } func logFunc() flow.MapFunc { @@ -37,18 +47,24 @@ func logFunc() flow.MapFunc { } } -func channelFromTracking(t *Tracking) chan interface{} { +func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} { ch := make(chan interface{}) sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) }) go func() { lastTs := t.Data[0].Servertime.UnixNano() lastTsNow := time.Now().UTC().UnixNano() i := 0 + br: for i <= len(t.Data)-1 { durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs timeCounter := time.Now().UTC().UnixNano() - lastTsNow if timeCounter >= durationSinceLastEvent { - ch <- &(t.Data[i]) + select { + case <-p.stopChan: + logrus.Debugln("replay stopped") + break br + case ch <- &(t.Data[i]): + } logrus.Traceln("replay tracking: ", t.Data[i]) lastTs = t.Data[i].Servertime.UnixNano() lastTsNow = time.Now().UTC().UnixNano() @@ -56,7 +72,7 @@ func channelFromTracking(t *Tracking) chan interface{} { } } - logrus.Infoln("replay: pushed all tracking data to pipeline") + logrus.Infoln("replay: tracking replay finished") }() return ch } diff --git a/core/service.go b/core/service.go index dfeb885..3752628 100644 --- a/core/service.go +++ b/core/service.go @@ -45,9 +45,10 @@ type Recorder interface { } type trackingService struct { - opMode OpMode - tracking *Tracking - pipeline *pipelineRecord + opMode OpMode + tracking *Tracking + //pipeline *pipelineRecord + replaypipe *pipelineReplay collectors []Collector store Storer publisher Publisher @@ -200,6 +201,10 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) { for _, e := range t.collectors { e.Stop() } + if t.replaypipe != nil { + t.replaypipe.Stop() + t.replaypipe = nil + } // let buffer run empty after collectors stopped time.Sleep(time.Millisecond * 5) t.publisher.SetStreaming(false) @@ -212,7 +217,7 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) { } func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error) { - if !(t.opMode == REPLAY || t.opMode == STOPPED) { + if !(t.opMode == REPLAY || t.opMode == STOPPED || t.opMode == LIVE || t.opMode == RECORDING) { t.StopAll() } logrus.Info("LOAD TRACKING from database") @@ -221,7 +226,7 @@ func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error) return nil, err } t.safelyReplaceTracking(*tracking) - NewReplayPipeline(t.publisher, t.tracking) + t.replaypipe = NewReplayPipeline(t.publisher, t.tracking) t.publisher.SetStreaming(true) t.opMode = REPLAY return t.tracking, nil diff --git a/web/http.go b/web/http.go index 89592bb..e9de4c5 100644 --- a/web/http.go +++ b/web/http.go @@ -18,6 +18,7 @@ func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) { // Application Main Page app.Get("/", fiberHomeHandler) + app.Get("/tracking", fiberTrackingHandler) // Websocket app.Get("/ws", websocket.New(createFiberWebsocketHandler(sub))) @@ -275,3 +276,8 @@ func fiberHomeHandler(c *fiber.Ctx) error { // Render index template return c.Render("index", "ws://"+c.Hostname()+"/ws") } + +func fiberTrackingHandler(c *fiber.Ctx) error { + // Render index template + return c.Render("replayFull", nil) +}