package core import ( "container/heap" "github.com/reugn/go-streams" ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" "sort" "sync" "time" ) type pipelineReplay struct{} func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { // set pipeline up and wire it together collNet := ext.NewChanSource(channelFromTracking(t)) dataSanitizer := flow.NewMap(replaySanitizeFunc(), 8) flowReorder := NewRearranger() flowJson := flow.NewMap(jsonFunc, 1) sinkPub := newPublishSink(p) // wire up and execute //go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) go collNet.Via(dataSanitizer).Via(flowReorder).Via(flowJson).To(sinkPub) return &pipelineReplay{} } func logFunc() flow.MapFunc { return func(i interface{}) interface{} { //s := i.(SensorData) logrus.Debugln("logfunc", i) return i } } func channelFromTracking(t *Tracking) chan interface{} { ch := make(chan interface{}) sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) }) go func() { lastTimestamp := t.Data[0].Servertime //lastTimestamp := t.Data[len(t.Data)-1].Servertime //for i := len(t.Data)-1; i >= 0; i-- { for i := 0; i <= len(t.Data)-1; i++ { sleeps := t.Data[i].Servertime.Sub(lastTimestamp) lastTimestamp = t.Data[i].Servertime logrus.Traceln("simulation original stream: waiting ->", sleeps) time.Sleep(sleeps) ch <- t.Data[i] } logrus.Infoln("replay: pushed all tracking data to pipeline") }() return ch } func replaySanitizeFunc() flow.MapFunc { var lastTimeOffsetIphone int64 var lastTimeOffsetUblox int64 return func(i interface{}) interface{} { sd := i.(SensorData) if !(sd.Timestamp.IsZero() || sd.Timestamp.Nanosecond() == 0) { lastOffset := sd.Servertime.UnixNano() - sd.Timestamp.UnixNano() if sd.Source() == SOURCE_TCP { lastTimeOffsetIphone = lastOffset } if sd.Source() == SOURCE_SERIAL { lastTimeOffsetUblox = lastOffset } } else { var lastOff int64 if sd.Source() == SOURCE_TCP { lastOff = lastTimeOffsetIphone } if sd.Source() == SOURCE_SERIAL { lastOff = lastTimeOffsetUblox } sd.Timestamp = sd.Servertime.Add(time.Duration(lastOff)) } if sd.Servertime.Before(time.Unix(1608422400, 0)) && sd.Speed != 0 && sd.Source() == SOURCE_SERIAL { sd.Speed = sd.Speed * 3.6 * 3.6 } return sd } } func NewRearranger() *rearranger { rearran := &rearranger{ queue: &flow.PriorityQueue{}, in: make(chan interface{}), out: make(chan interface{}), done: make(chan struct{}), } go rearran.receive() go rearran.emit() return rearran } type rearranger struct { sync.Mutex queue *flow.PriorityQueue in chan interface{} out chan interface{} done chan struct{} startTimeNano int64 startTimeNanoNow int64 } // Verify rearranger satisfies the Flow interface. var _ streams.Flow = (*rearranger)(nil) func (r *rearranger) In() chan<- interface{} { return r.in } func (r *rearranger) Out() <-chan interface{} { return r.out } func (r *rearranger) Via(flow streams.Flow) streams.Flow { go r.transmit(flow) return flow } func (r *rearranger) To(sink streams.Sink) { r.transmit(sink) } // submit emitted windows to the next Inlet func (r *rearranger) transmit(inlet streams.Inlet) { for elem := range r.Out() { inlet.In() <- elem } close(inlet.In()) } func (r *rearranger) receive() { for elem := range r.in { ts := r.timestamp(elem) if r.startTimeNano == 0 { r.startTimeNano = ts //- (500 * 1e+6) // Delay r.startTimeNanoNow = time.Now().UTC().UnixNano() } item := flow.NewItem(elem, ts, 0) r.Lock() heap.Push(r.queue, item) r.Unlock() } close(r.done) close(r.out) } // emit pops data from ordered priority queue func (r *rearranger) emit() { for { if r.startTimeNano == 0 { continue } if r.queue.Len() <= 0 { continue } r.Lock() durationSinceStartItem := r.queue.Head().Msg.(SensorData).Timestamp.UnixNano() - r.startTimeNano durationSinceStartNow := time.Now().UTC().UnixNano() - r.startTimeNanoNow - (100 * 1e+6) if durationSinceStartNow >= durationSinceStartItem { logrus.Debugln("pqueue size: ", r.queue.Len()) logrus.Debugln("time: ", time.Duration(durationSinceStartNow), time.Duration(durationSinceStartItem), time.Duration(durationSinceStartNow)-time.Duration(durationSinceStartItem)) item := heap.Pop(r.queue).(*flow.Item) v := item.Msg.(SensorData) r.out <- v } r.Unlock() select { case <-r.done: return default: } } } func (r *rearranger) timestamp(elem interface{}) int64 { v := elem.(SensorData) return v.Timestamp.UnixNano() }