package core import ( "encoding/json" ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" "github.com/tidwall/pretty" "time" ) type pipelineX struct { collNet *ext.ChanSource collSer *ext.ChanSource transNet *flow.FlatMap transSer *flow.FlatMap flowDelay *flow.Map flowStore *flow.Map flowJson *flow.Map sinkPub *publishSink } func NewPipelineX(p Publisher, s Tracker, netChan chan interface{}, serialChan chan interface{}) *pipelineX { // set pipeline up and wire it together collNet := ext.NewChanSource(netChan) collSer := ext.NewChanSource(serialChan) transNet := flow.NewFlatMap(transformNetFunc, 8) transSer := flow.NewFlatMap(transformSerFunc, 8) flowDelay := flow.NewMap(delayFunc(), 8) flowStore := flow.NewMap(storeFunc(s), 8) //flowJson := flow.NewMap(jsonFunc, 8) sinkPub := newPublishSink(p) // wire up and execute demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) //go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) go demux.Via(flowDelay).Via(flowStore).To(sinkPub) return &pipelineX{} } func storeFunc(s Tracker) flow.MapFunc { return func(i interface{}) interface{} { var sd *SensorData if v, ok := i.(*SensorData); ok { sd = v } else { panic("unexpected data struct") } if (*sd == SensorData{} || sd == nil) { logrus.Info("empty data") } else { sd.Servertime = time.Now().UTC() s.Put(*sd) } logrus.Debugf("%-14v %-40s %-40s %v %v", sd.Source(), sd.Timestamp.Format(time.RFC3339Nano), sd.Servertime.Format(time.RFC3339Nano), sd.Position, sd.Orientation) data := map[string]interface{}{} if sd.Source() == SOURCE_TCP { data[string(SOURCE_TCP)] = *sd } if sd.Source() == SOURCE_SERIAL { data[string(SOURCE_SERIAL)] = *sd } jdata, err := json.Marshal(data) logrus.Traceln(string(pretty.Pretty(jdata))) if err != nil { logrus.Fatalln(err) } return string(jdata) } } type timeDelay struct { offsets map[SourceId]int } func delayFunc() flow.MapFunc { //td := &timeDelay{} return func(i interface{}) interface{} { return i } } func transformNetFunc(i interface{}) []interface{} { logrus.Traceln("transform TCP data...") var returnSlice []interface{} if b, ok := i.([]byte); ok { sd, err := ConvertByteSensorData(b) if err != nil { logrus.Errorln("error converting byte message:", err) } if sd != nil { return append(returnSlice, sd) } } logrus.Errorln("wrong data type. expected []byte data") return nil } func transformSerFunc(i interface{}) []interface{} { logrus.Traceln("transform SERIAL data...") var returnSlice []interface{} sd, err := ConvertUbxSensorData(i) if err != nil { logrus.Errorln("error converting ubx message:", err) return nil } if sd == nil { return nil } return append(returnSlice, sd) } type publishSink struct { in chan interface{} p Publisher } func newPublishSink(p Publisher) *publishSink { sink := &publishSink{make(chan interface{}), p} sink.init() return sink } func (ps *publishSink) In() chan<- interface{} { return ps.in } func (ps *publishSink) init() { go func() { logrus.Trace("publish sink running") for elem := range ps.in { if v, ok := elem.(string); ok { ps.p.Publish(v) } else { logrus.Debugln("not publishing. wrong format", elem) //reflect.TypeOf(elem) } } logrus.Trace("publish sink stopped") }() }