package core import ( "container/heap" "github.com/reugn/go-streams" ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" "runtime" "sort" "sync" "time" ) type pipelineReplay struct { stopChan chan struct{} replayChan chan interface{} } func (r pipelineReplay) Stop() { r.stopChan <- struct{}{} close(r.replayChan) } func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { r := &pipelineReplay{make(chan struct{}), nil} // set pipeline up and wire it together r.replayChan = r.channelFromTracking(t) collNet := ext.NewChanSource(r.replayChan) dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) //flowReorder := NewRearranger() flowJson := flow.NewMap(jsonFunc, 1) sinkPub := newPublishSink(p) // wire up and execute //go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) return r } func logFunc() flow.MapFunc { return func(i interface{}) interface{} { //s := i.(SensorData) logrus.Debugln("logfunc", i) return i } } func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} { ch := make(chan interface{}) sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) }) go func() { lastTs := t.Data[0].Servertime.UnixNano() lastTsNow := time.Now().UTC().UnixNano() i := 0 br: for i <= len(t.Data)-1 { durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs timeCounter := time.Now().UTC().UnixNano() - lastTsNow if timeCounter >= durationSinceLastEvent { select { case <-p.stopChan: logrus.Debugln("replay stopped") break br case ch <- &(t.Data[i]): } logrus.Traceln("replay tracking: ", t.Data[i]) lastTs = t.Data[i].Servertime.UnixNano() lastTsNow = time.Now().UTC().UnixNano() i++ } } logrus.Infoln("replay: tracking replay finished") }() return ch } func replaySanitizeFunc() flow.MapFunc { var lastTimeOffsetIphone int64 var lastTimeOffsetUblox int64 return func(i interface{}) interface{} { sd := i.(*SensorData) if !(sd.Timestamp.IsZero() || sd.Timestamp.Nanosecond() == 0) { lastOffset := sd.Servertime.UnixNano() - sd.Timestamp.UnixNano() if sd.Source() == SOURCE_TCP { lastTimeOffsetIphone = lastOffset } if sd.Source() == SOURCE_SERIAL { lastTimeOffsetUblox = lastOffset } } else { var lastOff int64 if sd.Source() == SOURCE_TCP { lastOff = lastTimeOffsetIphone } if sd.Source() == SOURCE_SERIAL { lastOff = lastTimeOffsetUblox } sd.Timestamp = sd.Servertime.Add(time.Duration(lastOff)) } if sd.Servertime.Before(time.Unix(1608422400, 0)) && sd.Speed != 0 && sd.Source() == SOURCE_SERIAL { sd.Speed = sd.Speed * 3.6 } return sd } } func NewRearranger() *rearranger { rearran := &rearranger{ queue: &flow.PriorityQueue{}, in: make(chan interface{}), out: make(chan interface{}), done: make(chan struct{}), } go rearran.receive() go rearran.emit() return rearran } type rearranger struct { sync.RWMutex queue *flow.PriorityQueue in chan interface{} out chan interface{} done chan struct{} lastTimestamp int64 lastTimeNanoNow int64 } // Verify rearranger satisfies the Flow interface. var _ streams.Flow = (*rearranger)(nil) func (r *rearranger) In() chan<- interface{} { return r.in } func (r *rearranger) Out() <-chan interface{} { return r.out } func (r *rearranger) Via(flow streams.Flow) streams.Flow { go r.transmit(flow) return flow } func (r *rearranger) To(sink streams.Sink) { r.transmit(sink) } // submit emitted windows to the next Inlet func (r *rearranger) transmit(inlet streams.Inlet) { for elem := range r.Out() { inlet.In() <- elem } close(inlet.In()) } func (r *rearranger) receive() { for elem := range r.in { ts := r.timestamp(elem) //if r.lastTimestamp == 0 { // r.lastTimestamp = ts //- (500 * 1e+6) // Delay // r.lastTimeNanoNow = time.Now().UTC().UnixNano() //} item := flow.NewItem(elem, ts, 0) r.Lock() heap.Push(r.queue, item) r.Unlock() logrus.Debugln("item recieved.") runtime.Gosched() } close(r.done) close(r.out) } // emit pops data from ordered priority queue func (r *rearranger) emit() { for { if r.queue.Len() <= 10 { continue } tnow := time.Now().UTC().UnixNano() logrus.Debugln("popping item: ") r.Lock() item := heap.Pop(r.queue).(*flow.Item) r.Unlock() v := item.Msg.(*SensorData) r.out <- v r.lastTimestamp = v.Timestamp.UnixNano() r.lastTimeNanoNow = tnow select { case <-r.done: return default: } } } func (r *rearranger) timestamp(elem interface{}) int64 { v := elem.(*SensorData) return v.Timestamp.UnixNano() }