From e0ebf17cab6b13f27e3c97361dd3df37e1d45225 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Mon, 28 Dec 2020 14:12:28 +0100 Subject: [PATCH] some improvements --- core/datamodel.go | 4 +- core/dispatcher.go | 1 - core/pipeline_record.go | 4 +- core/pipeline_replay.go | 21 +-- static/scripts/speedometer.js | 2 +- static/scripts/websocket.js | 254 +++++++++++++++++++--------------- storage/kvstore.go | 15 +- templates/index.html | 208 +++------------------------- templates/replay.html | 6 +- web/http.go | 1 + 10 files changed, 188 insertions(+), 328 deletions(-) diff --git a/core/datamodel.go b/core/datamodel.go index 6c85690..724337d 100644 --- a/core/datamodel.go +++ b/core/datamodel.go @@ -148,7 +148,7 @@ func ConvertUbxSensorData(msg interface{}) (*SensorData, error) { sd.HeadMotion = float64(v.HeadMot_dege5) / 1e+5 sd.HeadDevice = float64(v.HeadVeh_dege5) / 1e+5 sd.HeadingAcc = float64(v.HeadAcc_dege5) / 1e+5 - sd.Speed = float64(v.GSpeed_mm_s) * 3.6e-3 + sd.Speed = float64(v.GSpeed_mm_s) * 1e-3 case *ublox.HnrPvt: //logrus.Println("HNR-PVT") sd.itow = v.ITOW_ms @@ -161,7 +161,7 @@ func ConvertUbxSensorData(msg interface{}) (*SensorData, error) { sd.HeadMotion = float64(v.HeadMot_dege5) / 1e+5 sd.HeadDevice = float64(v.HeadVeh_dege5) / 1e+5 sd.HeadingAcc = float64(v.HeadAcc_dege5) / 1e+5 - sd.Speed = float64(v.GSpeed_mm_s) + 3.6e-3 + sd.Speed = float64(v.GSpeed_mm_s) * 1e-3 // mm in m/s case *ublox.NavAtt: //logrus.Println("NAV-ATT") sd.itow = v.ITOW_ms diff --git a/core/dispatcher.go b/core/dispatcher.go index 270ac88..23c3ff4 100644 --- a/core/dispatcher.go +++ b/core/dispatcher.go @@ -45,7 +45,6 @@ func (d *dispatcher) Publish(message string) { return } logrus.Tracef("publishing to %v listeners\n", len(d.listeners)) - logrus.Trace(message) for _, ch := range d.listeners { select { case ch <- message: diff --git a/core/pipeline_record.go b/core/pipeline_record.go index 748b725..cb38dda 100644 --- a/core/pipeline_record.go +++ b/core/pipeline_record.go @@ -21,7 +21,7 @@ func NewRecordPipeline(p Publisher, s Tracker, netChan chan interface{}, serialC flowStore := flow.NewMap(storeFunc(s), 1) dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) - flowReorder := NewRearranger() + //flowReorder := NewRearranger() flowJson := flow.NewMap(jsonFunc, 1) sinkPub := newPublishSink(p) @@ -29,7 +29,7 @@ func NewRecordPipeline(p Publisher, s Tracker, netChan chan interface{}, serialC // wire up and execute demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) //go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) - go demux.Via(flowStore).Via(dataSanitizer).Via(flowReorder).Via(flowJson).To(sinkPub) + go demux.Via(flowStore).Via(dataSanitizer).Via(flowJson).To(sinkPub) return &pipelineRecord{} } diff --git a/core/pipeline_replay.go b/core/pipeline_replay.go index e534643..449e7d8 100644 --- a/core/pipeline_replay.go +++ b/core/pipeline_replay.go @@ -18,14 +18,14 @@ func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { // set pipeline up and wire it together collNet := ext.NewChanSource(channelFromTracking(t)) dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) - flowReorder := NewRearranger() + //flowReorder := NewRearranger() flowJson := flow.NewMap(jsonFunc, 1) sinkPub := newPublishSink(p) // wire up and execute //go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) - go collNet.Via(dataSanitizer).Via(flowReorder).Via(flowJson).To(sinkPub) + go collNet.Via(dataSanitizer).Via(flowJson).To(sinkPub) return &pipelineReplay{} } @@ -43,19 +43,20 @@ func channelFromTracking(t *Tracking) chan interface{} { go func() { lastTs := t.Data[0].Servertime.UnixNano() lastTsNow := time.Now().UTC().UnixNano() - i := 0 - for i <= len(t.Data)-1 { - durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs - timeCounter := time.Now().UTC().UnixNano() - lastTsNow - if timeCounter >= durationSinceLastEvent { + i := 0 + for i <= len(t.Data)-1 { + durationSinceLastEvent := t.Data[i].Servertime.UnixNano() - lastTs + timeCounter := time.Now().UTC().UnixNano() - lastTsNow + if timeCounter >= durationSinceLastEvent { ch <- &(t.Data[i]) + logrus.Traceln("replay tracking: ", t.Data[i]) lastTs = t.Data[i].Servertime.UnixNano() lastTsNow = time.Now().UTC().UnixNano() i++ } - } + } - logrus.Infoln("replay: pushed all tracking data to pipeline") + logrus.Infoln("replay: pushed all tracking data to pipeline") }() return ch } @@ -86,7 +87,7 @@ func replaySanitizeFunc() flow.MapFunc { sd.Timestamp = sd.Servertime.Add(time.Duration(lastOff)) } if sd.Servertime.Before(time.Unix(1608422400, 0)) && sd.Speed != 0 && sd.Source() == SOURCE_SERIAL { - sd.Speed = sd.Speed * 3.6 * 3.6 + sd.Speed = sd.Speed * 3.6 } return sd } diff --git a/static/scripts/speedometer.js b/static/scripts/speedometer.js index 18d0de1..fd53fc3 100644 --- a/static/scripts/speedometer.js +++ b/static/scripts/speedometer.js @@ -162,4 +162,4 @@ function addSpeedTcp(speedTCP){ mySpeedometer.data.datasets[0].data = [speedTCPpercent, 100-speedTCPpercent]; document.getElementById("speedTCP").innerHTML = "Speed Smartphone (km/h): " + speedTCPkmh.toFixed(1) mySpeedometer.update(); -} \ No newline at end of file +} diff --git a/static/scripts/websocket.js b/static/scripts/websocket.js index 8f1575c..5df27bc 100644 --- a/static/scripts/websocket.js +++ b/static/scripts/websocket.js @@ -1,13 +1,107 @@ const GRAPH_RES = 100; var dataSmartphone = []; window.addEventListener("load", function(evt) { - var orientation = [0,0,0]; - var multiplier = 180/Math.PI/15 var output = document.getElementById("output"); - var input = document.getElementById("input"); var checkBoxSmartphone = document.getElementById("checkbox1"); var checkBoxUblox = document.getElementById("checkbox2"); var ws; + ws = new WebSocket("ws://localhost:3011/ws"); + ws.onopen = function(evt) { + print("OPEN"); + } + ws.onclose = function(evt) { + ws = null; + print2("CLOSE"); + } + ws.onmessage = function(evt) { + //print2("RESPONSE: " + evt.data); + // let dat = JSON.parse(evt.data)["bmi26x gyroscope"] + // let dat = JSON.parse(evt.data)["lsm6dsm gyroscope"] + //let dat = JSON.parse(evt.data)["lsm6ds3c gyroscope"] + let dat = JSON.parse(evt.data) + dataSmartphone.push(dat) + //console.log(evt.data) + console.log("JSON geparsed onmessage", dat) + + try{ + if(!(dat.SOURCE_TCP.Orientation[0] === 0) && !(dat.SOURCE_TCP.Orientation[1] === 0) && !(dat.SOURCE_TCP.Orientation[2] === 0)){ + document.getElementById("gyroscopeTCP").style.transform = `rotateX(${dat.SOURCE_TCP.Orientation[0]}deg) rotateY(${dat.SOURCE_TCP.Orientation[1]}deg) rotateZ(${dat.SOURCE_TCP.Orientation[2]}deg)` + } + if(!(dat.SOURCE_TCP.Position[1] === 0) && !(dat.SOURCE_TCP.Position[0] === 0)){ + document.getElementById("TCPlong").innerHTML = "Smartphone long: " + dat.SOURCE_TCP.Position[1] + document.getElementById("TCPlat").innerHTML = "Smartphone lat: " + dat.SOURCE_TCP.Position[0] + updateMapTCP(dat.SOURCE_TCP.Position[1], dat.SOURCE_TCP.Position[0]) + map.panTo([dat.SOURCE_TCP.Position[1], dat.SOURCE_TCP.Position[0]]) + } + if(!(dat.SOURCE_TCP.Speed === 0)){ + addSpeedTcp(dat.SOURCE_TCP.Speed); + } + if(!(dat.SOURCE_TCP.HeadDevice === 0)){ + addCompassTCP(dat.SOURCE_TCP.HeadDevice) + document.getElementById("compassTCP").innerHTML = "Smartphone: " + dat.SOURCE_TCP.HeadDevice.toFixed(0) + "°" + } + } + catch{ + console.log("no TCP data") + } + + try{ + if(!(dat.SOURCE_SERIAL.Orientation[0] === 0) && /*!(dat.SOURCE_SERIAL.Orientation[1] === 0) &&*/ !(dat.SOURCE_SERIAL.Orientation[2] === 0)){ + document.getElementById("gyroscopeSERIAL").style.transform = `rotateX(${dat.SOURCE_SERIAL.Orientation[0]}deg) rotateY(${dat.SOURCE_SERIAL.Orientation[1]}deg) rotateZ(${dat.SOURCE_SERIAL.Orientation[2]}deg)` + } + if(!(dat.SOURCE_SERIAL.Position[1] === 0) && !(dat.SOURCE_SERIAL.Position[0] === 0)){ + document.getElementById("SERIALlong").innerHTML = "Ublox long: " + dat.SOURCE_SERIAL.Position[1] + document.getElementById("SERIALlat").innerHTML = "Ublox lat: " + dat.SOURCE_SERIAL.Position[0] + updateMapSERIAL(dat.SOURCE_SERIAL.Position[1], dat.SOURCE_SERIAL.Position[0]) + map.panTo([dat.SOURCE_SERIAL.Position[1], dat.SOURCE_SERIAL.Position[0]]) + } + if(!(dat.SOURCE_SERIAL.Speed === 0)){ + addSpeedSerial(dat.SOURCE_SERIAL.Speed); + } + if(!(dat.SOURCE_SERIAL.HeadDevice === 0)){ + addCompassSerial(dat.SOURCE_SERIAL.HeadDevice) + document.getElementById("compassSERIAL").innerHTML = "Ublox: " + dat.SOURCE_SERIAL.HeadDevice.toFixed(0) + "°" + } + }catch{ + console.log("no serial data") + } + + try{ + if(!(dat.SOURCE_TCP.Position[1] === 0) && !(dat.SOURCE_SERIAL.Position[1] === 0)){ + document.getElementById("diffLong").innerHTML = "Differenz long: " + Math.abs(dat.SOURCE_TCP.Position[1] - dat.SOURCE_SERIAL.Position[1]) + } + if(!(dat.SOURCE_TCP.Position[0] === 0) && !(dat.SOURCE_SERIAL.Position[0] === 0)){ + document.getElementById("diffLat").innerHTML = "Differenz lat: " + Math.abs(dat.SOURCE_TCP.Position[0] - dat.SOURCE_SERIAL.Position[0]) + } + } + catch{ + console.log("no data to compare") + } + + if(!(dat.SOURCE_SERIAL.Position[2] === 0)){ + addSerialAltData(dat.SOURCE_SERIAL.Position[2]) + } + + + /* + console.log(dat) + orientation[0] += dat[0] * multiplier + orientation[1] += dat[1] * multiplier + orientation[2] += dat[2] * multiplier + // dataset.push(orientation[0]) + // while (dataset.length >= 50) { + // dataset.shift(); + // } + // addData(orientation[0] / multiplier) + */ + } + + ws.onerror = function(evt) { + print("ERROR: " + evt.data); + } + + + var print = function(message) { var d = document.createElement("div"); d.textContent = message; @@ -20,18 +114,10 @@ window.addEventListener("load", function(evt) { output.replaceChild(d, oldNode) }; document.getElementById("open").onclick = function(evt) { - if (ws && ws.OPEN) { - print2("Websocket already open") - return false; - } - ws = new WebSocket("ws://localhost:3011/ws"); - ws.onopen = function(evt) { - print("OPEN"); - } - ws.onclose = function(evt) { - ws = null; - print2("CLOSE"); - } + // if (ws && ws.OPEN) { + // print2("Websocket already open") + // return false; + // } if(checkBoxSmartphone.checked && checkBoxUblox.checked){ fetch('http://localhost:3011/trackings?serial=true&tcp=true', { method: 'POST', body: 'some test data'}) @@ -59,100 +145,6 @@ window.addEventListener("load", function(evt) { checkBoxSmartphone.disabled = true; checkBoxUblox.disabled = true; - ws.onmessage = function(evt) { - //print2("RESPONSE: " + evt.data); - // let dat = JSON.parse(evt.data)["bmi26x gyroscope"] - // let dat = JSON.parse(evt.data)["lsm6dsm gyroscope"] - //let dat = JSON.parse(evt.data)["lsm6ds3c gyroscope"] - let dat = JSON.parse(evt.data) - dataSmartphone.push(dat) - //console.log(evt.data) - console.log("JSON geparsed onmessage", dat) - - try{ - if(!(dat.SOURCE_TCP.Orientation[0] === 0) && !(dat.SOURCE_TCP.Orientation[1] === 0) && !(dat.SOURCE_TCP.Orientation[2] === 0)){ - document.getElementById("gyroscopeTCP").style.transform = `rotateX(${dat.SOURCE_TCP.Orientation[0]}deg) rotateY(${dat.SOURCE_TCP.Orientation[1]}deg) rotateZ(${dat.SOURCE_TCP.Orientation[2]}deg)` - } - if(!(dat.SOURCE_TCP.Position[1] === 0) && !(dat.SOURCE_TCP.Position[0] === 0)){ - document.getElementById("TCPlong").innerHTML = "Smartphone long: " + dat.SOURCE_TCP.Position[1] - document.getElementById("TCPlat").innerHTML = "Smartphone lat: " + dat.SOURCE_TCP.Position[0] - updateMapTCP(dat.SOURCE_TCP.Position[1], dat.SOURCE_TCP.Position[0]) - map.panTo([dat.SOURCE_TCP.Position[1], dat.SOURCE_TCP.Position[0]]) - } - if(!(dat.SOURCE_TCP.Speed === 0)){ - addSpeedTcp(dat.SOURCE_TCP.Speed); - } - if(!(dat.SOURCE_TCP.HeadDevice === 0)){ - addCompassTCP(dat.SOURCE_TCP.HeadDevice) - document.getElementById("compassTCP").innerHTML = "Smartphone: " + dat.SOURCE_TCP.HeadDevice.toFixed(0) + "°" - } - } - catch{ - console.log("no TCP data") - } - - try{ - if(!(dat.SOURCE_SERIAL.Orientation[0] === 0) && /*!(dat.SOURCE_SERIAL.Orientation[1] === 0) &&*/ !(dat.SOURCE_SERIAL.Orientation[2] === 0)){ - document.getElementById("gyroscopeSERIAL").style.transform = `rotateX(${dat.SOURCE_SERIAL.Orientation[0]}deg) rotateY(${dat.SOURCE_SERIAL.Orientation[1]}deg) rotateZ(${dat.SOURCE_SERIAL.Orientation[2]}deg)` - } - if(!(dat.SOURCE_SERIAL.Position[1] === 0) && !(dat.SOURCE_SERIAL.Position[0] === 0)){ - document.getElementById("SERIALlong").innerHTML = "Ublox long: " + dat.SOURCE_SERIAL.Position[1] - document.getElementById("SERIALlat").innerHTML = "Ublox lat: " + dat.SOURCE_SERIAL.Position[0] - updateMapSERIAL(dat.SOURCE_SERIAL.Position[1], dat.SOURCE_SERIAL.Position[0]) - map.panTo([dat.SOURCE_SERIAL.Position[1], dat.SOURCE_SERIAL.Position[0]]) - } - if(!(dat.SOURCE_SERIAL.Speed === 0)){ - addSpeedSerial(dat.SOURCE_SERIAL.Speed); - } - if(!(dat.SOURCE_SERIAL.HeadDevice === 0)){ - addCompassSerial(dat.SOURCE_SERIAL.HeadDevice) - document.getElementById("compassSERIAL").innerHTML = "Ublox: " + dat.SOURCE_SERIAL.HeadDevice.toFixed(0) + "°" - } - }catch{ - console.log("no serial data") - } - - try{ - if(!(dat.SOURCE_TCP.Position[1] === 0) && !(dat.SOURCE_SERIAL.Position[1] === 0)){ - document.getElementById("diffLong").innerHTML = "Differenz long: " + Math.abs(dat.SOURCE_TCP.Position[1] - dat.SOURCE_SERIAL.Position[1]) - } - if(!(dat.SOURCE_TCP.Position[0] === 0) && !(dat.SOURCE_SERIAL.Position[0] === 0)){ - document.getElementById("diffLat").innerHTML = "Differenz lat: " + Math.abs(dat.SOURCE_TCP.Position[0] - dat.SOURCE_SERIAL.Position[0]) - } - } - catch{ - console.log("no data to compare") - } - - if(!(dat.SOURCE_SERIAL.Position[2] === 0)){ - addSerialAltData(dat.SOURCE_SERIAL.Position[2]) - } - - - /* - console.log(dat) - orientation[0] += dat[0] * multiplier - orientation[1] += dat[1] * multiplier - orientation[2] += dat[2] * multiplier - // dataset.push(orientation[0]) - // while (dataset.length >= 50) { - // dataset.shift(); - // } - // addData(orientation[0] / multiplier) - */ - } - - ws.onerror = function(evt) { - print("ERROR: " + evt.data); - } - return false; - }; - document.getElementById("send").onclick = function(evt) { - if (!ws) { - return false; - } - print("SEND: " + input.value); - ws.send(input.value); return false; }; document.getElementById("close").onclick = function(evt) { @@ -165,7 +157,7 @@ window.addEventListener("load", function(evt) { //------------------------Buttons------------------------------ - document.getElementById("messung starten").onclick = function(evt) { + document.getElementById("messungstarten").onclick = function(evt) { if (ws) { fetch('http://localhost:3011/trackings/', { method: 'PATCH', body: 'some data'}) .then(results => results.json()) @@ -175,7 +167,7 @@ window.addEventListener("load", function(evt) { return false; }; - document.getElementById("messung beenden").onclick = function(evt) { + document.getElementById("messungbeenden").onclick = function(evt) { if (ws) { fetch('http://localhost:3011/trackings/', { method: 'PUT', body: 'some data'}) .then(results => results.json()) @@ -185,7 +177,7 @@ window.addEventListener("load", function(evt) { return false; }; - document.getElementById("alles beenden").onclick = function(evt) { + document.getElementById("allesbeenden").onclick = function(evt) { if (ws) { fetch('http://localhost:3011/trackings/', { method: 'DELETE', body: 'some data'}) .then(results => results.json()) @@ -196,4 +188,40 @@ window.addEventListener("load", function(evt) { } return false; }; -}); \ No newline at end of file + var trackings = null; + document.getElementById("messungladen").onclick = function(evt) { + fetch('http://localhost:3011/trackings/', { method: 'GET'}).then(results => { + return results.json() + }).then(r => { + console.log(r) + if (!'data' in r) { + return + } + trackings = r.data + let sel = document.getElementById("meas") + r.data.forEach(tracking => { + console.log(tracking) + var option = document.createElement("option"); + option.text = tracking.TimeCreated + " Size: " + tracking.Size + sel.add(option) + }) + sel.disabled = false + document.getElementById("replaystarten").disabled = false + + }) + }; + + document.getElementById("replaystarten").onclick = function(evt) { + let sel = document.getElementById("meas") + console.log(trackings[sel.selectedIndex].UUID) + fetch(`http://localhost:3011/trackings/${trackings[sel.selectedIndex].UUID}`, { method: 'GET'}).then(results => { + return results.json() + }).then(r => { + console.log(r.data.Data) + }) + + } + + +}); + diff --git a/storage/kvstore.go b/storage/kvstore.go index db5730b..8677e19 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -7,7 +7,6 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/google/uuid" "github.com/sirupsen/logrus" - "github.com/tidwall/pretty" "os" "path/filepath" "time" @@ -70,10 +69,10 @@ func (r *badgerStore) Save(tr core.Tracking) error { err = r.sensordatDb.Update(func(txn *badger.Txn) error { for _, v := range tr.Data { k := createRecordKey(tr.UUID, v.Source(), v.Timestamp) - logrus.Trace(v, " len key ->", len(k)) + //logrus.Trace(v, " len key ->", len(k)) j, err2 := json.Marshal(v) logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.Timestamp.Format(time.RFC3339Nano)) - logrus.Traceln(string(pretty.Pretty(j))) + //logrus.Traceln(string(pretty.Pretty(j))) if err2 != nil { return err2 } @@ -174,9 +173,9 @@ func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) { el.Timestamp = recTime el.SetSource(source) err2 := item.Value(func(val []byte) error { - logrus.Traceln(string(val)) + //logrus.Traceln(string(val)) err3 := json.Unmarshal(val, &el) - logrus.Traceln(err3, el) + //logrus.Traceln(err3, el) return err3 }) if err2 != nil { @@ -218,11 +217,11 @@ func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) [ } func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) { - logrus.Trace("key len ->", len(key)) + //logrus.Trace("key len ->", len(key)) prefix := string(key[:36]) suffix := string(key[37:]) middle := string(key[36:37]) - logrus.Traceln("load as:", prefix, middle, suffix) + //logrus.Traceln("load as:", prefix, middle, suffix) var source core.SourceId switch middle { case "1": @@ -238,7 +237,7 @@ func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) { if err != nil { logrus.Errorln("corrupted key", err) } - logrus.Traceln(uid, timestamp) + //logrus.Traceln(uid, timestamp) //timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix))) return uid, source, timestamp diff --git a/templates/index.html b/templates/index.html index f8356f8..ebe9472 100644 --- a/templates/index.html +++ b/templates/index.html @@ -6,198 +6,34 @@ - - +
-

"Verbinden" clicken um eine Verbindung mit dem Server aufzubauen. {{.}}

-
-
- - -

+

"Verbinden" klicken um eine Verbindung mit dem Server aufzubauen. {{.}}

+
+
+
+ + - - - -
+ + + + + + + + + +
+

@@ -289,4 +125,4 @@ - \ No newline at end of file + diff --git a/templates/replay.html b/templates/replay.html index 7ba9b53..daf0cd9 100644 --- a/templates/replay.html +++ b/templates/replay.html @@ -107,10 +107,6 @@
-

- - -

@@ -177,4 +173,4 @@ - \ No newline at end of file + diff --git a/web/http.go b/web/http.go index b4034c7..89592bb 100644 --- a/web/http.go +++ b/web/http.go @@ -230,6 +230,7 @@ func createFiberWebsocketHandler(s core.Subscriber) func(conn *websocket.Conn) { defer s.Unsubscribe(dispatcherId) for { cmsg := <-channel + logrus.Traceln("write to ws:", cmsg) err := c.WriteMessage(websocket.TextMessage, []byte(cmsg)) if err != nil { logrus.Info("close websocket connection")