finished refactoring

This commit is contained in:
Timo Volkmann 2020-12-18 01:29:25 +01:00
parent b32cc0edeb
commit 5a3a9feca8
8 changed files with 140 additions and 90 deletions

View File

@ -52,7 +52,7 @@ func (s *serialCollector) OutChannel() chan interface{} {
} }
func (s *serialCollector) Collect() { func (s *serialCollector) Collect() {
go func() { go func(ch chan interface{}) {
logrus.Println("start serial collector") logrus.Println("start serial collector")
mode := &serial.Mode{ mode := &serial.Mode{
BaudRate: 115200, BaudRate: 115200,
@ -97,7 +97,7 @@ func (s *serialCollector) Collect() {
break break
} }
select { select {
case s.out <- meas: case ch <- meas:
maxSkip = 0 maxSkip = 0
default: default:
logrus.Traceln("skip collecting serial messages") logrus.Traceln("skip collecting serial messages")
@ -109,7 +109,7 @@ func (s *serialCollector) Collect() {
} }
logrus.Println("serial collector stopped") logrus.Println("serial collector stopped")
}() }(s.out)
} }
func (s *serialCollector) Stop() { func (s *serialCollector) Stop() {

View File

@ -3,33 +3,77 @@ package core
import ( import (
"errors" "errors"
"git.timovolkmann.de/gyrogpsc/ublox" "git.timovolkmann.de/gyrogpsc/ublox"
"github.com/google/uuid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"math" "math"
"time" "time"
) )
type sourceId string type Tracking struct {
TrackingMetadata
Data []SensorData
}
type TrackingMetadata struct {
UUID uuid.UUID
TimeCreated time.Time
Collectors []CollectorType
Size int
}
func newTracking() Tracking {
return Tracking{
TrackingMetadata: TrackingMetadata{
UUID: uuid.New(),
},
Data: []SensorData{},
}
}
func (s *Tracking) isEmpty() bool {
if len(s.Data) != s.Size {
logrus.Errorln("data inconsistent...", len(s.Data), s.Size)
}
return len(s.Data) == 0
}
type SourceId string
const ( const (
SOURCE_TCP sourceId = "SOURCE_TCP" SOURCE_TCP SourceId = "SOURCE_TCP"
SOURCE_SERIAL sourceId = "SOURCE_SERIAL" SOURCE_SERIAL SourceId = "SOURCE_SERIAL"
) )
var timeex int64 var timeex int64
type SensorData struct { type SensorData struct {
//MsgClass string
//FixType string
itow uint32 itow uint32
Source sourceId source SourceId
latency int
Servertime time.Time Servertime time.Time
Timestamp time.Time Timestamp time.Time
Position [3]float64 Position [3]float64 `json:",omitempty"`
Orientation [3]float64 PosAcc [2]float64 `json:",omitempty"`//[H,V]
Orientation [3]float64 `json:",omitempty"`
Speed float64 `json:",omitempty"`
PosHeading float64 `json:",omitempty"` // Course / Heading of Motion
HeadingAcc float64 `json:",omitempty"`
Gyroscope [3]float64 `json:",omitempty"`
LinearAcc [3]float64 `json:",omitempty"`
} }
//func SensorDataEmpty() SensorData { func (s *SensorData) Source() SourceId {
// return SensorData{} return s.source
//} }
func (s *SensorData) SetSource(si SourceId) {
s.source = si
}
func (s SensorData) isSameEpoch(n SensorData) bool { func (s SensorData) isSameEpoch(n SensorData) bool {
if n.itow == 0 { if n.itow == 0 {
@ -72,7 +116,7 @@ func (s SensorData) ConsolidateExTime(n SensorData) SensorData {
} }
func (s *SensorData) checkSources(n *SensorData) { func (s *SensorData) checkSources(n *SensorData) {
if (s.Source != n.Source && *s != SensorData{}) { if (s.source != n.source && *s != SensorData{}) {
logrus.Println(s) logrus.Println(s)
logrus.Println(n) logrus.Println(n)
logrus.Fatalln("Do not consolidate SensorData from different Sources") logrus.Fatalln("Do not consolidate SensorData from different Sources")
@ -87,7 +131,7 @@ var (
func ConvertUbxSensorData(msg interface{}) (*SensorData, error) { func ConvertUbxSensorData(msg interface{}) (*SensorData, error) {
sd := &SensorData{ sd := &SensorData{
//Servertime: time.Now().UTC(), //Servertime: time.Now().UTC(),
Source: SOURCE_SERIAL, source: SOURCE_SERIAL,
} }
switch v := msg.(type) { switch v := msg.(type) {
case *ublox.NavPvt: case *ublox.NavPvt:
@ -146,7 +190,7 @@ func convertIPhoneSensorLog(jsonData []byte) (*SensorData, error) {
//if ts == time.Date() //if ts == time.Date()
sd := &SensorData{ sd := &SensorData{
//Servertime: time.Now().UTC(), //Servertime: time.Now().UTC(),
Source: SOURCE_TCP, source: SOURCE_TCP,
Timestamp: ts, Timestamp: ts,
Position: [3]float64{lat, lon, alt}, Position: [3]float64{lat, lon, alt},
Orientation: [3]float64{pitch, roll, yaw}, Orientation: [3]float64{pitch, roll, yaw},
@ -168,7 +212,7 @@ func convertAndroidHyperImu(jsonData []byte) (*SensorData, error) {
sd := &SensorData{ sd := &SensorData{
//Servertime: time.Now().UTC(), //Servertime: time.Now().UTC(),
Source: SOURCE_TCP, source: SOURCE_TCP,
Timestamp: time.Unix(0, timestamp*int64(time.Millisecond)).UTC(), Timestamp: time.Unix(0, timestamp*int64(time.Millisecond)).UTC(),
Position: [3]float64{lat, lon, alt}, Position: [3]float64{lat, lon, alt},
Orientation: [3]float64{pitch, roll, yaw}, Orientation: [3]float64{pitch, roll, yaw},

View File

@ -21,11 +21,11 @@ func NewDispatcher() *dispatcher {
} }
func (d *dispatcher) SetStreaming(on bool) (ok bool) { func (d *dispatcher) SetStreaming(s bool) bool {
if oki := d.sem.TryAcquire(1); oki && on { if ok := d.sem.TryAcquire(1); s && ok {
// if i want to turn on and can get semaphore then return success // if i want to turn on and can get semaphore then return success
return true return true
} else if !on && !oki { } else if !s && !ok {
// if i want to turn off and cant get semaphore, i can safely turn off by releasing semaphore and return success // if i want to turn off and cant get semaphore, i can safely turn off by releasing semaphore and return success
d.sem.Release(1) d.sem.Release(1)
return true return true

View File

@ -98,14 +98,14 @@ func (p *pipeline) Push(data *SensorData) error {
if (data == nil || *data == SensorData{}) { if (data == nil || *data == SensorData{}) {
return errors.New("no data") return errors.New("no data")
} }
//logrus.Println("push data to pipeline:", string(data.Source)) //logrus.Println("push data to pipeline:", string(data.source))
switch data.Source { switch data.source {
case SOURCE_TCP: case SOURCE_TCP:
go p.pushTcpDataToBuffer(*data) go p.pushTcpDataToBuffer(*data)
case SOURCE_SERIAL: case SOURCE_SERIAL:
go p.pushSerialDataToBuffer(*data) go p.pushSerialDataToBuffer(*data)
default: default:
panic("pipeline: invalid data Source") panic("pipeline: invalid data source")
} }
return nil return nil
} }
@ -134,10 +134,10 @@ func (p *pipeline) publish() error {
p.buffer.LastMeasSerial = p.buffer.MeasSerial p.buffer.LastMeasSerial = p.buffer.MeasSerial
data := map[string]interface{}{} data := map[string]interface{}{}
if p.buffer.MeasTcp.Source == SOURCE_TCP { if p.buffer.MeasTcp.source == SOURCE_TCP {
data[string(SOURCE_TCP)] = p.buffer.MeasTcp data[string(SOURCE_TCP)] = p.buffer.MeasTcp
} }
if p.buffer.MeasSerial.Source == SOURCE_SERIAL { if p.buffer.MeasSerial.source == SOURCE_SERIAL {
data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial
} }
p.buffer.tcpMutex.Unlock() p.buffer.tcpMutex.Unlock()

View File

@ -1,10 +1,11 @@
package core package core
import ( import (
"encoding/json"
ext "github.com/reugn/go-streams/extension" ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow" "github.com/reugn/go-streams/flow"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"reflect" "time"
) )
type pipelineX struct { type pipelineX struct {
@ -25,43 +26,54 @@ func NewPipelineX(p Publisher, s Tracker, netChan chan interface{}, serialChan c
transNet := flow.NewFlatMap(transformNetFunc, 8) transNet := flow.NewFlatMap(transformNetFunc, 8)
transSer := flow.NewFlatMap(transformSerFunc, 8) transSer := flow.NewFlatMap(transformSerFunc, 8)
flowDelay := flow.NewMap(delayFunc(), 8) flowDelay := flow.NewMap(delayFunc(), 8)
flowStore := flow.NewMap(storeFunc(s), 1) flowStore := flow.NewMap(storeFunc(s), 8)
flowJson := flow.NewMap(jsonFunc, 8) //flowJson := flow.NewMap(jsonFunc, 8)
sinkPub := newPublishSink(p) sinkPub := newPublishSink(p)
// wire up and execute // wire up and execute
demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer))
go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) //go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub)
go demux.Via(flowDelay).Via(flowStore).To(sinkPub)
return &pipelineX{} return &pipelineX{}
} }
func (p *pipelineX) Run() {
}
func jsonFunc(i interface{}) interface{} {
return i
}
func storeFunc(s Tracker) flow.MapFunc { func storeFunc(s Tracker) flow.MapFunc {
return func(i interface{}) interface{} { return func(i interface{}) interface{} {
var sd *SensorData
if v, ok := i.(*SensorData); ok { if v, ok := i.(*SensorData); ok {
if v == nil { sd = v
logrus.Trace(v, ok)
} else {
s.Put(*v)
}
} else { } else {
logrus.Trace(i) panic("unexpected data struct")
logrus.Trace(reflect.TypeOf(i))
panic("pipeline storeFunc: wrong data type")
} }
return i if (*sd == SensorData{} || sd == nil) {
logrus.Info("empty data")
} else {
sd.Servertime = time.Now().UTC()
s.Put(*sd)
}
logrus.Debugf("%-14v %-40s %-40s %v %v", sd.Source(), sd.Timestamp.Format(time.RFC3339Nano), sd.Servertime.Format(time.RFC3339Nano), sd.Position, sd.Orientation)
data := map[string]interface{}{}
if sd.Source() == SOURCE_TCP {
data[string(SOURCE_TCP)] = *sd
}
if sd.Source() == SOURCE_SERIAL {
data[string(SOURCE_SERIAL)] = *sd
}
jdata, err := json.Marshal(data)
//logrus.Println(string(pretty.Pretty(jdata)))
if err != nil {
logrus.Fatalln(err)
}
return string(jdata)
} }
} }
type timeDelay struct { type timeDelay struct {
offsets map[sourceId]int offsets map[SourceId]int
} }
func delayFunc() flow.MapFunc { func delayFunc() flow.MapFunc {

View File

@ -6,6 +6,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"sync"
"time" "time"
) )
@ -50,8 +51,9 @@ type trackingService struct {
collectors []Collector collectors []Collector
store Storer store Storer
publisher Publisher publisher Publisher
recSem *semaphore.Weighted
config *Configuration config *Configuration
recSem *semaphore.Weighted
mu *sync.RWMutex
} }
func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService { func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService {
@ -61,6 +63,7 @@ func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService {
opMode: STOPPED, opMode: STOPPED,
collectors: nil, collectors: nil,
recSem: semaphore.NewWeighted(1), recSem: semaphore.NewWeighted(1),
mu: &sync.RWMutex{},
config: c, config: c,
store: s, store: s,
publisher: p, publisher: p,
@ -75,8 +78,11 @@ func (t *trackingService) Put(data SensorData) {
if !t.IsRecording() { if !t.IsRecording() {
return return
} }
t.mu.Lock()
t.tracking.Data = append(t.tracking.Data, data) t.tracking.Data = append(t.tracking.Data, data)
t.tracking.Size++
logrus.Traceln("raw data points: len->", len(t.tracking.Data)) logrus.Traceln("raw data points: len->", len(t.tracking.Data))
t.mu.Unlock()
} }
func (t *trackingService) SetRecording(s bool) (ok bool) { func (t *trackingService) SetRecording(s bool) (ok bool) {
@ -133,7 +139,7 @@ func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) {
t.tracking.Collectors = cols t.tracking.Collectors = cols
t.pipeline = NewPipelineX(t.publisher, t, tcp, ser) t.pipeline = NewPipelineX(t.publisher, t, tcp, ser)
//t.pipeline.Run() t.publisher.SetStreaming(true)
//time.Sleep(3 * time.Second) //time.Sleep(3 * time.Second)
return "LIVE", nil return "LIVE", nil

View File

@ -1,30 +0,0 @@
package core
import (
"github.com/google/uuid"
"time"
)
type Tracking struct {
TrackingMetadata
Data []SensorData
}
type TrackingMetadata struct {
UUID uuid.UUID
TimeCreated time.Time
Collectors []CollectorType
}
func newTracking() Tracking {
return Tracking{
TrackingMetadata: TrackingMetadata{
UUID: uuid.New(),
},
Data: []SensorData{},
}
}
func (s *Tracking) isEmpty() bool {
return len(s.Data) == 0
}

View File

@ -69,7 +69,7 @@ func (r *badgerStore) Save(tr core.Tracking) error {
} }
err = r.sensordatDb.Update(func(txn *badger.Txn) error { err = r.sensordatDb.Update(func(txn *badger.Txn) error {
for _, v := range tr.Data { for _, v := range tr.Data {
k := createRecordKey(tr.UUID, v.Servertime) k := createRecordKey(tr.UUID, v.Source(), v.Servertime)
logrus.Trace(v, " len key ->", len(k)) logrus.Trace(v, " len key ->", len(k))
j, err2 := json.Marshal(v) j, err2 := json.Marshal(v)
logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.Servertime.Format(time.RFC3339Nano)) logrus.Traceln("save record k/v:\n", tr.UUID.String(), v.Servertime.Format(time.RFC3339Nano))
@ -169,9 +169,10 @@ func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) {
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
item := it.Item() item := it.Item()
_, recTime := unmarshalDataKey(item.Key()) _, source, recTime := unmarshalDataKey(item.Key())
el := core.SensorData{} el := core.SensorData{}
el.Servertime = recTime el.Servertime = recTime
el.SetSource(source)
err2 := item.Value(func(val []byte) error { err2 := item.Value(func(val []byte) error {
logrus.Traceln(string(val)) logrus.Traceln(string(val))
err3 := json.Unmarshal(val, &el) err3 := json.Unmarshal(val, &el)
@ -195,33 +196,50 @@ func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) {
return t, nil return t, nil
} }
func createRecordKey(uid uuid.UUID, timestamp time.Time) []byte { func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) []byte {
prefix := []byte(uid.String()) prefix := []byte(uid.String())
var i string
switch source {
case core.SOURCE_TCP:
i = "1"
case core.SOURCE_SERIAL:
i = "2"
}
middle := []byte(i)
suffix := []byte(timestamp.Format(time.RFC3339Nano)) suffix := []byte(timestamp.Format(time.RFC3339Nano))
if timestamp.IsZero() { if timestamp.IsZero() {
err := errors.New("zero value detected") err := errors.New("zero value detected")
logrus.Errorln("unable to create key", err) logrus.Errorln("unable to create key", err)
} }
logrus.Traceln("save as:", string(prefix), string(suffix)) logrus.Traceln("save as:", string(prefix), string(middle), string(suffix))
//binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano())) //binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano()))
return append(prefix, suffix...) ret := append(prefix, middle...)
return append(ret, suffix...)
} }
func unmarshalDataKey(key []byte) (uuid.UUID, time.Time) { func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) {
logrus.Trace("key len ->", len(key)) logrus.Trace("key len ->", len(key))
prefix := key[:36] prefix := string(key[:36])
suffix := key[36:] suffix := string(key[37:])
logrus.Traceln("load as:", string(prefix), string(suffix)) middle := string(key[36:37])
uid, err := uuid.Parse(string(prefix)) logrus.Traceln("load as:", prefix, middle, suffix)
var source core.SourceId
switch middle {
case "1":
source = core.SOURCE_TCP
case "2":
source = core.SOURCE_SERIAL
}
uid, err := uuid.Parse(prefix)
if err != nil { if err != nil {
logrus.Errorln("corrupted key", err) logrus.Errorln("corrupted key", err)
} }
timestamp, err := time.Parse(time.RFC3339Nano, string(suffix)) timestamp, err := time.Parse(time.RFC3339Nano, suffix)
if err != nil { if err != nil {
logrus.Errorln("corrupted key", err) logrus.Errorln("corrupted key", err)
} }
logrus.Traceln(uid, timestamp) logrus.Traceln(uid, timestamp)
//timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix))) //timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix)))
return uid, timestamp return uid, source, timestamp
} }