diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 7ca8493..202b72a 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -22,25 +22,18 @@ - - - - - - - - - - - - - - - + + + + + + + + + - - + + - - + @@ -193,7 +186,14 @@ @@ -238,12 +238,17 @@ - true + + + + @@ -256,6 +261,10 @@ + + + + @@ -280,7 +289,7 @@ - + @@ -290,8 +299,8 @@ - - + + @@ -301,8 +310,8 @@ - - + + @@ -312,8 +321,8 @@ - - + + @@ -323,7 +332,7 @@ - + @@ -354,22 +363,29 @@ - + - + - + - + + + + + + + + \ No newline at end of file diff --git a/cmd/server/server.go b/cmd/server/server.go index 536320a..3b6bb8f 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -2,6 +2,7 @@ package main import ( "git.timovolkmann.de/gyrogpsc/core" + "log" ) const ( @@ -11,10 +12,15 @@ const ( ) func main() { + log.Println("setup dispatcher") dispatcher := core.NewDispatcher() - processor := core.NewPipeline(dispatcher, 20, 10000) + log.Println("initialize processing pipeline") + processor := core.NewPipeline(dispatcher, 50, 1000) + processor.Run() + log.Println("start data collectors") collectRoutines(processor) - core.NewHttpServer(dispatcher, HTTP_PORT) + log.Println("start http server") + core.HttpListenAndServe(dispatcher, HTTP_PORT) } func collectRoutines(proc core.Processor) { diff --git a/cmd/tcp_only/tcp_only.go b/cmd/tcp_only/tcp_only.go index b9383f1..5e21bac 100644 --- a/cmd/tcp_only/tcp_only.go +++ b/cmd/tcp_only/tcp_only.go @@ -13,7 +13,7 @@ func main() { dispatcher := core.NewDispatcher() processor := core.NewPipeline(dispatcher, 20, 10000) collectRoutines(processor) - core.NewHttpServer(dispatcher, HTTP_PORT) + core.HttpListenAndServe(dispatcher, HTTP_PORT) } func collectRoutines(proc core.Processor) { diff --git a/core/collectors.go b/core/collectors.go index b5a5b21..47146c4 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -66,7 +66,7 @@ func SerialCollector(proc Processor, serialPort string) { } port, err := serial.Open(serialPort, mode) if err != nil { - log.Fatalln(err) + log.Fatalln(err.Error()) } decoder := ublox.NewDecoder(port) @@ -74,20 +74,25 @@ func SerialCollector(proc Processor, serialPort string) { for { meas, err := decoder.Decode() if err != nil { + if err.Error() == "NMEA not implemented" { + continue + } log.Println(err) continue } sd, err := ConvertUbxToSensorData(meas) if err != nil { - log.Println(err) + log.Println("convert err:", err, meas) + continue + } + if sd == nil { continue } err = proc.Process(sd) if err != nil { - log.Println(err) + log.Println("process err:", err, *sd) continue } - } } diff --git a/core/dispatcher.go b/core/dispatcher.go index fb5455c..e6a97b7 100644 --- a/core/dispatcher.go +++ b/core/dispatcher.go @@ -2,7 +2,7 @@ package core import ( "errors" - "fmt" + "log" ) type Subscriber interface { @@ -20,7 +20,6 @@ type dispatcher struct { } func NewDispatcher() *dispatcher { - fmt.Println("new dispatcher") return &dispatcher{ listeners: make(map[int16]chan string), counter: 0, @@ -28,14 +27,13 @@ func NewDispatcher() *dispatcher { } func (d *dispatcher) Publish(message string) { - //fmt.Println("publish to listeners", len(d.listeners)) + log.Printf("publish to %v listeners:\n%v\n", len(d.listeners), message) for _, ch := range d.listeners { ch <- message } } func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) { - fmt.Println("subscribe") key := d.counter d.counter++ rec := make(chan string) @@ -44,7 +42,6 @@ func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) { } func (d *dispatcher) Unsubscribe(id int16) error { - fmt.Println("unsubscribe") receiver, ok := d.listeners[id] if !ok { return errors.New("no subscription with id") diff --git a/core/format.go b/core/format.go index 0ef83d1..5da9ef1 100644 --- a/core/format.go +++ b/core/format.go @@ -37,47 +37,74 @@ type Sensordata struct { Orientation [3]float64 } -func (s *Sensordata) isSameEpoch(n *Sensordata) bool { +func (s Sensordata) isSameEpoch(n Sensordata) bool { if n.itow == 0 { return false } return s.itow == n.itow } -func (s *Sensordata) Consolidate(n *Sensordata) *Sensordata { - if s.SourceId != n.SourceId { +func (s Sensordata) Consolidate(n Sensordata) Sensordata { + if (s.SourceId != n.SourceId && s != Sensordata{}) { + log.Println(s) + log.Println(n) log.Fatalln("Do not consolidate Sensordata from different Sources") } if s.isSameEpoch(n) { - // TODO: implement epoch consolidation logic + null := Sensordata{} + //if s.Timestamp == null.Timestamp { s.Timestamp = n.Timestamp } + //if s.Position == null.Position { s.Position = n.Position } + //if s.Orientation == null.Orientation { s.Orientation = n.Orientation } + if n.Timestamp != null.Timestamp && s.Timestamp != n.Timestamp { + s.Timestamp = n.Timestamp + } + if n.Position != null.Position && s.Position != n.Position { + s.Position = n.Position + } + if n.Orientation != null.Orientation && s.Orientation != n.Orientation { + s.Orientation = n.Orientation + } + + return s } return n } var ( errNotImplemented = errors.New("message not implemented") + errRawMessage = errors.New("raw message") ) func ConvertUbxToSensorData(msg interface{}) (*Sensordata, error) { - sd := &Sensordata{} + sd := &Sensordata{ + SourceId: SOURCE_SERIAL, + } switch v := msg.(type) { case *ublox.NavPvt: + //log.Println("NAV-PVT") sd.itow = v.ITOW_ms sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() sd.Position[0] = float64(v.Lat_dege7) sd.Position[1] = float64(v.Lon_dege7) sd.Position[2] = float64(v.Height_mm) case *ublox.HnrPvt: + //log.Println("HNR-PVT") sd.itow = v.ITOW_ms sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() sd.Position[0] = float64(v.Lat_dege7) sd.Position[1] = float64(v.Lon_dege7) sd.Position[2] = float64(v.Height_mm) case *ublox.NavAtt: + //log.Println("NAV-ATT") sd.itow = v.ITOW_ms sd.Orientation[0] = float64(v.Pitch_deg) sd.Orientation[1] = float64(v.Roll_deg) sd.Orientation[2] = float64(v.Heading_deg) + case *ublox.RawMessage: + //class := make([]byte, 2) + //binary.LittleEndian.PutUint16(class, v.ClassID()) + //log.Printf("%#v, %#v", class[0],class[1]) + return nil, nil default: return nil, errNotImplemented } diff --git a/core/http.go b/core/http.go index 0328766..b957ac6 100644 --- a/core/http.go +++ b/core/http.go @@ -48,7 +48,7 @@ func echo(sub Subscriber) func(w http.ResponseWriter, r *http.Request) { func home(w http.ResponseWriter, r *http.Request) { //var homeTemplate = template.Must(template.NewDispatcher("").ParseFiles("index.html")) - tpl, err := template.ParseFiles("index.html") + tpl, err := template.ParseFiles("static/index.html") if err != nil { log.Fatalln(err) } @@ -58,10 +58,14 @@ func home(w http.ResponseWriter, r *http.Request) { } } -func NewHttpServer(sub Subscriber, httpPort string) { +func HttpListenAndServe(sub Subscriber, httpPort string) { + log.Println("register websocket handler") http.HandleFunc("/echo", echo(sub)) + log.Println("register index handler") http.HandleFunc("/", home) + log.Println("register static file handler") http.Handle("/static/", http.FileServer(http.Dir("."))) + log.Println("start server") log.Fatal(http.ListenAndServe(httpPort, nil)) } diff --git a/core/pipeline.go b/core/pipeline.go index 8d98e81..9f031ee 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -8,6 +8,8 @@ import ( "time" ) +// TODO: adapt HNR-INS data to continue orientation stream + type Processor interface { Process(data *Sensordata) error } @@ -23,11 +25,15 @@ func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) return &pipeline{ synchronizer{ bufferSize: 100, - updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Second), + mutex: &sync.Mutex{}, + updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond), + }, + aggregator{ + tcpMutex: &sync.Mutex{}, + serialMutex: &sync.Mutex{}, }, - aggregator{}, d, - time.NewTicker(time.Duration(publishIntervalMs) * time.Second), + time.NewTicker(time.Duration(publishIntervalMs) * time.Millisecond), } } @@ -37,22 +43,35 @@ func (p *pipeline) Run() { for { <-p.publishTicker.C err := p.Publish() - if err != nil { + if err != nil /*&& err.Error() != "no data available"*/ { log.Println(err) } } }() + log.Println("pipeline: processing service started") } func (p *pipeline) Publish() error { + p.tcpMutex.Lock() + p.serialMutex.Lock() + //log.Println(p.tcpSensorData) + //log.Println(p.serialSensorData) if (p.tcpSensorData == Sensordata{} && p.serialSensorData == Sensordata{}) { - return errors.New("No Data available") + p.tcpMutex.Unlock() + p.serialMutex.Unlock() + return errors.New("no data available") } data := map[string]Sensordata{ - string(p.tcpSensorData.SourceId): p.tcpSensorData, - string(p.serialSensorData.SourceId): p.serialSensorData, + string(SOURCE_TCP): p.tcpSensorData, + string(SOURCE_SERIAL): p.serialSensorData, } + p.tcpSensorData = Sensordata{} + p.serialSensorData = Sensordata{} + p.tcpMutex.Unlock() + p.serialMutex.Unlock() + jdata, err := json.Marshal(data) + log.Println(string(jdata)) if err != nil { return err } @@ -86,6 +105,7 @@ type synchronizer struct { } func (s *synchronizer) schedule() { + log.Println("synchronizer: started") for { <-s.updateTicker.C err := s.refreshDelay() @@ -101,16 +121,19 @@ func (s *synchronizer) refreshDelay() error { } func (p *pipeline) Process(data *Sensordata) error { + if data == nil { + return errors.New("nil processing not allowed") + } + //log.Println(string(data.SourceId)) switch data.SourceId { case SOURCE_TCP: go p.pushTcpDataToBuffer(*data) - return nil case SOURCE_SERIAL: go p.pushSerialDataToBuffer(*data) - return nil default: return errors.New("invalid data source") } + return nil } func (p *pipeline) pushTcpDataToBuffer(data Sensordata) { @@ -122,6 +145,6 @@ func (p *pipeline) pushTcpDataToBuffer(data Sensordata) { func (p *pipeline) pushSerialDataToBuffer(data Sensordata) { time.Sleep(time.Duration(p.serialDelayMs)) p.serialMutex.Lock() - p.serialSensorData = data + p.serialSensorData = p.serialSensorData.Consolidate(data) p.serialMutex.Unlock() } diff --git a/ex_websocketMessage.json b/static/ex_websocketMessage.json similarity index 100% rename from ex_websocketMessage.json rename to static/ex_websocketMessage.json diff --git a/hyperimu.json b/static/hyperimu.json similarity index 100% rename from hyperimu.json rename to static/hyperimu.json diff --git a/index.html b/static/index.html similarity index 100% rename from index.html rename to static/index.html diff --git a/ublox/decode.go b/ublox/decode.go index a940e08..3e0ccab 100644 --- a/ublox/decode.go +++ b/ublox/decode.go @@ -85,8 +85,7 @@ func (d *decoder) Decode() (msg interface{}, err error) { switch d.s.Bytes()[0] { case '$': - //fmt.Println("NMEA message: skipping!") - return nil, err + return nil, errors.New("NMEA not implemented") //return nmea.Decode(d.s.Bytes()) case 0xB5: return decodeUbx(d.s.Bytes())