package core import ( ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" ) type pipelineX struct { } func NewPipelineX(p Publisher, s Tracker, netChan chan interface{}, serialChan chan interface{}) *pipelineX { // set pipeline up and wire it together collNet := ext.NewChanSource(netChan) collSer := ext.NewChanSource(serialChan) transNet := flow.NewFlatMap(transformNetFunc, 1) transSer := flow.NewFlatMap(transformSerFunc, 1) flowDelay := flow.NewMap(delayFunc(), 8) flowStore := flow.NewMap(storeFunc(s), 1) sinkPub := newPublishSink(p) demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) demux.Via(flowDelay).Via(flowStore).To(sinkPub) return &pipelineX{} } func storeFunc(s Tracker) flow.MapFunc { return func(i interface{}) interface{} { if v, ok := i.(SensorData); ok { s.Put(v) } else { panic("pipeline storeFunc: wrong data type") } return i } } type timeDelay struct { offsets map[sourceId]int } func delayFunc() flow.MapFunc { //td := &timeDelay{} return func(i interface{}) interface{} { return i } } func transformNetFunc(i interface{}) []interface{} { var returnSlice []interface{} if b, ok := i.([]byte); ok { sd, err := ConvertByteSensorData(b) if err != nil { logrus.Errorln("error converting byte message:", err) } if sd != nil { return append(returnSlice, sd) } } logrus.Errorln("wrong data type. expected []byte data") return nil } func transformSerFunc(i interface{}) []interface{} { var returnSlice []interface{} sd, err := ConvertUbxSensorData(i) if err != nil { logrus.Errorln("error converting ubx message:", err) return nil } return append(returnSlice, sd) } type publishSink struct { in chan interface{} p Publisher } func newPublishSink(p Publisher) *publishSink { sink := &publishSink{nil, p} //sink.init() return sink } func (ps *publishSink) In() chan<- interface{} { return ps.in } func (ps *publishSink) init() { go func() { for elem := range ps.in { if v, ok := elem.(string); ok { ps.p.Publish(v) } else { logrus.Debugln("not publishing. wrong format", elem) break } } }() }