From b4a3b09fb151df109cbf3946d3493bdb4b7d3c9b Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Mon, 11 Jan 2021 01:18:38 +0100 Subject: [PATCH] backend bugfixes --- core/pipeline_replay.go | 13 +++++++------ core/service.go | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/pipeline_replay.go b/core/pipeline_replay.go index c9d2e89..7f40d18 100644 --- a/core/pipeline_replay.go +++ b/core/pipeline_replay.go @@ -61,17 +61,18 @@ func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} { durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs timeCounter := time.Now().UTC().UnixNano() - lastTsNow if timeCounter >= durationSinceLastEvent { - select { - case <-p.stopChan: - logrus.Debugln("received stop signal: replay stopped") - break br - case ch <- &(t.Data[i]): - } logrus.Traceln("replay tracking: ", t.Data[i]) + ch <- &(t.Data[i]) lastTs = t.Data[i].Servertime.UnixNano() lastTsNow = time.Now().UTC().UnixNano() i++ } + select { + case <-p.stopChan: + logrus.Debugln("received stop signal: replay stopped") + break br + default: + } } logrus.Infoln("replay: tracking replay finished") select { diff --git a/core/service.go b/core/service.go index ffe74ae..706a1b1 100644 --- a/core/service.go +++ b/core/service.go @@ -243,12 +243,12 @@ func (t *TrackingService) LoadTracking(trackingId uuid.UUID, replay bool) (*Trac } t.safelyReplaceTracking(*tracking) if t.replaypipe != nil { - t.replaypipe = nil select { case <-t.replaypipe.stopChan: logrus.Warnln("blocking channel closed") default: } + t.replaypipe = nil } t.replaypipe = NewReplayPipeline(t.publisher, t.tracking) t.publisher.SetStreaming(true)