gyrogpsc/core/pipeline_replay.go

202 lines
4.6 KiB
Go

package core
import (
"container/heap"
"github.com/reugn/go-streams"
ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
"github.com/sirupsen/logrus"
"runtime"
"sort"
"sync"
"time"
)
// pipelineReplay is the handle for a running replay pipeline. It holds the two
// channels that tie the replay producer goroutine to the streaming pipeline.
type pipelineReplay struct {
// stopChan is an unbuffered signalling channel: Stop sends on it and the
// replay goroutine started by channelFromTracking receives from it.
stopChan chan struct{}
// replayChan carries the replayed records into the pipeline source; it is
// created by channelFromTracking and closed by Stop.
replayChan chan interface{}
}
// Stop signals the replay goroutine to quit and then closes the replay channel,
// which ends the input of the downstream pipeline.
//
// The send on stopChan is unbuffered, so Stop blocks until the replay goroutine
// actually receives the signal. NOTE(review): if that goroutine is no longer
// receiving (for example because it already returned), this call never
// completes; Stop must also not be called more than once.
func (r pipelineReplay) Stop() {
	var signal struct{}
	r.stopChan <- signal
	// Safe to close here: once the signal has been taken the producer no
	// longer sends on replayChan.
	close(r.replayChan)
}
// NewReplayPipeline builds and starts a pipeline that replays the records of
// tracking t and hands them to publisher p.
//
// The stages are: channel source (fed by channelFromTracking) -> replay
// sanitizer -> jsonFunc -> publish sink. The pipeline runs in the background;
// the returned handle is used to stop it.
func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay {
	r := &pipelineReplay{stopChan: make(chan struct{})}
	r.replayChan = r.channelFromTracking(t)

	// Build the individual stages.
	source := ext.NewChanSource(r.replayChan)
	sanitize := flow.NewMap(replaySanitizeFunc(), 1)
	toJSON := flow.NewMap(jsonFunc, 1)
	sink := newPublishSink(p)

	// Wire the stages together; the experimental rearranger stage
	// (NewRearranger) is intentionally not part of the chain.
	stream := source.Via(sanitize).Via(toJSON)
	go stream.To(sink)

	return r
}
// channelFromTracking replays the records of t on the returned unbuffered
// channel, reproducing the pauses between their Servertime values.
//
// t.Data is sorted in place by Servertime before the replay starts, and the
// values sent on the channel are pointers into t.Data (not copies).
//
// The replay runs in its own goroutine. Between two records it sleeps on a
// timer instead of spinning, and it reacts to a signal on p.stopChan both while
// waiting and while blocked on the send. An empty tracking is replayed as "no
// records" rather than panicking. When all records have been sent the goroutine
// stays parked until a stop signal arrives, so that a later Stop() (which sends
// on the unbuffered stopChan) always finds a receiver. The cost is one idle
// goroutine if Stop is never called. The channel itself is never closed here;
// Stop owns that.
func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} {
	ch := make(chan interface{})
	sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) })
	go func() {
		stopped := false
		if len(t.Data) > 0 {
			// lastTs is the recorded time of the previously sent record,
			// lastEmit the local time at which it was sent.
			lastTs := t.Data[0].Servertime.UnixNano()
			lastEmit := time.Now()
		replay:
			for i := range t.Data {
				ts := t.Data[i].Servertime.UnixNano()
				// Wait until as much local time has passed since the last
				// send as lies between the two recorded server times.
				if wait := time.Duration(ts-lastTs) - time.Since(lastEmit); wait > 0 {
					timer := time.NewTimer(wait)
					select {
					case <-p.stopChan:
						timer.Stop()
						stopped = true
						break replay
					case <-timer.C:
					}
				}
				select {
				case <-p.stopChan:
					stopped = true
					break replay
				case ch <- &(t.Data[i]):
				}
				logrus.Traceln("replay tracking: ", t.Data[i])
				lastTs = ts
				lastEmit = time.Now()
			}
		}
		if stopped {
			logrus.Debugln("replay stopped")
		}
		logrus.Infoln("replay: tracking replay finished")
		if !stopped {
			// Stay available for the stop signal so Stop() does not block
			// forever after the replay has run to completion.
			<-p.stopChan
			logrus.Debugln("replay stopped")
		}
	}()
	return ch
}
// replaySanitizeFunc returns a stateful map function for replayed *SensorData.
//
// For every record it does the following, in place, and returns the same
// pointer:
//   - If the record has a usable Timestamp (non-zero and with a non-zero
//     nanosecond part), the difference Servertime-Timestamp is remembered per
//     source (SOURCE_TCP / SOURCE_SERIAL).
//   - Otherwise the Timestamp is rebuilt from Servertime and the offset last
//     remembered for the record's source (zero if none was seen yet).
//   - Serial records with a non-zero Speed and a Servertime before Unix time
//     1608422400 get their Speed multiplied by 3.6 (presumably m/s -> km/h for
//     older recordings; confirm).
//
// The returned function keeps state between calls and is not safe for
// concurrent use.
func replaySanitizeFunc() flow.MapFunc {
	var (
		offsetIphone int64 // last offset seen for SOURCE_TCP records
		offsetUblox  int64 // last offset seen for SOURCE_SERIAL records
	)
	speedCutoff := time.Unix(1608422400, 0)

	return func(i interface{}) interface{} {
		sd := i.(*SensorData)

		hasTimestamp := !sd.Timestamp.IsZero() && sd.Timestamp.Nanosecond() != 0
		if hasTimestamp {
			offset := sd.Servertime.UnixNano() - sd.Timestamp.UnixNano()
			switch sd.Source() {
			case SOURCE_TCP:
				offsetIphone = offset
			case SOURCE_SERIAL:
				offsetUblox = offset
			}
		} else {
			var offset int64
			switch sd.Source() {
			case SOURCE_TCP:
				offset = offsetIphone
			case SOURCE_SERIAL:
				offset = offsetUblox
			}
			// NOTE(review): the offset is computed as Servertime-Timestamp
			// but is added to Servertime here; confirm the sign is intended.
			sd.Timestamp = sd.Servertime.Add(time.Duration(offset))
		}

		if sd.Servertime.Before(speedCutoff) && sd.Speed != 0 && sd.Source() == SOURCE_SERIAL {
			sd.Speed *= 3.6
		}
		return sd
	}
}
// NewRearranger creates a rearranger flow with an empty priority queue and
// unbuffered in/out/done channels, and starts its receive and emit goroutines.
//
// The rearranger is currently not used by any pipeline; it is kept for later
// experiments.
func NewRearranger() *rearranger {
	pq := &flow.PriorityQueue{}
	inlet := make(chan interface{})
	outlet := make(chan interface{})
	finished := make(chan struct{})

	rearran := &rearranger{queue: pq, in: inlet, out: outlet, done: finished}

	go rearran.receive()
	go rearran.emit()

	return rearran
}
// rearranger is a streams.Flow that buffers incoming *SensorData in a priority
// queue keyed by their Timestamp and forwards them from that queue.
type rearranger struct {
	// The embedded lock guards pushes to and pops from queue.
	sync.RWMutex

	// queue holds the buffered elements.
	queue *flow.PriorityQueue

	// in is the inlet read by receive; out is the outlet written by emit.
	in, out chan interface{}

	// done is closed by receive once the inlet has been drained.
	done chan struct{}

	// lastTimestamp is the Timestamp (Unix nanoseconds) of the element that
	// was forwarded last; lastTimeNanoNow is the local time (Unix
	// nanoseconds) taken for that element. Both are written by emit.
	lastTimestamp, lastTimeNanoNow int64
}
// Compile-time assertion that *rearranger implements the streams.Flow interface.
var _ streams.Flow = (*rearranger)(nil)
// In returns the send-only side of the rearranger's input channel.
func (r *rearranger) In() chan<- interface{} {
	var inlet chan<- interface{} = r.in
	return inlet
}
// Out returns the receive-only side of the rearranger's output channel.
func (r *rearranger) Out() <-chan interface{} {
	var outlet <-chan interface{} = r.out
	return outlet
}
// Via connects the rearranger's output to the given flow and returns that flow
// so further stages can be chained. Forwarding happens in a new goroutine.
func (r *rearranger) Via(flow streams.Flow) streams.Flow {
	next := flow
	go r.transmit(next)
	return next
}
// To connects the rearranger's output to the given sink. It forwards in the
// calling goroutine and therefore blocks until the output channel is closed.
func (r *rearranger) To(sink streams.Sink) {
	var inlet streams.Inlet = sink
	r.transmit(inlet)
}
// transmit forwards every element from the rearranger's output channel to the
// given inlet and closes the inlet's channel once the output channel has been
// closed and drained.
func (r *rearranger) transmit(inlet streams.Inlet) {
	src := r.Out()
	for {
		elem, open := <-src
		if !open {
			break
		}
		inlet.In() <- elem
	}
	close(inlet.In())
}
// receive drains the inlet, wraps every element in a priority-queue item keyed
// by its Timestamp and pushes it onto the shared queue.
//
// When the inlet is closed it only closes r.done. It deliberately does NOT
// close r.out: emit is the sole sender on that channel, and closing it from
// here could make a concurrent send in emit panic.
func (r *rearranger) receive() {
	for elem := range r.in {
		ts := r.timestamp(elem)
		//if r.lastTimestamp == 0 {
		//	r.lastTimestamp = ts //- (500 * 1e+6) // Delay
		//	r.lastTimeNanoNow = time.Now().UTC().UnixNano()
		//}
		item := flow.NewItem(elem, ts, 0)
		r.Lock()
		heap.Push(r.queue, item)
		r.Unlock()
		logrus.Debugln("item recieved.")
		// Yield so the emitter gets a chance to run between pushes.
		runtime.Gosched()
	}
	close(r.done)
}

