package core import ( ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" "reflect" ) type pipelineX struct { collNet *ext.ChanSource collSer *ext.ChanSource transNet *flow.FlatMap transSer *flow.FlatMap flowDelay *flow.Map flowStore *flow.Map flowJson *flow.Map sinkPub *publishSink } func NewPipelineX(p Publisher, s Tracker, netChan chan interface{}, serialChan chan interface{}) *pipelineX { // set pipeline up and wire it together collNet := ext.NewChanSource(netChan) collSer := ext.NewChanSource(serialChan) transNet := flow.NewFlatMap(transformNetFunc, 8) transSer := flow.NewFlatMap(transformSerFunc, 8) flowDelay := flow.NewMap(delayFunc(), 8) flowStore := flow.NewMap(storeFunc(s), 1) flowJson := flow.NewMap(jsonFunc, 8) sinkPub := newPublishSink(p) // wire up and execute demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) return &pipelineX{} } func (p *pipelineX) Run() { } func jsonFunc(i interface{}) interface{} { return i } func storeFunc(s Tracker) flow.MapFunc { return func(i interface{}) interface{} { if v, ok := i.(*SensorData); ok { if v == nil { logrus.Trace(v, ok) } else { s.Put(*v) } } else { logrus.Trace(i) logrus.Trace(reflect.TypeOf(i)) panic("pipeline storeFunc: wrong data type") } return i } } type timeDelay struct { offsets map[sourceId]int } func delayFunc() flow.MapFunc { //td := &timeDelay{} return func(i interface{}) interface{} { return i } } func transformNetFunc(i interface{}) []interface{} { logrus.Traceln("transform TCP data...") var returnSlice []interface{} if b, ok := i.([]byte); ok { sd, err := ConvertByteSensorData(b) if err != nil { logrus.Errorln("error converting byte message:", err) } if sd != nil { return append(returnSlice, sd) } } logrus.Errorln("wrong data type. expected []byte data") return nil } func transformSerFunc(i interface{}) []interface{} { logrus.Traceln("transform SERIAL data...") var returnSlice []interface{} sd, err := ConvertUbxSensorData(i) if err != nil { logrus.Errorln("error converting ubx message:", err) return nil } if sd == nil { return nil } return append(returnSlice, sd) } type publishSink struct { in chan interface{} p Publisher } func newPublishSink(p Publisher) *publishSink { sink := &publishSink{make(chan interface{}), p} sink.init() return sink } func (ps *publishSink) In() chan<- interface{} { return ps.in } func (ps *publishSink) init() { go func() { logrus.Trace("publish sink running") for elem := range ps.in { if v, ok := elem.(string); ok { ps.p.Publish(v) } else { logrus.Debugln("not publishing. wrong format", elem) //reflect.TypeOf(elem) } } logrus.Trace("publish sink stopped") }() }