Backend fixes: make the replay pipeline stoppable and add a /tracking page

Timo Volkmann 2021-01-02 16:44:34 +01:00
parent 6ded3ec741
commit ca73b7c14e
3 changed files with 38 additions and 11 deletions

View File

@@ -12,11 +12,21 @@ import (
     "time"
 )
 
-type pipelineReplay struct{}
+type pipelineReplay struct {
+    stopChan   chan struct{}
+    replayChan chan interface{}
+}
+
+func (r pipelineReplay) Stop() {
+    r.stopChan <- struct{}{}
+    close(r.replayChan)
+}
 
 func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay {
+    r := &pipelineReplay{make(chan struct{}), nil}
     // set pipeline up and wire it together
-    collNet := ext.NewChanSource(channelFromTracking(t))
+    r.replayChan = r.channelFromTracking(t)
+    collNet := ext.NewChanSource(r.replayChan)
     dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1)
 
     //flowReorder := NewRearranger()
@@ -26,7 +36,7 @@ func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay {
     // wire up and execute
     //go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub)
     go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub)
-    return &pipelineReplay{}
+    return r
 }
 
 func logFunc() flow.MapFunc {
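
Note on the hunk above: the Via/To chain wires a channel source through a sanitizing map stage into a publisher sink. Below is a dependency-free sketch of that shape using plain channels; it does not use the project's ext/flow packages (whose exact API is not shown in this diff), and the names are illustrative only.

package main

import (
    "fmt"
    "strings"
)

// mapStage reads from in, applies f to each value, and forwards the result,
// mirroring a source -> map -> sink chain.
func mapStage(in <-chan interface{}, f func(interface{}) interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for v := range in {
            out <- f(v)
        }
    }()
    return out
}

func main() {
    // source: feeds raw values into the pipeline
    source := make(chan interface{})
    go func() {
        defer close(source)
        for _, s := range []string{"a", "b", "c"} {
            source <- s
        }
    }()

    // map stage: stand-in for replaySanitizeFunc
    sanitized := mapStage(source, func(v interface{}) interface{} {
        return strings.ToUpper(v.(string))
    })

    // sink: here just printing; the real pipeline publishes via sinkPub
    for v := range sanitized {
        fmt.Println(v)
    }
}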
@@ -37,18 +47,24 @@ func logFunc() flow.MapFunc {
     }
 }
 
-func channelFromTracking(t *Tracking) chan interface{} {
+func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} {
     ch := make(chan interface{})
     sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) })
     go func() {
         lastTs := t.Data[0].Servertime.UnixNano()
         lastTsNow := time.Now().UTC().UnixNano()
         i := 0
+    br:
         for i <= len(t.Data)-1 {
             durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs
             timeCounter := time.Now().UTC().UnixNano() - lastTsNow
             if timeCounter >= durationSinceLastEvent {
-                ch <- &(t.Data[i])
+                select {
+                case <-p.stopChan:
+                    logrus.Debugln("replay stopped")
+                    break br
+                case ch <- &(t.Data[i]):
+                }
                 logrus.Traceln("replay tracking: ", t.Data[i])
                 lastTs = t.Data[i].Servertime.UnixNano()
                 lastTsNow = time.Now().UTC().UnixNano()
@@ -56,7 +72,7 @@ func channelFromTracking(t *Tracking) chan interface{} {
             }
         }
 
-        logrus.Infoln("replay: pushed all tracking data to pipeline")
+        logrus.Infoln("replay: tracking replay finished")
     }()
     return ch
 }
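
For context, here is a minimal standalone sketch of the stoppable replay loop introduced above: events are pushed to a channel at roughly their original spacing, and a stop channel ends the goroutine early via a labeled break. The event type, durations, and the close-based stop are illustrative assumptions; the commit's code instead sends on stopChan and closes replayChan from Stop(), which is a different shutdown choice.

package main

import (
    "fmt"
    "time"
)

// event is a stand-in for a tracking sample; only the timestamp matters here.
type event struct {
    Servertime time.Time
    Payload    string
}

// replay emits events on the returned channel, keeping the original spacing
// between timestamps, and exits early once stop is closed (labeled break,
// as in channelFromTracking above).
func replay(events []event, stop <-chan struct{}) <-chan *event {
    out := make(chan *event)
    go func() {
        defer close(out)
        start := time.Now()
        base := events[0].Servertime
    loop:
        for i := range events {
            due := events[i].Servertime.Sub(base)
            // wait until this event's relative offset has elapsed
            for time.Since(start) < due {
                select {
                case <-stop:
                    break loop
                case <-time.After(time.Millisecond):
                }
            }
            select {
            case <-stop:
                break loop
            case out <- &events[i]:
            }
        }
    }()
    return out
}

func main() {
    now := time.Now()
    evs := []event{
        {Servertime: now, Payload: "a"},
        {Servertime: now.Add(50 * time.Millisecond), Payload: "b"},
        {Servertime: now.Add(120 * time.Millisecond), Payload: "c"},
    }
    stop := make(chan struct{}) // close(stop) would end the replay early
    for e := range replay(evs, stop) {
        fmt.Println(e.Payload)
    }
}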

View File

@@ -45,9 +45,10 @@ type Recorder interface {
 }
 
 type trackingService struct {
     opMode     OpMode
     tracking   *Tracking
-    pipeline   *pipelineRecord
+    //pipeline   *pipelineRecord
+    replaypipe *pipelineReplay
     collectors []Collector
     store      Storer
     publisher  Publisher
@@ -200,6 +201,10 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) {
     for _, e := range t.collectors {
         e.Stop()
     }
+    if t.replaypipe != nil {
+        t.replaypipe.Stop()
+        t.replaypipe = nil
+    }
     // let buffer run empty after collectors stopped
     time.Sleep(time.Millisecond * 5)
     t.publisher.SetStreaming(false)
@@ -212,7 +217,7 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) {
 }
 
 func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error) {
-    if !(t.opMode == REPLAY || t.opMode == STOPPED) {
+    if !(t.opMode == REPLAY || t.opMode == STOPPED || t.opMode == LIVE || t.opMode == RECORDING) {
         t.StopAll()
     }
     logrus.Info("LOAD TRACKING from database")
@@ -221,7 +226,7 @@ func (t *trackingService) LoadTracking(trackingId uuid.UUID) (*Tracking, error)
         return nil, err
     }
     t.safelyReplaceTracking(*tracking)
-    NewReplayPipeline(t.publisher, t.tracking)
+    t.replaypipe = NewReplayPipeline(t.publisher, t.tracking)
     t.publisher.SetStreaming(true)
     t.opMode = REPLAY
     return t.tracking, nil
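
The service now keeps the replay pipeline handle so StopAll can shut it down exactly once. A minimal sketch of that nil-check pattern (types and names here are illustrative, not the project's API):

package main

import "fmt"

// stopper matches anything with a Stop method, like *pipelineReplay.
type stopper interface {
    Stop()
}

type fakePipeline struct{ stopped int }

func (p *fakePipeline) Stop() { p.stopped++ }

type service struct {
    replaypipe stopper
}

// StopAll stops a running replay pipeline once and clears the handle,
// so a second call is a harmless no-op.
func (s *service) StopAll() {
    if s.replaypipe != nil {
        s.replaypipe.Stop()
        s.replaypipe = nil
    }
}

func main() {
    p := &fakePipeline{}
    s := &service{replaypipe: p}
    s.StopAll()
    s.StopAll()                            // second call does nothing
    fmt.Println("stop calls:", p.stopped)  // prints: stop calls: 1
}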

View File

@@ -18,6 +18,7 @@ func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) {
     // Application Main Page
     app.Get("/", fiberHomeHandler)
+    app.Get("/tracking", fiberTrackingHandler)
 
     // Websocket
     app.Get("/ws", websocket.New(createFiberWebsocketHandler(sub)))
 
@@ -275,3 +276,8 @@ func fiberHomeHandler(c *fiber.Ctx) error {
     // Render index template
     return c.Render("index", "ws://"+c.Hostname()+"/ws")
 }
+
+func fiberTrackingHandler(c *fiber.Ctx) error {
+    // Render replay template
+    return c.Render("replayFull", nil)
+}
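
The new /tracking route only renders a template. A minimal sketch of how such a route is wired in Fiber, assuming the project uses gofiber/fiber v2 with the gofiber/template HTML engine and a views/ directory containing replayFull.html (the engine setup and paths are assumptions, not shown in this diff):

package main

import (
    "log"

    "github.com/gofiber/fiber/v2"
    "github.com/gofiber/template/html"
)

func main() {
    // Template engine: looks up views/replayFull.html for c.Render("replayFull", ...).
    engine := html.New("./views", ".html")
    app := fiber.New(fiber.Config{Views: engine})

    // Same shape as the handler added in this commit.
    app.Get("/tracking", func(c *fiber.Ctx) error {
        return c.Render("replayFull", nil)
    })

    log.Fatal(app.Listen(":3000"))
}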