From 5a3a9feca8877c25b972c3ad5f05dbfc04831c0d Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Fri, 18 Dec 2020 01:29:25 +0100 Subject: [PATCH] finished refactoring --- core/collectors.go | 6 +-- core/{sensordata.go => datamodel.go} | 70 ++++++++++++++++++++++------ core/dispatcher.go | 6 +-- core/pipeline.go | 10 ++-- core/pipelinex.go | 56 +++++++++++++--------- core/service.go | 10 +++- core/trackings.go | 30 ------------ storage/kvstore.go | 42 ++++++++++++----- 8 files changed, 140 insertions(+), 90 deletions(-) rename core/{sensordata.go => datamodel.go} (78%) delete mode 100644 core/trackings.go diff --git a/core/collectors.go b/core/collectors.go index 9ec1964..a5095d1 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -52,7 +52,7 @@ func (s *serialCollector) OutChannel() chan interface{} { } func (s *serialCollector) Collect() { - go func() { + go func(ch chan interface{}) { logrus.Println("start serial collector") mode := &serial.Mode{ BaudRate: 115200, @@ -97,7 +97,7 @@ func (s *serialCollector) Collect() { break } select { - case s.out <- meas: + case ch <- meas: maxSkip = 0 default: logrus.Traceln("skip collecting serial messages") @@ -109,7 +109,7 @@ func (s *serialCollector) Collect() { } logrus.Println("serial collector stopped") - }() + }(s.out) } func (s *serialCollector) Stop() { diff --git a/core/sensordata.go b/core/datamodel.go similarity index 78% rename from core/sensordata.go rename to core/datamodel.go index 70aed8c..7ab1d98 100644 --- a/core/sensordata.go +++ b/core/datamodel.go @@ -3,33 +3,77 @@ package core import ( "errors" "git.timovolkmann.de/gyrogpsc/ublox" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "math" "time" ) -type sourceId string +type Tracking struct { + TrackingMetadata + Data []SensorData +} + +type TrackingMetadata struct { + UUID uuid.UUID + TimeCreated time.Time + Collectors []CollectorType + Size int +} + +func newTracking() Tracking { + return Tracking{ + TrackingMetadata: TrackingMetadata{ + UUID: uuid.New(), + }, + Data: []SensorData{}, + } +} + +func (s *Tracking) isEmpty() bool { + if len(s.Data) != s.Size { + logrus.Errorln("data inconsistent...", len(s.Data), s.Size) + } + return len(s.Data) == 0 +} + + +type SourceId string const ( - SOURCE_TCP sourceId = "SOURCE_TCP" - SOURCE_SERIAL sourceId = "SOURCE_SERIAL" + SOURCE_TCP SourceId = "SOURCE_TCP" + SOURCE_SERIAL SourceId = "SOURCE_SERIAL" ) var timeex int64 type SensorData struct { + //MsgClass string + //FixType string itow uint32 - Source sourceId + source SourceId + latency int Servertime time.Time Timestamp time.Time - Position [3]float64 - Orientation [3]float64 + Position [3]float64 `json:",omitempty"` + PosAcc [2]float64 `json:",omitempty"`//[H,V] + Orientation [3]float64 `json:",omitempty"` + Speed float64 `json:",omitempty"` + PosHeading float64 `json:",omitempty"` // Course / Heading of Motion + HeadingAcc float64 `json:",omitempty"` + Gyroscope [3]float64 `json:",omitempty"` + LinearAcc [3]float64 `json:",omitempty"` + } -//func SensorDataEmpty() SensorData { -// return SensorData{} -//} +func (s *SensorData) Source() SourceId { + return s.source +} + +func (s *SensorData) SetSource(si SourceId) { + s.source = si +} func (s SensorData) isSameEpoch(n SensorData) bool { if n.itow == 0 { @@ -72,7 +116,7 @@ func (s SensorData) ConsolidateExTime(n SensorData) SensorData { } func (s *SensorData) checkSources(n *SensorData) { - if (s.Source != n.Source && *s != SensorData{}) { + if (s.source != n.source && *s != SensorData{}) { logrus.Println(s) logrus.Println(n) logrus.Fatalln("Do not consolidate SensorData from different Sources") @@ -87,7 +131,7 @@ var ( func ConvertUbxSensorData(msg interface{}) (*SensorData, error) { sd := &SensorData{ //Servertime: time.Now().UTC(), - Source: SOURCE_SERIAL, + source: SOURCE_SERIAL, } switch v := msg.(type) { case *ublox.NavPvt: @@ -146,7 +190,7 @@ func convertIPhoneSensorLog(jsonData []byte) (*SensorData, error) { //if ts == time.Date() sd := &SensorData{ //Servertime: time.Now().UTC(), - Source: SOURCE_TCP, + source: SOURCE_TCP, Timestamp: ts, Position: [3]float64{lat, lon, alt}, Orientation: [3]float64{pitch, roll, yaw}, @@ -168,7 +212,7 @@ func convertAndroidHyperImu(jsonData []byte) (*SensorData, error) { sd := &SensorData{ //Servertime: time.Now().UTC(), - Source: SOURCE_TCP, + source: SOURCE_TCP, Timestamp: time.Unix(0, timestamp*int64(time.Millisecond)).UTC(), Position: [3]float64{lat, lon, alt}, Orientation: [3]float64{pitch, roll, yaw}, diff --git a/core/dispatcher.go b/core/dispatcher.go index ab0da72..d908319 100644 --- a/core/dispatcher.go +++ b/core/dispatcher.go @@ -21,11 +21,11 @@ func NewDispatcher() *dispatcher { } -func (d *dispatcher) SetStreaming(on bool) (ok bool) { - if oki := d.sem.TryAcquire(1); oki && on { +func (d *dispatcher) SetStreaming(s bool) bool { + if ok := d.sem.TryAcquire(1); s && ok { // if i want to turn on and can get semaphore then return success return true - } else if !on && !oki { + } else if !s && !ok { // if i want to turn off and cant get semaphore, i can safely turn off by releasing semaphore and return success d.sem.Release(1) return true diff --git a/core/pipeline.go b/core/pipeline.go index 655f3d0..f36683a 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -98,14 +98,14 @@ func (p *pipeline) Push(data *SensorData) error { if (data == nil || *data == SensorData{}) { return errors.New("no data") } - //logrus.Println("push data to pipeline:", string(data.Source)) - switch data.Source { + //logrus.Println("push data to pipeline:", string(data.source)) + switch data.source { case SOURCE_TCP: go p.pushTcpDataToBuffer(*data) case SOURCE_SERIAL: go p.pushSerialDataToBuffer(*data) default: - panic("pipeline: invalid data Source") + panic("pipeline: invalid data source") } return nil } @@ -134,10 +134,10 @@ func (p *pipeline) publish() error { p.buffer.LastMeasSerial = p.buffer.MeasSerial data := map[string]interface{}{} - if p.buffer.MeasTcp.Source == SOURCE_TCP { + if p.buffer.MeasTcp.source == SOURCE_TCP { data[string(SOURCE_TCP)] = p.buffer.MeasTcp } - if p.buffer.MeasSerial.Source == SOURCE_SERIAL { + if p.buffer.MeasSerial.source == SOURCE_SERIAL { data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial } p.buffer.tcpMutex.Unlock() diff --git a/core/pipelinex.go b/core/pipelinex.go index 141fec5..cb96f2c 100644 --- a/core/pipelinex.go +++ b/core/pipelinex.go @@ -1,10 +1,11 @@ package core import ( + "encoding/json" ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" "github.com/sirupsen/logrus" - "reflect" + "time" ) type pipelineX struct { @@ -25,43 +26,54 @@ func NewPipelineX(p Publisher, s Tracker, netChan chan interface{}, serialChan c transNet := flow.NewFlatMap(transformNetFunc, 8) transSer := flow.NewFlatMap(transformSerFunc, 8) flowDelay := flow.NewMap(delayFunc(), 8) - flowStore := flow.NewMap(storeFunc(s), 1) - flowJson := flow.NewMap(jsonFunc, 8) + flowStore := flow.NewMap(storeFunc(s), 8) + //flowJson := flow.NewMap(jsonFunc, 8) sinkPub := newPublishSink(p) // wire up and execute demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) - go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) + //go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) + go demux.Via(flowDelay).Via(flowStore).To(sinkPub) return &pipelineX{} } -func (p *pipelineX) Run() { - -} - -func jsonFunc(i interface{}) interface{} { - return i -} - func storeFunc(s Tracker) flow.MapFunc { return func(i interface{}) interface{} { + var sd *SensorData if v, ok := i.(*SensorData); ok { - if v == nil { - logrus.Trace(v, ok) - } else { - s.Put(*v) - } + sd = v } else { - logrus.Trace(i) - logrus.Trace(reflect.TypeOf(i)) - panic("pipeline storeFunc: wrong data type") + panic("unexpected data struct") } - return i + if (*sd == SensorData{} || sd == nil) { + logrus.Info("empty data") + } else { + sd.Servertime = time.Now().UTC() + s.Put(*sd) + } + + logrus.Debugf("%-14v %-40s %-40s %v %v", sd.Source(), sd.Timestamp.Format(time.RFC3339Nano), sd.Servertime.Format(time.RFC3339Nano), sd.Position, sd.Orientation) + + data := map[string]interface{}{} + if sd.Source() == SOURCE_TCP { + data[string(SOURCE_TCP)] = *sd + } + if sd.Source() == SOURCE_SERIAL { + data[string(SOURCE_SERIAL)] = *sd + } + + jdata, err := json.Marshal(data) + //logrus.Println(string(pretty.Pretty(jdata))) + if err != nil { + logrus.Fatalln(err) + } + return string(jdata) + } } type timeDelay struct { - offsets map[sourceId]int + offsets map[SourceId]int } func delayFunc() flow.MapFunc { diff --git a/core/service.go b/core/service.go index 0f7d348..bd96f43 100644 --- a/core/service.go +++ b/core/service.go @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" + "sync" "time" ) @@ -50,8 +51,9 @@ type trackingService struct { collectors []Collector store Storer publisher Publisher - recSem *semaphore.Weighted config *Configuration + recSem *semaphore.Weighted + mu *sync.RWMutex } func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService { @@ -61,6 +63,7 @@ func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService { opMode: STOPPED, collectors: nil, recSem: semaphore.NewWeighted(1), + mu: &sync.RWMutex{}, config: c, store: s, publisher: p, @@ -75,8 +78,11 @@ func (t *trackingService) Put(data SensorData) { if !t.IsRecording() { return } + t.mu.Lock() t.tracking.Data = append(t.tracking.Data, data) + t.tracking.Size++ logrus.Traceln("raw data points: len->", len(t.tracking.Data)) + t.mu.Unlock() } func (t *trackingService) SetRecording(s bool) (ok bool) { @@ -133,7 +139,7 @@ func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) { t.tracking.Collectors = cols t.pipeline = NewPipelineX(t.publisher, t, tcp, ser) - //t.pipeline.Run() + t.publisher.SetStreaming(true) //time.Sleep(3 * time.Second) return "LIVE", nil diff --git a/core/trackings.go b/core/trackings.go deleted file mode 100644 index cd34d8a..0000000 --- a/core/trackings.go +++ /dev/null @@ -1,30 +0,0 @@ -package core - -import ( - "github.com/google/uuid" - "time" -) - -type Tracking struct { - TrackingMetadata - Data []SensorData -} - -type TrackingMetadata struct { - UUID uuid.UUID - TimeCreated time.Time - Collectors []CollectorType -} - -func newTracking() Tracking { - return Tracking{ - TrackingMetadata: TrackingMetadata{ - UUID: uuid.New(), - }, - Data: []SensorData{}, - } -} - -func (s *Tracking) isEmpty() bool { - return len(s.Data) == 0 -} diff --git a/storage/kvstore.go b/storage/kvstore.go index 735d994..694cc3e 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -69,7 +69,7 @@ func (r *badgerStore) Save(tr core.Tracking) error { } err = r.sensordatDb.Update(func(txn *badger.Txn) error { for _, v := range tr.Data { - k := createRecordKey(tr.UUID, v.Servertime) + k := createRecordKey(tr.UUID, v.Source(), v.Servertime) logrus.Trace(v, " len key ->", len(k)) j, err2 := json.Marshal(v) logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.Servertime.Format(time.RFC3339Nano)) @@ -169,9 +169,10 @@ func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) { for it.Rewind(); it.Valid(); it.Next() { item := it.Item() - _, recTime := unmarshalDataKey(item.Key()) + _, source, recTime := unmarshalDataKey(item.Key()) el := core.SensorData{} el.Servertime = recTime + el.SetSource(source) err2 := item.Value(func(val []byte) error { logrus.Traceln(string(val)) err3 := json.Unmarshal(val, &el) @@ -195,33 +196,50 @@ func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) { return t, nil } -func createRecordKey(uid uuid.UUID, timestamp time.Time) []byte { +func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) []byte { prefix := []byte(uid.String()) + var i string + switch source { + case core.SOURCE_TCP: + i = "1" + case core.SOURCE_SERIAL: + i = "2" + } + middle := []byte(i) suffix := []byte(timestamp.Format(time.RFC3339Nano)) if timestamp.IsZero() { err := errors.New("zero value detected") logrus.Errorln("unable to create key", err) } - logrus.Traceln("save as:", string(prefix), string(suffix)) + logrus.Traceln("save as:", string(prefix), string(middle), string(suffix)) //binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano())) - return append(prefix, suffix...) + ret := append(prefix, middle...) + return append(ret, suffix...) } -func unmarshalDataKey(key []byte) (uuid.UUID, time.Time) { +func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) { logrus.Trace("key len ->", len(key)) - prefix := key[:36] - suffix := key[36:] - logrus.Traceln("load as:", string(prefix), string(suffix)) - uid, err := uuid.Parse(string(prefix)) + prefix := string(key[:36]) + suffix := string(key[37:]) + middle := string(key[36:37]) + logrus.Traceln("load as:", prefix, middle, suffix) + var source core.SourceId + switch middle { + case "1": + source = core.SOURCE_TCP + case "2": + source = core.SOURCE_SERIAL + } + uid, err := uuid.Parse(prefix) if err != nil { logrus.Errorln("corrupted key", err) } - timestamp, err := time.Parse(time.RFC3339Nano, string(suffix)) + timestamp, err := time.Parse(time.RFC3339Nano, suffix) if err != nil { logrus.Errorln("corrupted key", err) } logrus.Traceln(uid, timestamp) //timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix))) - return uid, timestamp + return uid, source, timestamp }