From eae4bc2e67144b6eacb7cb30ad5c203c04032346 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Sun, 6 Dec 2020 22:26:49 +0100 Subject: [PATCH] changed ubx height reference and optimized logging --- cmd/server/server.go | 1 - core/collectors.go | 13 +++++++--- core/format.go | 34 +++++++++++++++++++++------ core/http.go | 2 +- core/pipeline.go | 56 ++++++++++++++++++++++---------------------- go.mod | 1 + 6 files changed, 67 insertions(+), 40 deletions(-) diff --git a/cmd/server/server.go b/cmd/server/server.go index 3b6bb8f..5714c83 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -17,7 +17,6 @@ func main() { log.Println("initialize processing pipeline") processor := core.NewPipeline(dispatcher, 50, 1000) processor.Run() - log.Println("start data collectors") collectRoutines(processor) log.Println("start http server") core.HttpListenAndServe(dispatcher, HTTP_PORT) diff --git a/core/collectors.go b/core/collectors.go index 47146c4..923d6fe 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -2,14 +2,17 @@ package core import ( "fmt" - "git.timovolkmann.de/gyrogpsc/ublox" - "go.bug.st/serial" "log" "net" "os" + + "git.timovolkmann.de/gyrogpsc/ublox" + "go.bug.st/serial" ) func TcpCollector(proc Processor, tcpPort string) { + log.Println("start tcp collectors") + listener, err := net.Listen("tcp", tcpPort) if err != nil { fmt.Println("Error listening:", err.Error()) @@ -25,6 +28,8 @@ func TcpCollector(proc Processor, tcpPort string) { fmt.Println("Error accepting: ", err.Error()) os.Exit(1) } + log.Println("...new incoming tcp connection...") + // Handle connections in a new goroutine. go jsonHandler(conn, proc) } @@ -32,6 +37,7 @@ func TcpCollector(proc Processor, tcpPort string) { // handles incoming tcp connections with json payload. func jsonHandler(conn net.Conn, proc Processor) { + defer conn.Close() // TRY reader := bufio.NewReader(conn) OR NewScanner(conn) @@ -45,7 +51,7 @@ func jsonHandler(conn net.Conn, proc Processor) { } //json := pretty.Pretty(buf[:n]) //fmt.Println(string(json)) - fmt.Println(string(buf[:n])) + //fmt.Println(string(buf[:n])) sd, err := ConvertSensorDataPhone(buf[:n]) if err != nil { log.Println(err) @@ -61,6 +67,7 @@ func jsonHandler(conn net.Conn, proc Processor) { } func SerialCollector(proc Processor, serialPort string) { + log.Println("start serial collectors") mode := &serial.Mode{ BaudRate: 115200, } diff --git a/core/format.go b/core/format.go index 5da9ef1..4ff5a08 100644 --- a/core/format.go +++ b/core/format.go @@ -3,6 +3,7 @@ package core import ( "errors" "git.timovolkmann.de/gyrogpsc/ublox" + "github.com/tidwall/gjson" "log" "time" @@ -84,16 +85,16 @@ func ConvertUbxToSensorData(msg interface{}) (*Sensordata, error) { //log.Println("NAV-PVT") sd.itow = v.ITOW_ms sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() - sd.Position[0] = float64(v.Lat_dege7) - sd.Position[1] = float64(v.Lon_dege7) - sd.Position[2] = float64(v.Height_mm) + sd.Position[0] = float64(v.Lat_dege7) / 1e+7 + sd.Position[1] = float64(v.Lon_dege7) / 1e+7 + sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m case *ublox.HnrPvt: //log.Println("HNR-PVT") sd.itow = v.ITOW_ms sd.Timestamp = time.Date(int(v.Year_y), time.Month(v.Month_month), int(v.Day_d), int(v.Hour_h), int(v.Min_min), int(v.Sec_s), int(v.Nano_ns), time.UTC).UnixNano() - sd.Position[0] = float64(v.Lat_dege7) - sd.Position[1] = float64(v.Lon_dege7) - sd.Position[2] = float64(v.Height_mm) + sd.Position[0] = float64(v.Lat_dege7) / 1e+7 + sd.Position[1] = float64(v.Lon_dege7) / 1e+7 + sd.Position[2] = float64(v.HMSL_mm) / 1e+3 // mm in m case *ublox.NavAtt: //log.Println("NAV-ATT") sd.itow = v.ITOW_ms @@ -112,9 +113,28 @@ func ConvertUbxToSensorData(msg interface{}) (*Sensordata, error) { } func ConvertSensorDataPhone(jsonData []byte) (*Sensordata, error) { - return convertAndroidHyperImu(jsonData) + if gjson.Get(string(jsonData), "os").String() == "hyperimu" { + return convertAndroidHyperImu(jsonData) + } + return convertIPhoneSensorLog(jsonData) } +func convertIPhoneSensorLog(jsonData []byte) (*Sensordata, error) { + timestamp := gjson.Get(string(jsonData), "locationTimestamp_since1970").Float() + lat := gjson.Get(string(jsonData), "locationLatitude").Float() + lon := gjson.Get(string(jsonData), "locationLongitude").Float() + alt := gjson.Get(string(jsonData), "locationAltitude").Float() + sd := &Sensordata{ + SourceId: SOURCE_TCP, + Timestamp: int64(timestamp * float64(time.Second)), + //Timestamp: time.Unix(0, prep.Timestamp * int64(time.Millisecond)), + Position: [3]float64{lat, lon, alt}, + Orientation: [3]float64{0, 0, 0}, + } + return sd, nil +} + + func convertAndroidHyperImu(jsonData []byte) (*Sensordata, error) { prep := struct { Timestamp int64 `njson:"Timestamp"` diff --git a/core/http.go b/core/http.go index b957ac6..8e9468d 100644 --- a/core/http.go +++ b/core/http.go @@ -30,7 +30,7 @@ func echo(sub Subscriber) func(w http.ResponseWriter, r *http.Request) { dispatcherId, channel := sub.Subscribe() defer sub.Unsubscribe(dispatcherId) for { - log.Println("") + //log.Println("") //if err != nil { // log.Println("read:", err) // break diff --git a/core/pipeline.go b/core/pipeline.go index 9f031ee..2bd447e 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -15,9 +15,9 @@ type Processor interface { } type pipeline struct { - synchronizer - aggregator - Publisher + syn synchronizer + agr aggregator + pub Publisher publishTicker *time.Ticker } @@ -38,12 +38,12 @@ func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) } func (p *pipeline) Run() { - go p.schedule() + go p.syn.schedule() go func() { for { <-p.publishTicker.C err := p.Publish() - if err != nil /*&& err.Error() != "no data available"*/ { + if err != nil && err.Error() != "no data available" { log.Println(err) } } @@ -52,30 +52,30 @@ func (p *pipeline) Run() { } func (p *pipeline) Publish() error { - p.tcpMutex.Lock() - p.serialMutex.Lock() - //log.Println(p.tcpSensorData) - //log.Println(p.serialSensorData) - if (p.tcpSensorData == Sensordata{} && p.serialSensorData == Sensordata{}) { - p.tcpMutex.Unlock() - p.serialMutex.Unlock() + p.agr.tcpMutex.Lock() + p.agr.serialMutex.Lock() + //log.Println(pub.tcpSensorData) + //log.Println(pub.serialSensorData) + if (p.agr.tcpSensorData == Sensordata{} && p.agr.serialSensorData == Sensordata{}) { + p.agr.tcpMutex.Unlock() + p.agr.serialMutex.Unlock() return errors.New("no data available") } data := map[string]Sensordata{ - string(SOURCE_TCP): p.tcpSensorData, - string(SOURCE_SERIAL): p.serialSensorData, + string(SOURCE_TCP): p.agr.tcpSensorData, + string(SOURCE_SERIAL): p.agr.serialSensorData, } - p.tcpSensorData = Sensordata{} - p.serialSensorData = Sensordata{} - p.tcpMutex.Unlock() - p.serialMutex.Unlock() + p.agr.tcpSensorData = Sensordata{} + p.agr.serialSensorData = Sensordata{} + p.agr.tcpMutex.Unlock() + p.agr.serialMutex.Unlock() jdata, err := json.Marshal(data) - log.Println(string(jdata)) + //log.Println(string(pretty.Pretty(jdata))) if err != nil { return err } - p.Publisher.Publish(string(jdata)) + p.pub.Publish(string(jdata)) return nil } @@ -137,14 +137,14 @@ func (p *pipeline) Process(data *Sensordata) error { } func (p *pipeline) pushTcpDataToBuffer(data Sensordata) { - time.Sleep(time.Duration(p.tcpDelayMs)) - p.tcpMutex.Lock() - p.tcpSensorData = data - p.tcpMutex.Unlock() + time.Sleep(time.Duration(p.syn.tcpDelayMs)) + p.agr.tcpMutex.Lock() + p.agr.tcpSensorData = data + p.agr.tcpMutex.Unlock() } func (p *pipeline) pushSerialDataToBuffer(data Sensordata) { - time.Sleep(time.Duration(p.serialDelayMs)) - p.serialMutex.Lock() - p.serialSensorData = p.serialSensorData.Consolidate(data) - p.serialMutex.Unlock() + time.Sleep(time.Duration(p.syn.serialDelayMs)) + p.agr.serialMutex.Lock() + p.agr.serialSensorData = p.agr.serialSensorData.Consolidate(data) + p.agr.serialMutex.Unlock() } diff --git a/go.mod b/go.mod index 77abb86..1b52ad7 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/gorilla/websocket v1.4.2 github.com/m7shapan/njson v1.0.1 + github.com/tidwall/gjson v1.6.0 github.com/tidwall/pretty v1.0.2 go.bug.st/serial v1.1.1 golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf // indirect