diff --git a/core/pipeline_replay.go b/core/pipeline_replay.go index 53c0208..c9d2e89 100644 --- a/core/pipeline_replay.go +++ b/core/pipeline_replay.go @@ -23,9 +23,13 @@ func (p *pipelineReplay) Stop() { logrus.Debugln("replay channel already closed") } }() - logrus.Debugln("send stop signal") - p.stopChan <- struct{}{} - logrus.Debugln("stop signal sent") + logrus.Debugln("send stop signal...") + select { + case p.stopChan <- struct{}{}: + logrus.Debugln("stop signal sent") + //default: + // logrus.Debugln("stop signal skipped") + } } func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { @@ -59,7 +63,7 @@ func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} { if timeCounter >= durationSinceLastEvent { select { case <-p.stopChan: - logrus.Debugln("replay stopped") + logrus.Debugln("received stop signal: replay stopped") break br case ch <- &(t.Data[i]): } @@ -72,7 +76,7 @@ func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} { logrus.Infoln("replay: tracking replay finished") select { case <-p.stopChan: - logrus.Debugln("replay pipeline closed") + logrus.Debugln("received stop signal: replay pipeline closed") } close(p.replayChan) }() diff --git a/core/service.go b/core/service.go index b7b0410..ffe74ae 100644 --- a/core/service.go +++ b/core/service.go @@ -243,8 +243,12 @@ func (t *TrackingService) LoadTracking(trackingId uuid.UUID, replay bool) (*Trac } t.safelyReplaceTracking(*tracking) if t.replaypipe != nil { - t.replaypipe.Stop() t.replaypipe = nil + select { + case <-t.replaypipe.stopChan: + logrus.Warnln("blocking channel closed") + default: + } } t.replaypipe = NewReplayPipeline(t.publisher, t.tracking) t.publisher.SetStreaming(true)