// emit forwards buffered elements in priority-queue order on r.out.
//
// While input is still arriving, an element is only forwarded when more than
// reorderWindow elements are buffered, which leaves room for late elements to
// be sorted in front of it. Once receive has closed r.done, the remaining
// elements are flushed in order, r.out is closed and the goroutine returns.
//
// Instead of spinning on the queue length, emit re-checks the queue on a short
// ticker; every queue access happens under the lock.
func (r *rearranger) emit() {
	// emit is the only sender on r.out, so it is also the one to close it.
	defer close(r.out)

	const (
		reorderWindow = 10
		pollInterval  = time.Millisecond
	)
	poll := time.NewTicker(pollInterval)
	defer poll.Stop()

	for {
		if item := r.popAbove(reorderWindow); item != nil {
			r.forward(item)
			continue
		}
		select {
		case <-r.done:
			// No more input will arrive: flush what is still buffered.
			for item := r.popAbove(0); item != nil; item = r.popAbove(0) {
				r.forward(item)
			}
			return
		case <-poll.C:
		}
	}
}

// popAbove pops the head of the queue if more than threshold items are
// buffered, and returns nil otherwise. Length check and pop share one critical
// section.
func (r *rearranger) popAbove(threshold int) *flow.Item {
	r.Lock()
	defer r.Unlock()
	if r.queue.Len() <= threshold {
		return nil
	}
	logrus.Debugln("popping item: ")
	return heap.Pop(r.queue).(*flow.Item)
}

// forward sends the item's payload on r.out and records the payload's
// Timestamp together with the local time taken just before the send.
func (r *rearranger) forward(item *flow.Item) {
	tnow := time.Now().UTC().UnixNano()
	v := item.Msg.(*SensorData)
	r.out <- v
	r.lastTimestamp = v.Timestamp.UnixNano()
	r.lastTimeNanoNow = tnow
}
// timestamp returns the Timestamp of elem in Unix nanoseconds. elem must be a
// *SensorData; any other type makes the type assertion panic.
func (r *rearranger) timestamp(elem interface{}) int64 {
	return elem.(*SensorData).Timestamp.UnixNano()
}