diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..73f69e0
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..0e93714
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index ac4ad06..94a25f7 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -2,6 +2,5 @@
-
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 4881256..7ca8493 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -22,13 +22,25 @@
+
+
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
@@ -69,27 +81,18 @@
-
+
+
-
-
-
-
-
-
-
-
-
-
-
+
@@ -99,6 +102,15 @@
+
+
+
+
+
+
+
+
+
@@ -124,23 +136,9 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -231,6 +229,9 @@
+
+
+
@@ -247,6 +248,10 @@
+
+
+
+
@@ -275,8 +280,8 @@
-
-
+
+
@@ -285,8 +290,9 @@
-
-
+
+
+
@@ -295,8 +301,9 @@
-
-
+
+
+
@@ -305,8 +312,9 @@
-
-
+
+
+
@@ -315,6 +323,7 @@
+
@@ -331,11 +340,12 @@
-
-
+
+
+
@@ -344,14 +354,19 @@
+
+
+
+
-
-
+
+
+
diff --git a/cmd/serial_only/serial_only.go b/cmd/serial_only/serial_only.go
index 3cd3139..352585e 100644
--- a/cmd/serial_only/serial_only.go
+++ b/cmd/serial_only/serial_only.go
@@ -1,14 +1,21 @@
package main
import (
- "git.timovolkmann.de/gyrogpsc/serial_ubx"
- "log"
+ "fmt"
+ "git.timovolkmann.de/gyrogpsc/core"
+)
+
+const (
+ SERIAL_PORT = "/dev/tty.usbmodem14201"
)
func main() {
- r, err := serial_ubx.Setup("/dev/tty.usbmodem14201")
- if err != nil {
- log.Fatal(err)
- }
- r.Printloop()
+ core.SerialCollector(&printer{}, SERIAL_PORT)
+}
+
+type printer struct{}
+
+func (p *printer) Process(data *core.Sensordata) error {
+ fmt.Println(data)
+ return nil
}
diff --git a/cmd/server/server.go b/cmd/server/server.go
index dc22536..536320a 100644
--- a/cmd/server/server.go
+++ b/cmd/server/server.go
@@ -1,161 +1,25 @@
package main
import (
-<<<<<<< Updated upstream
- "encoding/json"
- "fmt"
- "git.timovolkmann.de/gyrogpsc/dispatcher"
- "git.timovolkmann.de/gyrogpsc/serial_ubx"
- "github.com/gorilla/websocket"
- "github.com/tidwall/pretty"
- "html/template"
- "log"
- "net"
- "net/http"
- "os"
+ "git.timovolkmann.de/gyrogpsc/core"
)
const (
TCP_PORT = ":3010"
+ HTTP_PORT = ":3011"
SERIAL_PORT = "/dev/tty.usbmodem14201"
-=======
- "git.timovolkmann.de/gyrogpsc/gnet"
)
-const (
- TCP_PORT = ":3010"
- HTTP_PORT = ":3011"
- SERIAL_PORT = "/dev/tty.usbmodem14201"
- MAX_WS_DELAY = 20
->>>>>>> Stashed changes
-)
-
-var upgrader = websocket.Upgrader{} // use default options
-
-func echo(d *dispatcher.Dispatcher) func(w http.ResponseWriter, r *http.Request) {
- fmt.Println("echo")
- return func(w http.ResponseWriter, r *http.Request) {
- fmt.Println("upgrading to ws")
- c, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- log.Print("upgrade:", err)
- return
- }
- //defer c.Close()
- go func() {
- for {
- if _, _, err := c.NextReader(); err != nil {
- c.Close()
- break
- }
- }
- }()
-
- dispatcherId, channel := d.Subscribe()
- defer d.Unsubscribe(dispatcherId)
- for {
- log.Println("")
- //if err != nil {
- // log.Println("read:", err)
- // break
- //}
- cmsg := <-channel
- err = c.WriteMessage(websocket.TextMessage, []byte(cmsg))
- if err != nil {
- log.Println("write:", err)
-
- break
- }
- }
- }
-}
-
-func home(w http.ResponseWriter, r *http.Request) {
- //var homeTemplate = template.Must(template.New("").ParseFiles("index.html"))
- tpl, err := template.ParseFiles("index.html")
- if err != nil {
- log.Fatalln(err)
- }
- err = tpl.Execute(w, "ws://"+r.Host+"/echo")
- if err != nil {
- log.Fatalln(err)
- }
-}
-
func main() {
- d := gnet.NewDispatcher()
- collectRoutines(d)
- http.HandleFunc("/echo", echo(d))
- http.HandleFunc("/", home)
- http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("."))))
-
- log.Fatal(http.ListenAndServe(":3011", nil))
+ dispatcher := core.NewDispatcher()
+ processor := core.NewPipeline(dispatcher, 20, 10000)
+ collectRoutines(processor)
+ core.NewHttpServer(dispatcher, HTTP_PORT)
}
-func collectRoutines(d *gnet.Dispatcher) {
- // TODO: Hier die Sensordaten zwischenspeichern und per
-
- // collectRoutines Serial UBX Sensor Data
- go serialUbxCollector(d)
- // collectRoutines TCP JSON Sensor Data
- go tcpJsonCollector(d)
-}
-
-func serialUbxCollector(d *dispatcher.Dispatcher) {
- r, err := serial_ubx.Setup(SERIAL_PORT)
- if err != nil {
- log.Fatalln(err)
- }
-
- for {
- meas, err := r.NextMeasurement()
- if err != nil {
- continue
- }
- fmt.Println(meas)
- measjson, err := json.Marshal(meas)
- d.Publish(string(measjson))
- }
-}
-
-func tcpJsonCollector(d *dispatcher.Dispatcher) {
- listener, err := net.Listen("tcp", TCP_PORT)
- if err != nil {
- fmt.Println("Error listening:", err.Error())
- os.Exit(1)
- }
- // Close the listener when the application closes.
- defer listener.Close()
-
- for {
- // Listen for an incoming connection.
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("Error accepting: ", err.Error())
- os.Exit(1)
- }
- // Handle connections in a new goroutine.
- go handleTcpJsonSensorData(conn, d)
- }
-}
-
-// Handles incoming requests.
-func handleTcpJsonSensorData(conn net.Conn, d *dispatcher.Dispatcher) {
- defer conn.Close()
- // Make a buffer to hold incoming data.
- for {
- buf := make([]byte, 2048)
- // Read the incoming connection into the buffer.
- _, err := conn.Read(buf)
- if err != nil {
- fmt.Println("Error reading:", err.Error())
- break
- }
- json := pretty.Pretty(buf)
- fmt.Println(string(json))
- d.Publish(string(json))
- // Send a response back to person contacting us.
- //conn.Write([]byte("success"))
- // Close the connection when you're done with it.
- }
+func collectRoutines(proc core.Processor) {
+ // collect Sensor data from Serial UBX in Goroutine
+ go core.SerialCollector(proc, SERIAL_PORT)
+ // collect Sensor data from JSON over TCP in Goroutine
+ go core.TcpCollector(proc, TCP_PORT)
}
diff --git a/cmd/server_only/server_only.go b/cmd/server_only/server_only.go
deleted file mode 100644
index bd77897..0000000
--- a/cmd/server_only/server_only.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package main
-
-import (
- "git.timovolkmann.de/gyrogpsc/gnet"
-)
-
-const (
- TCP_PORT = ":3010"
- HTTP_PORT = ":3011"
- SERIAL_PORT = "/dev/tty.usbmodem14201"
- MAX_WS_DELAY = 20
-)
-
-func main() {
- d := gnet.NewDispatcher()
- collectRoutines(d)
- gnet.NewHttpServer(d, HTTP_PORT)
-}
-
-func collectRoutines(d *gnet.Dispatcher) {
- // collectRoutines Serial UBX Sensor Data
- // go gnet.SerialUbxCollector(d, SERIAL_PORT)
- // collectRoutines TCP JSON Sensor Data
- go gnet.TcpJsonCollector(d, TCP_PORT)
-}
diff --git a/cmd/tcp_only/tcp_only.go b/cmd/tcp_only/tcp_only.go
new file mode 100644
index 0000000..b9383f1
--- /dev/null
+++ b/cmd/tcp_only/tcp_only.go
@@ -0,0 +1,22 @@
+package main
+
+import (
+ "git.timovolkmann.de/gyrogpsc/core"
+)
+
+const (
+ TCP_PORT = ":3010"
+ HTTP_PORT = ":3011"
+)
+
+func main() {
+ dispatcher := core.NewDispatcher()
+ processor := core.NewPipeline(dispatcher, 20, 10000)
+ collectRoutines(processor)
+ core.NewHttpServer(dispatcher, HTTP_PORT)
+}
+
+func collectRoutines(proc core.Processor) {
+ // collect Sensor data from JSON over TCP in Goroutine
+ go core.TcpCollector(proc, TCP_PORT)
+}
diff --git a/core/collectors.go b/core/collectors.go
new file mode 100644
index 0000000..b5a5b21
--- /dev/null
+++ b/core/collectors.go
@@ -0,0 +1,93 @@
+package core
+
+import (
+ "fmt"
+ "git.timovolkmann.de/gyrogpsc/ublox"
+ "go.bug.st/serial"
+ "log"
+ "net"
+ "os"
+)
+
+func TcpCollector(proc Processor, tcpPort string) {
+ listener, err := net.Listen("tcp", tcpPort)
+ if err != nil {
+ fmt.Println("Error listening:", err.Error())
+ os.Exit(1)
+ }
+ // Close the listener when the application closes.
+ defer listener.Close()
+
+ for {
+ // Listen for an incoming connection.
+ conn, err := listener.Accept()
+ if err != nil {
+ fmt.Println("Error accepting: ", err.Error())
+ os.Exit(1)
+ }
+ // Handle connections in a new goroutine.
+ go jsonHandler(conn, proc)
+ }
+}
+
+// handles incoming tcp connections with json payload.
+func jsonHandler(conn net.Conn, proc Processor) {
+ defer conn.Close()
+
+ // TRY reader := bufio.NewReader(conn) OR NewScanner(conn)
+ buf := make([]byte, 2048)
+ for {
+ // Read the incoming connection into the buffer.
+ n, err := conn.Read(buf)
+ if err != nil {
+ fmt.Println("TCP error - reading from connection:", n, err.Error())
+ break
+ }
+ //json := pretty.Pretty(buf[:n])
+ //fmt.Println(string(json))
+ fmt.Println(string(buf[:n]))
+ sd, err := ConvertSensorDataPhone(buf[:n])
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ err = proc.Process(sd)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ }
+}
+
+func SerialCollector(proc Processor, serialPort string) {
+ mode := &serial.Mode{
+ BaudRate: 115200,
+ }
+ port, err := serial.Open(serialPort, mode)
+ if err != nil {
+ log.Fatalln(err)
+ }
+
+ decoder := ublox.NewDecoder(port)
+
+ for {
+ meas, err := decoder.Decode()
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ sd, err := ConvertUbxToSensorData(meas)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ err = proc.Process(sd)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+
+ }
+}
diff --git a/gnet/dispatcher.go b/core/dispatcher.go
similarity index 59%
rename from gnet/dispatcher.go
rename to core/dispatcher.go
index 0dd7144..fb5455c 100644
--- a/gnet/dispatcher.go
+++ b/core/dispatcher.go
@@ -1,31 +1,40 @@
-package gnet
+package core
import (
"errors"
"fmt"
)
-type Dispatcher struct {
+type Subscriber interface {
+ Subscribe() (int16, <-chan string)
+ Unsubscribe(id int16) error
+}
+
+type Publisher interface {
+ Publish(message string)
+}
+
+type dispatcher struct {
listeners map[int16]chan string
counter int16
}
-func NewDispatcher() *Dispatcher {
+func NewDispatcher() *dispatcher {
fmt.Println("new dispatcher")
- return &Dispatcher{
+ return &dispatcher{
listeners: make(map[int16]chan string),
counter: 0,
}
}
-func (d *Dispatcher) Publish(message string) {
+func (d *dispatcher) Publish(message string) {
//fmt.Println("publish to listeners", len(d.listeners))
for _, ch := range d.listeners {
ch <- message
}
}
-func (d *Dispatcher) Subscribe() (id int16, receiver <-chan string) {
+func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) {
fmt.Println("subscribe")
key := d.counter
d.counter++
@@ -34,7 +43,7 @@ func (d *Dispatcher) Subscribe() (id int16, receiver <-chan string) {
return key, rec
}
-func (d *Dispatcher) Unsubscribe(id int16) error {
+func (d *dispatcher) Unsubscribe(id int16) error {
fmt.Println("unsubscribe")
receiver, ok := d.listeners[id]
if !ok {
diff --git a/core/format.go b/core/format.go
new file mode 100644
index 0000000..0ef83d1
--- /dev/null
+++ b/core/format.go
@@ -0,0 +1,108 @@
+package core
+
+import (
+ "errors"
+ "git.timovolkmann.de/gyrogpsc/ublox"
+ "log"
+ "time"
+
+ "github.com/m7shapan/njson"
+)
+
+/*{
+"smartphone": {
+// hier daten von hyperimu
+},
+"serial": {
+// hier Daten von M8U:
+"timestamp": 37539672354
+"position": [0, 0, 0],
+"orientation": [0, 0, 0]
+}
+}
+*/
+
+type sourceId string
+
+const (
+ SOURCE_TCP sourceId = "SOURCE_TCP"
+ SOURCE_SERIAL sourceId = "SOURCE_SERIAL"
+)
+
+type Sensordata struct {
+ itow uint32
+ SourceId sourceId
+ Timestamp int64
+ Position [3]float64
+ Orientation [3]float64
+}
+
+func (s *Sensordata) isSameEpoch(n *Sensordata) bool {
+ if n.itow == 0 {
+ return false
+ }
+ return s.itow == n.itow
+}
+
+func (s *Sensordata) Consolidate(n *Sensordata) *Sensordata {
+ if s.SourceId != n.SourceId {
+ log.Fatalln("Do not consolidate Sensordata from different Sources")
+ }
+ if s.isSameEpoch(n) {
+ // TODO: implement epoch consolidation logic
+ }
+ return n
+}
+
+var (
+ errNotImplemented = errors.New("message not implemented")
+)
+
+func ConvertUbxToSensorData(msg interface{}) (*Sensordata, error) {
+ sd := &Sensordata{}
+ switch v := msg.(type) {
+ case *ublox.NavPvt:
+ sd.itow = v.ITOW_ms
+ sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano()
+ sd.Position[0] = float64(v.Lat_dege7)
+ sd.Position[1] = float64(v.Lon_dege7)
+ sd.Position[2] = float64(v.Height_mm)
+ case *ublox.HnrPvt:
+ sd.itow = v.ITOW_ms
+ sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano()
+ sd.Position[0] = float64(v.Lat_dege7)
+ sd.Position[1] = float64(v.Lon_dege7)
+ sd.Position[2] = float64(v.Height_mm)
+ case *ublox.NavAtt:
+ sd.itow = v.ITOW_ms
+ sd.Orientation[0] = float64(v.Pitch_deg)
+ sd.Orientation[1] = float64(v.Roll_deg)
+ sd.Orientation[2] = float64(v.Heading_deg)
+ default:
+ return nil, errNotImplemented
+ }
+ return sd, nil
+}
+
+func ConvertSensorDataPhone(jsonData []byte) (*Sensordata, error) {
+ return convertAndroidHyperImu(jsonData)
+}
+
+func convertAndroidHyperImu(jsonData []byte) (*Sensordata, error) {
+ prep := struct {
+ Timestamp int64 `njson:"Timestamp"`
+ Position [3]float64 `njson:"GPS"`
+ Orientation [3]float64 `njson:"orientation"`
+ }{}
+ err := njson.Unmarshal(jsonData, &prep)
+ if err != nil {
+ return nil, err
+ }
+ sd := &Sensordata{
+ Timestamp: prep.Timestamp * int64(time.Millisecond),
+ //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)),
+ Position: prep.Position,
+ Orientation: prep.Orientation,
+ }
+ return sd, nil
+}
diff --git a/core/http.go b/core/http.go
new file mode 100644
index 0000000..0328766
--- /dev/null
+++ b/core/http.go
@@ -0,0 +1,67 @@
+package core
+
+import (
+ "fmt"
+ "github.com/gorilla/websocket"
+ "html/template"
+ "log"
+ "net/http"
+)
+
+func echo(sub Subscriber) func(w http.ResponseWriter, r *http.Request) {
+ var upgrader = websocket.Upgrader{} // use default options
+ return func(w http.ResponseWriter, r *http.Request) {
+ fmt.Println("upgrading to ws")
+ c, err := upgrader.Upgrade(w, r, nil)
+ if err != nil {
+ log.Print("upgrade:", err)
+ return
+ }
+ //defer c.Close()
+ go func() {
+ for {
+ if _, _, err := c.NextReader(); err != nil {
+ c.Close()
+ break
+ }
+ }
+ }()
+
+ dispatcherId, channel := sub.Subscribe()
+ defer sub.Unsubscribe(dispatcherId)
+ for {
+ log.Println("")
+ //if err != nil {
+ // log.Println("read:", err)
+ // break
+ //}
+ cmsg := <-channel
+ err = c.WriteMessage(websocket.TextMessage, []byte(cmsg))
+ if err != nil {
+ log.Println("write:", err)
+
+ break
+ }
+ }
+ }
+}
+
+func home(w http.ResponseWriter, r *http.Request) {
+ //var homeTemplate = template.Must(template.NewDispatcher("").ParseFiles("index.html"))
+ tpl, err := template.ParseFiles("index.html")
+ if err != nil {
+ log.Fatalln(err)
+ }
+ err = tpl.Execute(w, "ws://"+r.Host+"/echo")
+ if err != nil {
+ log.Fatalln(err)
+ }
+}
+
+func NewHttpServer(sub Subscriber, httpPort string) {
+ http.HandleFunc("/echo", echo(sub))
+ http.HandleFunc("/", home)
+ http.Handle("/static/", http.FileServer(http.Dir(".")))
+
+ log.Fatal(http.ListenAndServe(httpPort, nil))
+}
diff --git a/core/pipeline.go b/core/pipeline.go
new file mode 100644
index 0000000..8d98e81
--- /dev/null
+++ b/core/pipeline.go
@@ -0,0 +1,127 @@
+package core
+
+import (
+ "encoding/json"
+ "errors"
+ "log"
+ "sync"
+ "time"
+)
+
+type Processor interface {
+ Process(data *Sensordata) error
+}
+
+type pipeline struct {
+ synchronizer
+ aggregator
+ Publisher
+ publishTicker *time.Ticker
+}
+
+func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline {
+ return &pipeline{
+ synchronizer{
+ bufferSize: 100,
+ updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Second),
+ },
+ aggregator{},
+ d,
+ time.NewTicker(time.Duration(publishIntervalMs) * time.Second),
+ }
+}
+
+func (p *pipeline) Run() {
+ go p.schedule()
+ go func() {
+ for {
+ <-p.publishTicker.C
+ err := p.Publish()
+ if err != nil {
+ log.Println(err)
+ }
+ }
+ }()
+}
+
+func (p *pipeline) Publish() error {
+ if (p.tcpSensorData == Sensordata{} && p.serialSensorData == Sensordata{}) {
+ return errors.New("No Data available")
+ }
+ data := map[string]Sensordata{
+ string(p.tcpSensorData.SourceId): p.tcpSensorData,
+ string(p.serialSensorData.SourceId): p.serialSensorData,
+ }
+ jdata, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ p.Publisher.Publish(string(jdata))
+ return nil
+}
+
+type aggregator struct {
+ tcpSensorData Sensordata
+ serialSensorData Sensordata
+ tcpMutex *sync.Mutex
+ serialMutex *sync.Mutex
+}
+
+type UnixNanoTime int64
+
+type synchronizer struct {
+ tcpDelayMs int
+ serialDelayMs int
+ tcpBuffer map[UnixNanoTime]Sensordata
+ serialBuffer map[UnixNanoTime]Sensordata
+ bufferSize int
+ mutex *sync.Mutex
+ updateTicker *time.Ticker
+ // should run concurrently
+ //
+ // Methods:
+ // pushSensordata(Sensordata), remove oldest if larger than bufferSize
+ // refreshDelay()
+ // Schedule()
+}
+
+func (s *synchronizer) schedule() {
+ for {
+ <-s.updateTicker.C
+ err := s.refreshDelay()
+ if err != nil {
+ log.Println(err)
+ }
+ }
+}
+
+func (s *synchronizer) refreshDelay() error {
+ // TODO: implement
+ return nil
+}
+
+func (p *pipeline) Process(data *Sensordata) error {
+ switch data.SourceId {
+ case SOURCE_TCP:
+ go p.pushTcpDataToBuffer(*data)
+ return nil
+ case SOURCE_SERIAL:
+ go p.pushSerialDataToBuffer(*data)
+ return nil
+ default:
+ return errors.New("invalid data source")
+ }
+}
+
+func (p *pipeline) pushTcpDataToBuffer(data Sensordata) {
+ time.Sleep(time.Duration(p.tcpDelayMs))
+ p.tcpMutex.Lock()
+ p.tcpSensorData = data
+ p.tcpMutex.Unlock()
+}
+func (p *pipeline) pushSerialDataToBuffer(data Sensordata) {
+ time.Sleep(time.Duration(p.serialDelayMs))
+ p.serialMutex.Lock()
+ p.serialSensorData = data
+ p.serialMutex.Unlock()
+}
diff --git a/gnet/net.go b/gnet/net.go
deleted file mode 100644
index c8981e9..0000000
--- a/gnet/net.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package gnet
-
-import (
- "encoding/json"
- "fmt"
- "git.timovolkmann.de/gyrogpsc/serial_ubx"
- "github.com/gorilla/websocket"
- "github.com/tidwall/pretty"
- "html/template"
- "log"
- "net"
- "net/http"
- "os"
-)
-
-func echo(d *Dispatcher) func(w http.ResponseWriter, r *http.Request) {
- var upgrader = websocket.Upgrader{} // use default options
- return func(w http.ResponseWriter, r *http.Request) {
- fmt.Println("upgrading to ws")
- c, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- log.Print("upgrade:", err)
- return
- }
- //defer c.Close()
- go func() {
- for {
- if _, _, err := c.NextReader(); err != nil {
- c.Close()
- break
- }
- }
- }()
-
- dispatcherId, channel := d.Subscribe()
- defer d.Unsubscribe(dispatcherId)
- for {
- log.Println("")
- //if err != nil {
- // log.Println("read:", err)
- // break
- //}
- cmsg := <-channel
- err = c.WriteMessage(websocket.TextMessage, []byte(cmsg))
- if err != nil {
- log.Println("write:", err)
-
- break
- }
- }
- }
-}
-
-func home(w http.ResponseWriter, r *http.Request) {
- //var homeTemplate = template.Must(template.NewDispatcher("").ParseFiles("index.html"))
- tpl, err := template.ParseFiles("index.html")
- if err != nil {
- log.Fatalln(err)
- }
- err = tpl.Execute(w, "ws://"+r.Host+"/echo")
- if err != nil {
- log.Fatalln(err)
- }
-}
-
-func NewHttpServer(d *Dispatcher, httpPort string) {
- http.HandleFunc("/echo", echo(d))
- http.HandleFunc("/", home)
- http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("."))))
-
- log.Fatal(http.ListenAndServe(httpPort, nil))
-}
-
-func SerialUbxCollector(d *Dispatcher, serialPort string) {
- r, err := serial_ubx.Setup(serialPort)
- if err != nil {
- log.Fatalln(err)
- }
-
- for {
- meas, err := r.NextMeasurement()
- if err != nil {
- continue
- }
- measjson, err := json.Marshal(meas)
- fmt.Println(string(pretty.Pretty(measjson)))
- d.Publish(string(measjson))
- }
-}
-
-func TcpJsonCollector(d *Dispatcher, tcpPort string) {
- listener, err := net.Listen("tcp", tcpPort)
- if err != nil {
- fmt.Println("Error listening:", err.Error())
- os.Exit(1)
- }
- // Close the listener when the application closes.
- defer listener.Close()
-
- for {
- // Listen for an incoming connection.
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("Error accepting: ", err.Error())
- os.Exit(1)
- }
- // Handle connections in a new goroutine.
- go handleTcpJsonSensorData(conn, d)
- }
-}
-
-// Handles incoming requests.
-func handleTcpJsonSensorData(conn net.Conn, d *Dispatcher) {
- defer conn.Close()
- // Make a buffer to hold incoming data.
- for {
- buf := make([]byte, 2048)
- // Read the incoming connection into the buffer.
- _, err := conn.Read(buf)
- if err != nil {
- fmt.Println("Error reading:", err.Error())
- break
- }
- json := pretty.Pretty(buf)
- fmt.Println(string(json))
- d.Publish(string(buf))
- // Send a response back to person contacting us.
- //conn.Write([]byte("success"))
- // Close the connection when you're done with it.
- }
-}
diff --git a/go.mod b/go.mod
index 2773932..77abb86 100644
--- a/go.mod
+++ b/go.mod
@@ -3,11 +3,9 @@ module git.timovolkmann.de/gyrogpsc
go 1.15
require (
- github.com/daedaleanai/ublox v0.0.0-20201103121443-9befa131d32d
github.com/gorilla/websocket v1.4.2
- github.com/sparkfun/SparkFun_Ublox_Arduino_Library v1.8.7 // indirect
+ github.com/m7shapan/njson v1.0.1
github.com/tidwall/pretty v1.0.2
go.bug.st/serial v1.1.1
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf // indirect
- golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb // indirect
)
diff --git a/go.sum b/go.sum
index 8877819..6d0f678 100644
--- a/go.sum
+++ b/go.sum
@@ -1,50 +1,33 @@
github.com/creack/goselect v0.1.1 h1:tiSSgKE1eJtxs1h/VgGQWuXUP0YS4CDIFMp6vaI1ls0=
github.com/creack/goselect v0.1.1/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY=
-github.com/daedaleanai/ublox v0.0.0-20201103121443-9befa131d32d h1:WbFmX8L79E02PgDJYWINhWvceaMGUzgmrwdE5CuUBBk=
-github.com/daedaleanai/ublox v0.0.0-20201103121443-9befa131d32d/go.mod h1:pfcwlN8XUYXVYAkPU2LrFZnXIS4EvpZaXh+qRKCN9Sg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/m7shapan/njson v1.0.1 h1:s+odQrPkzcCCGRTp46cD0XCVYN3pvdoaVwbFVmjAvys=
+github.com/m7shapan/njson v1.0.1/go.mod h1:4sidL3oRZO1KV5FkclRBPI7nqFzlIq3BwdxHRMlOa9U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sparkfun/SparkFun_Ublox_Arduino_Library v1.8.7 h1:T112CHmp+v1bh0W7sp49tZe8SUbw3viSGRTVLziZxfc=
-github.com/sparkfun/SparkFun_Ublox_Arduino_Library v1.8.7/go.mod h1:FVoZAzJFrR5D6P8qd2rgpJAV/qF5oODtIT9YjVV+xzY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/tidwall/gjson v1.6.0 h1:9VEQWz6LLMUsUl6PueE49ir4Ka6CzLymOAZDxpFsTDc=
+github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
+github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
+github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
-github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.bug.st/serial v1.1.1 h1:5J1DpaIaSIruBi7jVnKXnhRS+YQ9+2PLJMtIZKoIgnc=
go.bug.st/serial v1.1.1/go.mod h1:VmYBeyJWp5BnJ0tw2NUJHZdJTGl2ecBGABHlzRK1knY=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
-golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
-golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf h1:kt3wY1Lu5MJAnKTfoMR52Cu4gwvna4VTzNOiT8tY73s=
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb h1:z5+u0pkAUPUWd3taoTialQ2JAMo4Wo1Z3L25U4ZV9r0=
-golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/serial_ubx/serial.go b/serial_ubx/serial.go
deleted file mode 100644
index 30ae70c..0000000
--- a/serial_ubx/serial.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package serial_ubx
-
-import (
- "errors"
- "fmt"
- "git.timovolkmann.de/gyrogpsc/ublox"
- "go.bug.st/serial"
- "log"
- "time"
-)
-
-type Measurement struct {
- //Timestamp int64 `json:"timestamp"`
- Timestamp time.Time `json:"timestamp"`
- Position [3]int32 `json:"position"` // Latitude, Longitude, Height
- Orientation [3]int32 `json:"orientation"` // Pitch, Roll, Heading
-}
-
-type ubxReceiver struct {
- decoder *ublox.Decoder
- currentMeas Measurement
-}
-
-func Setup(portname string) (*ubxReceiver, error) {
- mode := &serial.Mode{
- BaudRate: 115200,
- }
- port, err := serial.Open(portname, mode)
- if err != nil {
- return nil, err
- }
- return &ubxReceiver{
- decoder: ublox.NewDecoder(port),
- }, nil
-}
-
-func (u *ubxReceiver) Next() (ublox.Message, error) {
- return u.decoder.Decode()
-}
-
-var (
- errNotImplemented = errors.New("message not implemented")
-)
-
-// TODO: additional callback with adjustable timing
-func (u *ubxReceiver) NextMeasurement() (*Measurement, error) {
-
- msg, err := u.decoder.Decode()
- if err != nil {
- return nil, err
- }
- //t := time.Time{}
- switch v := msg.(type) {
- case *ublox.NavPvt:
- t, err := time.Parse(time.RFC3339Nano, formatTime(v.Year_y, v.Month_month, v.Day_d, v.Hour_h, v.Min_min, v.Sec_s, v.Nano_ns))
- if err != nil {
- log.Println(err)
- }
- //u.currentMeas.Timestamp = t.UnixNano()
- u.currentMeas.Timestamp = t
- u.currentMeas.Position[0] = v.Lat_dege7
- u.currentMeas.Position[1] = v.Lon_dege7
- u.currentMeas.Position[2] = v.Height_mm
- //fmt.Printf("%T %v\n", *v, *v)
- case *ublox.HnrPvt:
- t, err := time.Parse(time.RFC3339Nano, formatTime(v.Year_y, v.Month_month, v.Day_d, v.Hour_h, v.Min_min, v.Sec_s, v.Nano_ns))
- if err != nil {
- log.Println(err)
- }
- u.currentMeas.Timestamp = t
- u.currentMeas.Position[0] = v.Lat_dege7
- u.currentMeas.Position[1] = v.Lon_dege7
- u.currentMeas.Position[2] = v.Height_mm
- //fmt.Printf("%T %v\n", *v, *v)
- case *ublox.NavAtt:
- u.currentMeas.Orientation[0] = v.Pitch_deg
- u.currentMeas.Orientation[1] = v.Roll_deg
- u.currentMeas.Orientation[2] = v.Heading_deg
- //fmt.Printf("%T %v\n", *v, *v)
-
- //case *ublox.RawMessage:
- // //fmt.Printf("%T %v\n\n", *v, *v)
- default:
- return nil, errNotImplemented
- }
- m := u.currentMeas
- return &m, nil
-}
-
-func formatTime(Year_y uint16, Month_month byte, Day_d byte, Hour_h byte, Min_min byte, Sec_s byte, Nano_ns int32) string {
- //Nano_ns *= 1e+3
- if Nano_ns < 0 {
- Nano_ns += int32(time.Second)
- if Sec_s > 0 {
- Sec_s--
- } else if Min_min > 0 {
- Sec_s = 59
- Min_min--
- } else if Hour_h > 0 {
- Sec_s = 59
- Min_min = 59
- Hour_h--
- } else if Day_d > 1 {
- Sec_s = 59
- Min_min = 59
- Hour_h = 23
- Day_d--
- } // TODO: more cases for exact behavior! good for now...
- }
- //fmt.Printf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ00:00\n", Year_y, Month_month, Day_d, Hour_h, Min_min, Sec_s, Nano_ns )
- return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ", Year_y, Month_month, Day_d, Hour_h, Min_min, Sec_s, Nano_ns)
-}
-
-func (u *ubxReceiver) Printloop() {
-
- for {
- meas, err := u.NextMeasurement()
- if err != nil {
- continue
- }
- fmt.Println(meas)
- }
-
-}
diff --git a/ublox/decode.go b/ublox/decode.go
index ba019af..a940e08 100644
--- a/ublox/decode.go
+++ b/ublox/decode.go
@@ -14,17 +14,17 @@ import (
"io"
)
-// A Decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames.
+// A decoder scans an io stream into UBX (0xB5-0x62 separated) or NMEA ("$xxx,,,,*FF\r\n") frames.
// If you have an unmixed stream of NMEA-only data you can use nmea.Decode() on bufio.Scanner.Bytes() directly.
-type Decoder struct {
+type decoder struct {
s *bufio.Scanner
}
// NewDecoder creates a new bufio Scanner with a splitfunc that can handle both UBX and NMEA frames.
-func NewDecoder(r io.Reader) *Decoder {
+func NewDecoder(r io.Reader) *decoder {
d := bufio.NewScanner(r)
d.Split(splitFunc)
- return &Decoder{s: d}
+ return &decoder{s: d}
}
// Assume we're either at the start of an NMEA sentence or at the start of a UBX message
@@ -74,8 +74,8 @@ func splitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
return 1 + i1, nil, nil
}
-// Decode reads on NMEA or UBX frame and calls DecodeUbx accordingly to parse the message. Skips NMEA.
-func (d *Decoder) Decode() (msg Message, err error) {
+// Decode reads on NMEA or UBX frame and calls decodeUbx accordingly to parse the message, while skipping NMEA.
+func (d *decoder) Decode() (msg interface{}, err error) {
if !d.s.Scan() {
if err = d.s.Err(); err == nil {
err = io.EOF
@@ -89,7 +89,7 @@ func (d *Decoder) Decode() (msg Message, err error) {
return nil, err
//return nmea.Decode(d.s.Bytes())
case 0xB5:
- return DecodeUbx(d.s.Bytes())
+ return decodeUbx(d.s.Bytes())
}
panic("impossible frame")
}
@@ -99,7 +99,7 @@ var (
errInvalidChkSum = errors.New("invalid UBX checksum")
)
-func DecodeUbx(frame []byte) (msg Message, err error) {
+func decodeUbx(frame []byte) (msg Message, err error) {
buf := bytes.NewReader(frame)
@@ -149,7 +149,7 @@ func DecodeUbx(frame []byte) (msg Message, err error) {
if msg != nil {
err = binary.Read(buf, binary.LittleEndian, msg)
} else {
- msg = &RawMessage{ClassID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)}
+ msg = &RawMessage{classID: header.ClassID, Data: append([]byte(nil), frame[6:len(frame)-2]...)}
}
//fmt.Println(msg)
diff --git a/ublox/messages.go b/ublox/messages.go
index ad3a078..610f7b2 100644
--- a/ublox/messages.go
+++ b/ublox/messages.go
@@ -1,15 +1,21 @@
package ublox
type Message interface {
- classID() uint16
+ ClassID() uint16
}
+//type UbxMessage interface {
+// Timestamp() (time.Time, error)
+// Position() ([3]float64, error)
+// Orientation() ([3]float64, error)
+//}
+
type RawMessage struct {
- ClassID uint16
+ classID uint16
Data []byte
}
-func (msg *RawMessage) classID() uint16 { return msg.ClassID }
+func (msg *RawMessage) ClassID() uint16 { return msg.classID }
type NavPvt struct {
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
@@ -47,7 +53,7 @@ type NavPvt struct {
MagAcc_deg2e uint16 // 1e-2 Magnetic declination accuracy. Only supported in ADR 4.10 and later.
}
-func (NavPvt) classID() uint16 { return 0x0701 }
+func (NavPvt) ClassID() uint16 { return 0x0701 }
type HnrPvt struct {
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
@@ -77,7 +83,7 @@ type HnrPvt struct {
Reserved1 [4]byte // - Reserved
}
-func (HnrPvt) classID() uint16 { return 0x0028 }
+func (HnrPvt) ClassID() uint16 { return 0x0028 }
type NavAtt struct {
ITOW_ms uint32 // - GPS time of week of the navigation epoch. See the description of iTOW for details.
@@ -91,7 +97,7 @@ type NavAtt struct {
AccHeading_deg uint32
}
-func (NavAtt) classID() uint16 { return 0x0501 }
+func (NavAtt) ClassID() uint16 { return 0x0501 }
//go:generate stringer -output=strings_navpvt.go -trimprefix NavPVT -type=NavPVTFixType,NavPVTValid,NavPVTFlags,NavPVTFlags2,NavPVTFlags3