From bcdf7ddbf7761f88c0d8a4aa7cae53a427c2ef42 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Thu, 3 Dec 2020 15:09:33 +0100 Subject: [PATCH] cleaned up develop and merged with refactoring --- .idea/.gitignore | 8 + .idea/misc.xml | 6 + .idea/vcs.xml | 1 - .idea/workspace.xml | 226 ++++++++++-------- cmd/serial_only/serial_only.go | 21 +- cmd/server/server.go | 26 +- cmd/server_only/server_only.go | 25 -- cmd/tcp_only/tcp_only.go | 22 ++ core/collectors.go | 98 ++++++++ core/dispatcher.go | 52 ++++ core/format.go | 135 +++++++++++ core/http.go | 71 ++++++ core/pipeline.go | 150 ++++++++++++ dispatcher/dispatcher.go | 46 ---- go.mod | 4 +- go.sum | 39 +-- net/net.go | 132 ---------- serial_ubx/serial.go | 124 ---------- .../ex_websocketMessage.json | 0 hyperimu.json => static/hyperimu.json | 2 +- index.html => static/index.html | 0 ublox/decode.go | 21 +- ublox/messages.go | 18 +- 23 files changed, 732 insertions(+), 495 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/misc.xml delete mode 100644 cmd/server_only/server_only.go create mode 100644 cmd/tcp_only/tcp_only.go create mode 100644 core/collectors.go create mode 100644 core/dispatcher.go create mode 100644 core/format.go create mode 100644 core/http.go create mode 100644 core/pipeline.go delete mode 100644 dispatcher/dispatcher.go delete mode 100644 net/net.go delete mode 100644 serial_ubx/serial.go rename ex_websocketMessage.json => static/ex_websocketMessage.json (100%) rename hyperimu.json => static/hyperimu.json (99%) rename index.html => static/index.html (100%) diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..73f69e0 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..0e93714 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index ac4ad06..94a25f7 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,6 +2,5 @@ - \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 230effa..522aabe 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -16,85 +16,35 @@ - - + - - - - - + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + - + + + + - - - - + @@ -156,6 +106,15 @@ + + + + + + + + + @@ -165,6 +124,14 @@ + + + + + + + + @@ -175,11 +142,7 @@ - - - - - + @@ -227,7 +190,21 @@ @@ -239,12 +216,17 @@ + + + + - true + + + + + + + + + + + + @@ -301,46 +300,50 @@ - - + + - + - - + + + - + - - + + + - + - - + + + - + + @@ -357,22 +360,43 @@ - - + + + + + + + - - + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/cmd/serial_only/serial_only.go b/cmd/serial_only/serial_only.go index 3cd3139..352585e 100644 --- a/cmd/serial_only/serial_only.go +++ b/cmd/serial_only/serial_only.go @@ -1,14 +1,21 @@ package main import ( - "git.timovolkmann.de/gyrogpsc/serial_ubx" - "log" + "fmt" + "git.timovolkmann.de/gyrogpsc/core" +) + +const ( + SERIAL_PORT = "/dev/tty.usbmodem14201" ) func main() { - r, err := serial_ubx.Setup("/dev/tty.usbmodem14201") - if err != nil { - log.Fatal(err) - } - r.Printloop() + core.SerialCollector(&printer{}, SERIAL_PORT) +} + +type printer struct{} + +func (p *printer) Process(data *core.Sensordata) error { + fmt.Println(data) + return nil } diff --git a/cmd/server/server.go b/cmd/server/server.go index a4be449..3b6bb8f 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -1,8 +1,8 @@ package main import ( - "git.timovolkmann.de/gyrogpsc/dispatcher" - gnet "git.timovolkmann.de/gyrogpsc/net" + "git.timovolkmann.de/gyrogpsc/core" + "log" ) const ( @@ -12,14 +12,20 @@ const ( ) func main() { - d := dispatcher.New() - collectRoutines(d) - gnet.NewHttpServer(d, HTTP_PORT) + log.Println("setup dispatcher") + dispatcher := core.NewDispatcher() + log.Println("initialize processing pipeline") + processor := core.NewPipeline(dispatcher, 50, 1000) + processor.Run() + log.Println("start data collectors") + collectRoutines(processor) + log.Println("start http server") + core.HttpListenAndServe(dispatcher, HTTP_PORT) } -func collectRoutines(d *dispatcher.Dispatcher) { - // collectRoutines Serial UBX Sensor Data - go gnet.SerialUbxCollector(d, SERIAL_PORT) - // collectRoutines TCP JSON Sensor Data - go gnet.TcpJsonCollector(d, TCP_PORT) +func collectRoutines(proc core.Processor) { + // collect Sensor data from Serial UBX in Goroutine + go core.SerialCollector(proc, SERIAL_PORT) + // collect Sensor data from JSON over TCP in Goroutine + go core.TcpCollector(proc, TCP_PORT) } diff --git a/cmd/server_only/server_only.go b/cmd/server_only/server_only.go deleted file mode 100644 index a4be449..0000000 --- a/cmd/server_only/server_only.go +++ /dev/null @@ -1,25 +0,0 @@ -package main - -import ( - "git.timovolkmann.de/gyrogpsc/dispatcher" - gnet "git.timovolkmann.de/gyrogpsc/net" -) - -const ( - TCP_PORT = ":3010" - HTTP_PORT = ":3011" - SERIAL_PORT = "/dev/tty.usbmodem14201" -) - -func main() { - d := dispatcher.New() - collectRoutines(d) - gnet.NewHttpServer(d, HTTP_PORT) -} - -func collectRoutines(d *dispatcher.Dispatcher) { - // collectRoutines Serial UBX Sensor Data - go gnet.SerialUbxCollector(d, SERIAL_PORT) - // collectRoutines TCP JSON Sensor Data - go gnet.TcpJsonCollector(d, TCP_PORT) -} diff --git a/cmd/tcp_only/tcp_only.go b/cmd/tcp_only/tcp_only.go new file mode 100644 index 0000000..5e21bac --- /dev/null +++ b/cmd/tcp_only/tcp_only.go @@ -0,0 +1,22 @@ +package main + +import ( + "git.timovolkmann.de/gyrogpsc/core" +) + +const ( + TCP_PORT = ":3010" + HTTP_PORT = ":3011" +) + +func main() { + dispatcher := core.NewDispatcher() + processor := core.NewPipeline(dispatcher, 20, 10000) + collectRoutines(processor) + core.HttpListenAndServe(dispatcher, HTTP_PORT) +} + +func collectRoutines(proc core.Processor) { + // collect Sensor data from JSON over TCP in Goroutine + go core.TcpCollector(proc, TCP_PORT) +} diff --git a/core/collectors.go b/core/collectors.go new file mode 100644 index 0000000..47146c4 --- /dev/null +++ b/core/collectors.go @@ -0,0 +1,98 @@ +package core + +import ( + "fmt" + "git.timovolkmann.de/gyrogpsc/ublox" + "go.bug.st/serial" + "log" + "net" + "os" +) + +func TcpCollector(proc Processor, tcpPort string) { + listener, err := net.Listen("tcp", tcpPort) + if err != nil { + fmt.Println("Error listening:", err.Error()) + os.Exit(1) + } + // Close the listener when the application closes. + defer listener.Close() + + for { + // Listen for an incoming connection. + conn, err := listener.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + os.Exit(1) + } + // Handle connections in a new goroutine. + go jsonHandler(conn, proc) + } +} + +// handles incoming tcp connections with json payload. +func jsonHandler(conn net.Conn, proc Processor) { + defer conn.Close() + + // TRY reader := bufio.NewReader(conn) OR NewScanner(conn) + buf := make([]byte, 2048) + for { + // Read the incoming connection into the buffer. + n, err := conn.Read(buf) + if err != nil { + fmt.Println("TCP error - reading from connection:", n, err.Error()) + break + } + //json := pretty.Pretty(buf[:n]) + //fmt.Println(string(json)) + fmt.Println(string(buf[:n])) + sd, err := ConvertSensorDataPhone(buf[:n]) + if err != nil { + log.Println(err) + continue + } + + err = proc.Process(sd) + if err != nil { + log.Println(err) + continue + } + } +} + +func SerialCollector(proc Processor, serialPort string) { + mode := &serial.Mode{ + BaudRate: 115200, + } + port, err := serial.Open(serialPort, mode) + if err != nil { + log.Fatalln(err.Error()) + } + + decoder := ublox.NewDecoder(port) + + for { + meas, err := decoder.Decode() + if err != nil { + if err.Error() == "NMEA not implemented" { + continue + } + log.Println(err) + continue + } + sd, err := ConvertUbxToSensorData(meas) + if err != nil { + log.Println("convert err:", err, meas) + continue + } + if sd == nil { + continue + } + + err = proc.Process(sd) + if err != nil { + log.Println("process err:", err, *sd) + continue + } + } +} diff --git a/core/dispatcher.go b/core/dispatcher.go new file mode 100644 index 0000000..e6a97b7 --- /dev/null +++ b/core/dispatcher.go @@ -0,0 +1,52 @@ +package core + +import ( + "errors" + "log" +) + +type Subscriber interface { + Subscribe() (int16, <-chan string) + Unsubscribe(id int16) error +} + +type Publisher interface { + Publish(message string) +} + +type dispatcher struct { + listeners map[int16]chan string + counter int16 +} + +func NewDispatcher() *dispatcher { + return &dispatcher{ + listeners: make(map[int16]chan string), + counter: 0, + } +} + +func (d *dispatcher) Publish(message string) { + log.Printf("publish to %v listeners:\n%v\n", len(d.listeners), message) + for _, ch := range d.listeners { + ch <- message + } +} + +func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) { + key := d.counter + d.counter++ + rec := make(chan string) + d.listeners[key] = rec + return key, rec +} + +func (d *dispatcher) Unsubscribe(id int16) error { + receiver, ok := d.listeners[id] + if !ok { + return errors.New("no subscription with id") + } + delete(d.listeners, id) + close(receiver) + return nil +} diff --git a/core/format.go b/core/format.go new file mode 100644 index 0000000..5da9ef1 --- /dev/null +++ b/core/format.go @@ -0,0 +1,135 @@ +package core + +import ( + "errors" + "git.timovolkmann.de/gyrogpsc/ublox" + "log" + "time" + + "github.com/m7shapan/njson" +) + +/*{ +"smartphone": { +// hier daten von hyperimu +}, +"serial": { +// hier Daten von M8U: +"timestamp": 37539672354 +"position": [0, 0, 0], +"orientation": [0, 0, 0] +} +} +*/ + +type sourceId string + +const ( + SOURCE_TCP sourceId = "SOURCE_TCP" + SOURCE_SERIAL sourceId = "SOURCE_SERIAL" +) + +type Sensordata struct { + itow uint32 + SourceId sourceId + Timestamp int64 + Position [3]float64 + Orientation [3]float64 +} + +func (s Sensordata) isSameEpoch(n Sensordata) bool { + if n.itow == 0 { + return false + } + return s.itow == n.itow +} + +func (s Sensordata) Consolidate(n Sensordata) Sensordata { + if (s.SourceId != n.SourceId && s != Sensordata{}) { + log.Println(s) + log.Println(n) + log.Fatalln("Do not consolidate Sensordata from different Sources") + } + if s.isSameEpoch(n) { + null := Sensordata{} + //if s.Timestamp == null.Timestamp { s.Timestamp = n.Timestamp } + //if s.Position == null.Position { s.Position = n.Position } + //if s.Orientation == null.Orientation { s.Orientation = n.Orientation } + if n.Timestamp != null.Timestamp && s.Timestamp != n.Timestamp { + s.Timestamp = n.Timestamp + } + if n.Position != null.Position && s.Position != n.Position { + s.Position = n.Position + } + if n.Orientation != null.Orientation && s.Orientation != n.Orientation { + s.Orientation = n.Orientation + } + + return s + } + return n +} + +var ( + errNotImplemented = errors.New("message not implemented") + errRawMessage = errors.New("raw message") +) + +func ConvertUbxToSensorData(msg interface{}) (*Sensordata, error) { + sd := &Sensordata{ + SourceId: SOURCE_SERIAL, + } + switch v := msg.(type) { + case *ublox.NavPvt: + //log.Println("NAV-PVT") + sd.itow = v.ITOW_ms + sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() + sd.Position[0] = float64(v.Lat_dege7) + sd.Position[1] = float64(v.Lon_dege7) + sd.Position[2] = float64(v.Height_mm) + case *ublox.HnrPvt: + //log.Println("HNR-PVT") + sd.itow = v.ITOW_ms + sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() + sd.Position[0] = float64(v.Lat_dege7) + sd.Position[1] = float64(v.Lon_dege7) + sd.Position[2] = float64(v.Height_mm) + case *ublox.NavAtt: + //log.Println("NAV-ATT") + sd.itow = v.ITOW_ms + sd.Orientation[0] = float64(v.Pitch_deg) + sd.Orientation[1] = float64(v.Roll_deg) + sd.Orientation[2] = float64(v.Heading_deg) + case *ublox.RawMessage: + //class := make([]byte, 2) + //binary.LittleEndian.PutUint16(class, v.ClassID()) + //log.Printf("%#v, %#v", class[0],class[1]) + return nil, nil + default: + return nil, errNotImplemented + } + return sd, nil +} + +func ConvertSensorDataPhone(jsonData []byte) (*Sensordata, error) { + return convertAndroidHyperImu(jsonData) +} + +func convertAndroidHyperImu(jsonData []byte) (*Sensordata, error) { + prep := struct { + Timestamp int64 `njson:"Timestamp"` + Position [3]float64 `njson:"GPS"` + Orientation [3]float64 `njson:"orientation"` + }{} + err := njson.Unmarshal(jsonData, &prep) + if err != nil { + return nil, err + } + sd := &Sensordata{ + Timestamp: prep.Timestamp * int64(time.Millisecond), + //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), + Position: prep.Position, + Orientation: prep.Orientation, + } + return sd, nil +} diff --git a/core/http.go b/core/http.go new file mode 100644 index 0000000..b957ac6 --- /dev/null +++ b/core/http.go @@ -0,0 +1,71 @@ +package core + +import ( + "fmt" + "github.com/gorilla/websocket" + "html/template" + "log" + "net/http" +) + +func echo(sub Subscriber) func(w http.ResponseWriter, r *http.Request) { + var upgrader = websocket.Upgrader{} // use default options + return func(w http.ResponseWriter, r *http.Request) { + fmt.Println("upgrading to ws") + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade:", err) + return + } + //defer c.Close() + go func() { + for { + if _, _, err := c.NextReader(); err != nil { + c.Close() + break + } + } + }() + + dispatcherId, channel := sub.Subscribe() + defer sub.Unsubscribe(dispatcherId) + for { + log.Println("") + //if err != nil { + // log.Println("read:", err) + // break + //} + cmsg := <-channel + err = c.WriteMessage(websocket.TextMessage, []byte(cmsg)) + if err != nil { + log.Println("write:", err) + + break + } + } + } +} + +func home(w http.ResponseWriter, r *http.Request) { + //var homeTemplate = template.Must(template.NewDispatcher("").ParseFiles("index.html")) + tpl, err := template.ParseFiles("static/index.html") + if err != nil { + log.Fatalln(err) + } + err = tpl.Execute(w, "ws://"+r.Host+"/echo") + if err != nil { + log.Fatalln(err) + } +} + +func HttpListenAndServe(sub Subscriber, httpPort string) { + log.Println("register websocket handler") + http.HandleFunc("/echo", echo(sub)) + log.Println("register index handler") + http.HandleFunc("/", home) + log.Println("register static file handler") + http.Handle("/static/", http.FileServer(http.Dir("."))) + + log.Println("start server") + log.Fatal(http.ListenAndServe(httpPort, nil)) +} diff --git a/core/pipeline.go b/core/pipeline.go new file mode 100644 index 0000000..9f031ee --- /dev/null +++ b/core/pipeline.go @@ -0,0 +1,150 @@ +package core + +import ( + "encoding/json" + "errors" + "log" + "sync" + "time" +) + +// TODO: adapt HNR-INS data to continue orientation stream + +type Processor interface { + Process(data *Sensordata) error +} + +type pipeline struct { + synchronizer + aggregator + Publisher + publishTicker *time.Ticker +} + +func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline { + return &pipeline{ + synchronizer{ + bufferSize: 100, + mutex: &sync.Mutex{}, + updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond), + }, + aggregator{ + tcpMutex: &sync.Mutex{}, + serialMutex: &sync.Mutex{}, + }, + d, + time.NewTicker(time.Duration(publishIntervalMs) * time.Millisecond), + } +} + +func (p *pipeline) Run() { + go p.schedule() + go func() { + for { + <-p.publishTicker.C + err := p.Publish() + if err != nil /*&& err.Error() != "no data available"*/ { + log.Println(err) + } + } + }() + log.Println("pipeline: processing service started") +} + +func (p *pipeline) Publish() error { + p.tcpMutex.Lock() + p.serialMutex.Lock() + //log.Println(p.tcpSensorData) + //log.Println(p.serialSensorData) + if (p.tcpSensorData == Sensordata{} && p.serialSensorData == Sensordata{}) { + p.tcpMutex.Unlock() + p.serialMutex.Unlock() + return errors.New("no data available") + } + data := map[string]Sensordata{ + string(SOURCE_TCP): p.tcpSensorData, + string(SOURCE_SERIAL): p.serialSensorData, + } + p.tcpSensorData = Sensordata{} + p.serialSensorData = Sensordata{} + p.tcpMutex.Unlock() + p.serialMutex.Unlock() + + jdata, err := json.Marshal(data) + log.Println(string(jdata)) + if err != nil { + return err + } + p.Publisher.Publish(string(jdata)) + return nil +} + +type aggregator struct { + tcpSensorData Sensordata + serialSensorData Sensordata + tcpMutex *sync.Mutex + serialMutex *sync.Mutex +} + +type UnixNanoTime int64 + +type synchronizer struct { + tcpDelayMs int + serialDelayMs int + tcpBuffer map[UnixNanoTime]Sensordata + serialBuffer map[UnixNanoTime]Sensordata + bufferSize int + mutex *sync.Mutex + updateTicker *time.Ticker + // should run concurrently + // + // Methods: + // pushSensordata(Sensordata), remove oldest if larger than bufferSize + // refreshDelay() + // Schedule() +} + +func (s *synchronizer) schedule() { + log.Println("synchronizer: started") + for { + <-s.updateTicker.C + err := s.refreshDelay() + if err != nil { + log.Println(err) + } + } +} + +func (s *synchronizer) refreshDelay() error { + // TODO: implement + return nil +} + +func (p *pipeline) Process(data *Sensordata) error { + if data == nil { + return errors.New("nil processing not allowed") + } + //log.Println(string(data.SourceId)) + switch data.SourceId { + case SOURCE_TCP: + go p.pushTcpDataToBuffer(*data) + case SOURCE_SERIAL: + go p.pushSerialDataToBuffer(*data) + default: + return errors.New("invalid data source") + } + return nil +} + +func (p *pipeline) pushTcpDataToBuffer(data Sensordata) { + time.Sleep(time.Duration(p.tcpDelayMs)) + p.tcpMutex.Lock() + p.tcpSensorData = data + p.tcpMutex.Unlock() +} +func (p *pipeline) pushSerialDataToBuffer(data Sensordata) { + time.Sleep(time.Duration(p.serialDelayMs)) + p.serialMutex.Lock() + p.serialSensorData = p.serialSensorData.Consolidate(data) + p.serialMutex.Unlock() +} diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go deleted file mode 100644 index 05bcce4..0000000 --- a/dispatcher/dispatcher.go +++ /dev/null @@ -1,46 +0,0 @@ -package dispatcher - -import ( - "errors" - "fmt" -) - -type Dispatcher struct { - listeners map[int16]chan string - counter int16 -} - -func New() *Dispatcher { - fmt.Println("new dispatcher") - return &Dispatcher{ - listeners: make(map[int16]chan string), - counter: 0, - } -} - -func (d *Dispatcher) Publish(message string) { - fmt.Println("publish to listeners", len(d.listeners)) - for _, ch := range d.listeners { - ch <- message - } -} - -func (d *Dispatcher) Subscribe() (id int16, receiver <-chan string) { - fmt.Println("subscribe") - key := d.counter - d.counter++ - rec := make(chan string) - d.listeners[key] = rec - return key, rec -} - -func (d *Dispatcher) Unsubscribe(id int16) error { - fmt.Println("unsubscribe") - receiver, ok := d.listeners[id] - if !ok { - return errors.New("no subscription with id") - } - delete(d.listeners, id) - close(receiver) - return nil -} diff --git a/go.mod b/go.mod index 2773932..77abb86 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,9 @@ module git.timovolkmann.de/gyrogpsc go 1.15 require ( - github.com/daedaleanai/ublox v0.0.0-20201103121443-9befa131d32d github.com/gorilla/websocket v1.4.2 - github.com/sparkfun/SparkFun_Ublox_Arduino_Library v1.8.7 // indirect + github.com/m7shapan/njson v1.0.1 github.com/tidwall/pretty v1.0.2 go.bug.st/serial v1.1.1 golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf // indirect - golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb // indirect ) diff --git a/go.sum b/go.sum index 8877819..6d0f678 100644 --- a/go.sum +++ b/go.sum @@ -1,50 +1,33 @@ github.com/creack/goselect v0.1.1 h1:tiSSgKE1eJtxs1h/VgGQWuXUP0YS4CDIFMp6vaI1ls0= github.com/creack/goselect v0.1.1/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY= -github.com/daedaleanai/ublox v0.0.0-20201103121443-9befa131d32d h1:WbFmX8L79E02PgDJYWINhWvceaMGUzgmrwdE5CuUBBk= -github.com/daedaleanai/ublox v0.0.0-20201103121443-9befa131d32d/go.mod h1:pfcwlN8XUYXVYAkPU2LrFZnXIS4EvpZaXh+qRKCN9Sg= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/m7shapan/njson v1.0.1 h1:s+odQrPkzcCCGRTp46cD0XCVYN3pvdoaVwbFVmjAvys= +github.com/m7shapan/njson v1.0.1/go.mod h1:4sidL3oRZO1KV5FkclRBPI7nqFzlIq3BwdxHRMlOa9U= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sparkfun/SparkFun_Ublox_Arduino_Library v1.8.7 h1:T112CHmp+v1bh0W7sp49tZe8SUbw3viSGRTVLziZxfc= -github.com/sparkfun/SparkFun_Ublox_Arduino_Library v1.8.7/go.mod h1:FVoZAzJFrR5D6P8qd2rgpJAV/qF5oODtIT9YjVV+xzY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/gjson v1.6.0 h1:9VEQWz6LLMUsUl6PueE49ir4Ka6CzLymOAZDxpFsTDc= +github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU= github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.bug.st/serial v1.1.1 h1:5J1DpaIaSIruBi7jVnKXnhRS+YQ9+2PLJMtIZKoIgnc= go.bug.st/serial v1.1.1/go.mod h1:VmYBeyJWp5BnJ0tw2NUJHZdJTGl2ecBGABHlzRK1knY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf h1:kt3wY1Lu5MJAnKTfoMR52Cu4gwvna4VTzNOiT8tY73s= golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb h1:z5+u0pkAUPUWd3taoTialQ2JAMo4Wo1Z3L25U4ZV9r0= -golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/net/net.go b/net/net.go deleted file mode 100644 index 51f8d9d..0000000 --- a/net/net.go +++ /dev/null @@ -1,132 +0,0 @@ -package net - -import ( - "encoding/json" - "fmt" - "git.timovolkmann.de/gyrogpsc/dispatcher" - "git.timovolkmann.de/gyrogpsc/serial_ubx" - "github.com/gorilla/websocket" - "github.com/tidwall/pretty" - "html/template" - "log" - "net" - "net/http" - "os" -) - -func echo(d *dispatcher.Dispatcher) func(w http.ResponseWriter, r *http.Request) { - var upgrader = websocket.Upgrader{} // use default options - return func(w http.ResponseWriter, r *http.Request) { - fmt.Println("upgrading to ws") - c, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Print("upgrade:", err) - return - } - //defer c.Close() - go func() { - for { - if _, _, err := c.NextReader(); err != nil { - c.Close() - break - } - } - }() - - dispatcherId, channel := d.Subscribe() - defer d.Unsubscribe(dispatcherId) - for { - log.Println("") - //if err != nil { - // log.Println("read:", err) - // break - //} - cmsg := <-channel - err = c.WriteMessage(websocket.TextMessage, []byte(cmsg)) - if err != nil { - log.Println("write:", err) - - break - } - } - } -} - -func home(w http.ResponseWriter, r *http.Request) { - //var homeTemplate = template.Must(template.New("").ParseFiles("index.html")) - tpl, err := template.ParseFiles("index.html") - if err != nil { - log.Fatalln(err) - } - err = tpl.Execute(w, "ws://"+r.Host+"/echo") - if err != nil { - log.Fatalln(err) - } -} - -func NewHttpServer(d *dispatcher.Dispatcher, httpPort string) { - http.HandleFunc("/echo", echo(d)) - http.HandleFunc("/", home) - http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(".")))) - - log.Fatal(http.ListenAndServe(httpPort, nil)) -} - -func SerialUbxCollector(d *dispatcher.Dispatcher, serialPort string) { - r, err := serial_ubx.Setup(serialPort) - if err != nil { - log.Fatalln(err) - } - - for { - meas, err := r.NextMeasurement() - if err != nil { - continue - } - fmt.Println(meas) - measjson, err := json.Marshal(meas) - d.Publish(string(measjson)) - } -} - -func TcpJsonCollector(d *dispatcher.Dispatcher, tcpPort string) { - listener, err := net.Listen("tcp", tcpPort) - if err != nil { - fmt.Println("Error listening:", err.Error()) - os.Exit(1) - } - // Close the listener when the application closes. - defer listener.Close() - - for { - // Listen for an incoming connection. - conn, err := listener.Accept() - if err != nil { - fmt.Println("Error accepting: ", err.Error()) - os.Exit(1) - } - // Handle connections in a new goroutine. - go handleTcpJsonSensorData(conn, d) - } -} - -// Handles incoming requests. -func handleTcpJsonSensorData(conn net.Conn, d *dispatcher.Dispatcher) { - defer conn.Close() - // Make a buffer to hold incoming data. - for { - buf := make([]byte, 2048) - // Read the incoming connection into the buffer. - _, err := conn.Read(buf) - if err != nil { - fmt.Println("Error reading:", err.Error()) - break - } - json := pretty.Pretty(buf) - fmt.Println(string(json)) - d.Publish(string(json)) - // Send a response back to person contacting us. - //conn.Write([]byte("success")) - // Close the connection when you're done with it. - } -} diff --git a/serial_ubx/serial.go b/serial_ubx/serial.go deleted file mode 100644 index 6201a0f..0000000 --- a/serial_ubx/serial.go +++ /dev/null @@ -1,124 +0,0 @@ -package serial_ubx - -import ( - "errors" - "fmt" - "git.timovolkmann.de/gyrogpsc/ublox" - "go.bug.st/serial" - "log" - "time" -) - -type Measurement struct { - //Timestamp int64 `json:"timestamp"` - Timestamp time.Time `json:"timestamp"` - Position [3]int32 `json:"position"` // Latitude, Longitude, Height - Orientation [3]int32 `json:"orientation"` // Pitch, Roll, Heading -} - -type ubxReceiver struct { - decoder *ublox.Decoder - currentMeas Measurement -} - -func Setup(portname string) (*ubxReceiver, error) { - mode := &serial.Mode{ - BaudRate: 115200, - } - port, err := serial.Open(portname, mode) - if err != nil { - return nil, err - } - return &ubxReceiver{ - decoder: ublox.NewDecoder(port), - }, nil -} - -func (u *ubxReceiver) Next() (ublox.Message, error) { - return u.decoder.Decode() -} - -var ( - errNotImplemented = errors.New("message not implemented") -) - -// TODO: additional callback with adjustable timing -func (u *ubxReceiver) NextMeasurement() (*Measurement, error) { - - msg, err := u.decoder.Decode() - if err != nil { - return nil, err - } - //t := time.Time{} - switch v := msg.(type) { - case *ublox.NavPvt: - t, err := time.Parse(time.RFC3339Nano, formatTime(v.Year_y, v.Month_month, v.Day_d, v.Hour_h, v.Min_min, v.Sec_s, v.Nano_ns)) - if err != nil { - log.Println(err) - } - //u.currentMeas.Timestamp = t.UnixNano() - u.currentMeas.Timestamp = t - u.currentMeas.Position[0] = v.Lat_dege7 - u.currentMeas.Position[1] = v.Lon_dege7 - u.currentMeas.Position[2] = v.Height_mm - fmt.Printf("%T %v\n", *v, *v) - case *ublox.HnrPvt: - t, err := time.Parse(time.RFC3339Nano, formatTime(v.Year_y, v.Month_month, v.Day_d, v.Hour_h, v.Min_min, v.Sec_s, v.Nano_ns)) - if err != nil { - log.Println(err) - } - u.currentMeas.Timestamp = t - u.currentMeas.Position[0] = v.Lat_dege7 - u.currentMeas.Position[1] = v.Lon_dege7 - u.currentMeas.Position[2] = v.Height_mm - fmt.Printf("%T %v\n", *v, *v) - case *ublox.NavAtt: - u.currentMeas.Orientation[0] = v.Pitch_deg - u.currentMeas.Orientation[1] = v.Roll_deg - u.currentMeas.Orientation[2] = v.Heading_deg - fmt.Printf("%T %v\n", *v, *v) - - //case *ublox.RawMessage: - // //fmt.Printf("%T %v\n\n", *v, *v) - default: - return nil, errNotImplemented - } - m := u.currentMeas - return &m, nil -} - -func formatTime(Year_y uint16, Month_month byte, Day_d byte, Hour_h byte, Min_min byte, Sec_s byte, Nano_ns int32) string { - //Nano_ns *= 1e+3 - if Nano_ns < 0 { - Nano_ns += int32(time.Second) - if Sec_s > 0 { - Sec_s-- - } else if Min_min > 0 { - Sec_s = 59 - Min_min-- - } else if Hour_h > 0 { - Sec_s = 59 - Min_min = 59 - Hour_h-- - } else if Day_d > 1 { - Sec_s = 59 - Min_min = 59 - Hour_h = 23 - Day_d-- - } // TODO: more cases for exact behavior! good for now... - } - //fmt.Printf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ00:00\n", Year_y, Month_month, Day_d, Hour_h, Min_min, Sec_s, Nano_ns ) - return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ", Year_y, Month_month, Day_d, Hour_h, Min_min, Sec_s, Nano_ns) -} - -func (u *ubxReceiver) Printloop() { - - for { - meas, err := u.NextMeasurement() - if err != nil { - continue - } - fmt.Println(meas) - } - -} diff --git a/ex_websocketMessage.json b/static/ex_websocketMessage.json similarity index 100% rename from ex_websocketMessage.json rename to static/ex_websocketMessage.json diff --git a/hyperimu.json b/static/hyperimu.json similarity index 99% rename from hyperimu.json rename to static/hyperimu.json index 1567da8..c5b6bfa 100644 --- a/hyperimu.json +++ b/static/hyperimu.json @@ -63,4 +63,4 @@ -0.006618400104343891 ], "tmd3702_proximity proximity sensor": [5, 0, 0] -} +} \ No newline at end of file diff --git a/index.html b/static/index.html similarity index 100% rename from index.html rename to static/index.html diff --git a/ublox/decode.go b/ublox/decode.go index ba019af..3e0ccab 100644 --- a/ublox/decode.go +++ b/ublox/decode.go @@ -14,17 +14,17 @@ import ( "io" ) -// A Decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames. +// A decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames. // If you have an unmixed stream of NMEA-only data you can use nmea.Decode() on bufio.Scanner.Bytes() directly. -type Decoder struct { +type decoder struct { s *bufio.Scanner } // NewDecoder creates a new bufio Scanner with a splitfunc that can handle both UBX and NMEA frames. -func NewDecoder(r io.Reader) *Decoder { +func NewDecoder(r io.Reader) *decoder { d := bufio.NewScanner(r) d.Split(splitFunc) - return &Decoder{s: d} + return &decoder{s: d} } // Assume we're either at the start of an NMEA sentence or at the start of a UBX message @@ -74,8 +74,8 @@ func splitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) { return 1 + i1, nil, nil } -// Decode reads on NMEA or UBX frame and calls DecodeUbx accordingly to parse the message. Skips NMEA. -func (d *Decoder) Decode() (msg Message, err error) { +// Decode reads on NMEA or UBX frame and calls decodeUbx accordingly to parse the message, while skipping NMEA. +func (d *decoder) Decode() (msg interface{}, err error) { if !d.s.Scan() { if err = d.s.Err(); err == nil { err = io.EOF @@ -85,11 +85,10 @@ func (d *Decoder) Decode() (msg Message, err error) { switch d.s.Bytes()[0] { case '$': - //fmt.Println("NMEA message: skipping!") - return nil, err + return nil, errors.New("NMEA not implemented") //return nmea.Decode(d.s.Bytes()) case 0xB5: - return DecodeUbx(d.s.Bytes()) + return decodeUbx(d.s.Bytes()) } panic("impossible frame") } @@ -99,7 +98,7 @@ var ( errInvalidChkSum = errors.New("invalid UBX checksum") ) -func DecodeUbx(frame []byte) (msg Message, err error) { +func decodeUbx(frame []byte) (msg Message, err error) { buf := bytes.NewReader(frame) @@ -149,7 +148,7 @@ func DecodeUbx(frame []byte) (msg Message, err error) { if msg != nil { err = binary.Read(buf, binary.LittleEndian, msg) } else { - msg = &RawMessage{ClassID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)} + msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)} } //fmt.Println(msg) diff --git a/ublox/messages.go b/ublox/messages.go index ad3a078..610f7b2 100644 --- a/ublox/messages.go +++ b/ublox/messages.go @@ -1,15 +1,21 @@ package ublox type Message interface { - classID() uint16 + ClassID() uint16 } +//type UbxMessage interface { +// Timestamp() (time.Time, error) +// Position() ([3]float64, error) +// Orientation() ([3]float64, error) +//} + type RawMessage struct { - ClassID uint16 + classID uint16 Data []byte } -func (msg *RawMessage) classID() uint16 { return msg.ClassID } +func (msg *RawMessage) ClassID() uint16 { return msg.classID } type NavPvt struct { ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. @@ -47,7 +53,7 @@ type NavPvt struct { MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later. } -func (NavPvt) classID() uint16 { return 0x0701 } +func (NavPvt) ClassID() uint16 { return 0x0701 } type HnrPvt struct { ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. @@ -77,7 +83,7 @@ type HnrPvt struct { Reserved1 [4]byte // - Reserved } -func (HnrPvt) classID() uint16 { return 0x0028 } +func (HnrPvt) ClassID() uint16 { return 0x0028 } type NavAtt struct { ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details. @@ -91,7 +97,7 @@ type NavAtt struct { AccHeading_deg uint32 } -func (NavAtt) classID() uint16 { return 0x0501 } +func (NavAtt) ClassID() uint16 { return 0x0501 } //go:generate stringer -output=strings_navpvt.go -trimprefix NavPVT -type=NavPVTFixType,NavPVTValid,NavPVTFlags,NavPVTFlags2,NavPVTFlags3