From e66f631c2fe613fcf7b9b8a5b268fa2669a2a52b Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Wed, 6 Jan 2021 17:05:30 +0100 Subject: [PATCH] added readme, commented functionality, cleaned up --- cmd/replay/replay.go | 6 +- cmd/server/server.go | 43 ++------ core/config.go | 36 ++++++ core/datamodel.go | 13 ++- core/dispatcher.go | 13 ++- core/interfaces.go | 8 +- core/pipeline.go | 239 ---------------------------------------- core/pipeline_record.go | 7 +- core/pipeline_replay.go | 10 +- core/service.go | 79 ++++++------- example_config.yml | 7 +- readme.md | 0 storage/kvstore.go | 14 +-- web/http.go | 62 +++-------- 14 files changed, 133 insertions(+), 404 deletions(-) delete mode 100644 core/pipeline.go create mode 100644 readme.md diff --git a/cmd/replay/replay.go b/cmd/replay/replay.go index 710bdd1..5d92773 100644 --- a/cmd/replay/replay.go +++ b/cmd/replay/replay.go @@ -8,7 +8,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" "net/http" - _ "net/http/pprof" + _ "net/http/pprof" ) func main() { @@ -23,13 +23,13 @@ func main() { repo := storage.NewRepository(conf) disp := core.NewDispatcher() - service := core.TrackingService(conf, repo, disp) + service := core.NewTrackingService(conf, repo, disp) go func() { // Long Run //service.LoadTracking(uuid.MustParse("06b05aa3-6a13-4ffb-8ac7-cd35dfc0f949")) // Tunnel - service.LoadTracking(uuid.MustParse("c3dbee7c-512a-4cc8-9804-21f0f2cf3c22")) + service.LoadTracking(uuid.MustParse("c3dbee7c-512a-4cc8-9804-21f0f2cf3c22"), true) //pprof.StopCPUProfile() //os.Exit(0) diff --git a/cmd/server/server.go b/cmd/server/server.go index c3e0148..a817013 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -5,53 +5,26 @@ import ( "git.timovolkmann.de/gyrogpsc/storage" "git.timovolkmann.de/gyrogpsc/web" "github.com/sirupsen/logrus" - "github.com/spf13/viper" "net/http" _ "net/http/pprof" ) func main() { + // launch profiling server in goroutine to debug performance issues go func() { logrus.Println(http.ListenAndServe("localhost:6060", nil)) }() - conf := configurationFromFile() - - logrus.Debug(conf) - + // load configuration from file + conf := core.ConfigurationFromFile() + // initialize persistence layer repo := storage.NewRepository(conf) + // initialize message distribution layer disp := core.NewDispatcher() - service := core.TrackingService(conf, repo, disp) + // initialize core logic service and inject + service := core.NewTrackingService(conf, repo, disp) + // launch webserver web.CreateServer(service, disp, conf) } - -func configurationFromFile() *core.Configuration { - viper.SetDefault("collectors.porttcp", ":3010") - viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") - viper.SetDefault("webserver.port", ":3011") - viper.SetDefault("pipeline.publishIntervalMs", 50) - viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) - viper.SetDefault("debuglevel", "INFO") - - viper.SetConfigName("gpsconfig") // name of config file (without extension) - viper.SetConfigType("yaml") - viper.AddConfigPath(".") - viper.AddConfigPath("./../../") - if err := viper.ReadInConfig(); err != nil { - logrus.Warn("couldn't find config file. 
using standard configuration") - } - - c := core.Configuration{} - if err := viper.Unmarshal(&c); err != nil { - logrus.Debug("couldn't load config...") - logrus.Error(err) - } - lvl, err := logrus.ParseLevel(c.Debuglevel) - if err != nil { - logrus.Error(err) - } - logrus.SetLevel(lvl) - return &c -} diff --git a/core/config.go b/core/config.go index 076f9a8..bd83b75 100644 --- a/core/config.go +++ b/core/config.go @@ -1,5 +1,11 @@ package core +import ( + "github.com/sirupsen/logrus" + "github.com/spf13/viper" +) + +// This struct represents and holds all configurable parameters type Configuration struct { Collectors struct { TcpCollectorPort string `mapstructure:"porttcp"` @@ -14,3 +20,33 @@ type Configuration struct { } `mapstructure:"pipeline"` Debuglevel string `mapstructure:"debuglevel"` } + +// Call this function to load configuration from gpsconfig.yml +func ConfigurationFromFile() *Configuration { + viper.SetDefault("collectors.porttcp", ":3010") + viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") + viper.SetDefault("webserver.port", ":3011") + viper.SetDefault("pipeline.publishIntervalMs", 50) + viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) + viper.SetDefault("debuglevel", "INFO") + + viper.SetConfigName("gpsconfig") // name of config file (without extension) + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + viper.AddConfigPath("./../../") + if err := viper.ReadInConfig(); err != nil { + logrus.Warn("couldn't find config file. using standard configuration") + } + + c := Configuration{} + if err := viper.Unmarshal(&c); err != nil { + logrus.Debug("couldn't load config...") + logrus.Error(err) + } + lvl, err := logrus.ParseLevel(c.Debuglevel) + if err != nil { + logrus.Error(err) + } + logrus.SetLevel(lvl) + return &c +} diff --git a/core/datamodel.go b/core/datamodel.go index 724337d..3cfc2be 100644 --- a/core/datamodel.go +++ b/core/datamodel.go @@ -10,11 +10,13 @@ import ( "time" ) +// holds all sensor data and metadata for a specific recording type Tracking struct { TrackingMetadata Data []SensorData } +// holds all metadata for a specific recording type TrackingMetadata struct { UUID uuid.UUID TimeCreated time.Time @@ -38,6 +40,7 @@ func (s *Tracking) isEmpty() bool { return len(s.Data) == 0 } +// enumerate sources type SourceId string const ( @@ -45,9 +48,7 @@ const ( SOURCE_SERIAL SourceId = "SOURCE_SERIAL" ) -var lastTimeOffsetIphone int64 -var lastTimeOffsetUblox int64 - +// internal unified representation for smartphone and ublox sensor data type SensorData struct { //MsgClass string //FixType string @@ -130,6 +131,9 @@ var ( errRawMessage = errors.New("raw message") ) +var lastTimeOffsetIphone int64 +var lastTimeOffsetUblox int64 + func ConvertUbxSensorData(msg interface{}) (*SensorData, error) { sd := &SensorData{ //Servertime: time.Now().UTC(), @@ -169,9 +173,6 @@ func ConvertUbxSensorData(msg interface{}) (*SensorData, error) { sd.Orientation[1] = float64(v.Roll_deg) * 1e-5 sd.Orientation[2] = float64(v.Heading_deg) * 1e-5 case *ublox.RawMessage: - //class := make([]byte, 2) - //binary.LittleEndian.PutUint16(class, v.ClassID()) - //logrus.Printf("%#v, %#v", class[0],class[1]) return nil, nil default: return nil, errNotImplemented diff --git a/core/dispatcher.go b/core/dispatcher.go index 23c3ff4..e503f10 100644 --- a/core/dispatcher.go +++ b/core/dispatcher.go @@ -6,12 +6,14 @@ import ( "golang.org/x/sync/semaphore" ) +// dispatcher is responsible to distribute messages to subscribed listeners type dispatcher struct { listeners 
map[int16]chan string
 	counter   int16
 	sem       *semaphore.Weighted
 }
 
+// NewDispatcher returns an initialized dispatcher.
 func NewDispatcher() *dispatcher {
 	return &dispatcher{
 		listeners: make(map[int16]chan string),
@@ -20,18 +22,20 @@ func NewDispatcher() *dispatcher {
 	}
 }
 
+// SetStreaming enables or disables streaming without removing any listeners from the dispatcher.
 func (d *dispatcher) SetStreaming(s bool) bool {
 	if ok := d.sem.TryAcquire(1); s && ok {
-		// if i want to turn on and can get semaphore then return success
+		// to turn streaming on: if the semaphore can be acquired, report success
 		return true
 	} else if !s && !ok {
-		// if i want to turn off and cant get semaphore, i can safely turn off by releasing semaphore and return success
+		// to turn streaming off: if the semaphore can't be acquired it is already held, so release it and report success
 		d.sem.Release(1)
 		return true
 	}
 	return false
 }
 
+// IsClosed reports whether the dispatcher is closed; a closed dispatcher drops published messages instead of forwarding them.
 func (d *dispatcher) IsClosed() bool {
 	if d.sem.TryAcquire(1) {
 		d.sem.Release(1)
@@ -40,6 +44,8 @@ func (d *dispatcher) IsClosed() bool {
 	return false
 }
 
+// Publish sends the message to all subscribed listeners.
+// If the dispatcher is closed, the message is dropped instead of being forwarded.
 func (d *dispatcher) Publish(message string) {
 	if d.IsClosed() {
 		return
@@ -54,6 +60,7 @@ func (d *dispatcher) Publish(message string) {
 	}
 }
 
+// Subscribe registers a new client as a listener and returns its id together with the channel on which it will receive all messages.
 func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) {
 	key := d.counter
 	d.counter++
@@ -62,6 +69,8 @@ func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) {
 	return key, rec
 }
 
+// Unsubscribe removes the listener with the given id.
+// If a listener with that id exists, it is deleted and no error is returned.
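(Aside on usage: a minimal, stand-alone sketch of how the dispatcher documented above is meant to be driven. The import path is the one used elsewhere in this patch; channel buffering and drop behaviour of listener channels are not visible in these hunks, so the consumer is started before anything is published.)

package main

import (
	"fmt"
	"time"

	"git.timovolkmann.de/gyrogpsc/core"
)

func main() {
	disp := core.NewDispatcher()

	// Subscribe returns the listener id and a receive-only channel for messages.
	id, messages := disp.Subscribe()
	go func() {
		for msg := range messages {
			fmt.Println("received:", msg)
		}
	}()

	// Streaming stays off until SetStreaming(true) succeeds; while the
	// dispatcher is closed, Publish drops messages instead of forwarding them.
	disp.SetStreaming(true)
	for i := 0; i < 3; i++ {
		disp.Publish(fmt.Sprintf(`{"seq":%d}`, i))
		time.Sleep(10 * time.Millisecond)
	}

	disp.SetStreaming(false)
	_ = disp.Unsubscribe(id)
}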
func (d *dispatcher) Unsubscribe(id int16) error { receiver, ok := d.listeners[id] if !ok { diff --git a/core/interfaces.go b/core/interfaces.go index 8d2c74e..0dbba55 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -2,25 +2,25 @@ package core import "github.com/google/uuid" +// abstraction for dispatcher to make it replaceable type Subscriber interface { Subscribe() (int16, <-chan string) Unsubscribe(id int16) error } +// abstraction for dispatcher to make it replaceable type Publisher interface { Publish(message string) Streamer } -type Processor interface { - Push(data *SensorData) error -} - +// implementing struct should be responsible for message forwarding (to client) type Streamer interface { SetStreaming(s bool) (ok bool) IsClosed() bool } +// abstraction for persistance layer type Storer interface { Save(tracking Tracking) error LoadAll() ([]TrackingMetadata, error) diff --git a/core/pipeline.go b/core/pipeline.go deleted file mode 100644 index f36683a..0000000 --- a/core/pipeline.go +++ /dev/null @@ -1,239 +0,0 @@ -package core - -import ( - "context" - "encoding/json" - "errors" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/sirupsen/logrus" - "golang.org/x/sync/semaphore" - "sync" - "time" -) - -type pipeline struct { - active bool - record bool - synchroniz synchronizer - buffer pipeBuffer - publisher Publisher - storer Tracker - publishTicker *time.Ticker - mu sync.RWMutex - sema *semaphore.Weighted -} - -// pipeline implements Runner & Processor -func NewPipeline(d Publisher, s Tracker, conf *Configuration) *pipeline { - return &pipeline{ - false, - false, - synchronizer{ - //bufferSize: 100, - mutex: &sync.RWMutex{}, - updateTicker: time.NewTicker(time.Duration(conf.Pipeline.SyncUpdateIntervalMs) * time.Millisecond), - }, - pipeBuffer{ - tcpMutex: &sync.Mutex{}, - serialMutex: &sync.Mutex{}, - }, - d, - s, - time.NewTicker(time.Duration(conf.Pipeline.PublishIntervalMs) * time.Millisecond), - sync.RWMutex{}, - semaphore.NewWeighted(2), - } -} - -func (p *pipeline) isPipeActive() bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.active -} -func (p *pipeline) isPipeRecording() bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.record -} - -func (p *pipeline) Run() { - p.sema.Acquire(context.Background(), 1) // !!! 
n=2 wenn synchronizer mitläuft - p.mu.Lock() - p.active = true - p.mu.Unlock() - logrus.Println("pipeline: processing service started") - //go func() { - // for p.isPipeActive() { - // <-p.synchroniz.updateTicker.C - // err := p.refreshDelay() - // if err != nil { - // logrus.Debugln(err) - // } - // } - // p.sema.Release(1) - // logrus.Println("pipeline: updater stopped") - //}() - go func() { - for p.isPipeActive() { - <-p.publishTicker.C - err := p.publish() - if err != nil && err.Error() != "no data available" { - logrus.Trace(err) - } - } - p.sema.Release(1) - logrus.Println("pipeline: publisher stopped") - }() -} - -func (p *pipeline) Record() { - p.record = true -} -func (p *pipeline) StopRecord() { - p.record = false -} - -func (p *pipeline) Push(data *SensorData) error { - if (data == nil || *data == SensorData{}) { - return errors.New("no data") - } - //logrus.Println("push data to pipeline:", string(data.source)) - switch data.source { - case SOURCE_TCP: - go p.pushTcpDataToBuffer(*data) - case SOURCE_SERIAL: - go p.pushSerialDataToBuffer(*data) - default: - panic("pipeline: invalid data source") - } - return nil -} - -func (p *pipeline) publish() error { - p.buffer.serialMutex.Lock() - p.buffer.tcpMutex.Lock() - - if (p.buffer.MeasTcp == SensorData{} && p.buffer.MeasSerial == SensorData{}) { - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - return errors.New("no data available") - } - if cmp.Equal(p.buffer.MeasTcp, p.buffer.LastMeasTcp, cmpopts.IgnoreUnexported(SensorData{})) && - cmp.Equal(p.buffer.MeasSerial, p.buffer.LastMeasSerial, cmpopts.IgnoreUnexported(SensorData{})) { - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - return errors.New("same data") - } - logrus.Debugf("–––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––") - logrus.Tracef("SER old: %-40s %-40s %v %v", p.buffer.LastMeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Servertime.Format(time.RFC3339Nano), p.buffer.LastMeasSerial.Position, p.buffer.LastMeasSerial.Orientation) - logrus.Debugf("SER new: %-40s %-40s %v %v", p.buffer.MeasSerial.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasSerial.Servertime.Format(time.RFC3339Nano), p.buffer.MeasSerial.Position, p.buffer.MeasSerial.Orientation) - logrus.Tracef("TCP old: %-40s %-40s %v %v", p.buffer.LastMeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Servertime.Format(time.RFC3339Nano), p.buffer.LastMeasTcp.Position, p.buffer.LastMeasTcp.Orientation) - logrus.Debugf("TCP new: %-40s %-40s %v %v", p.buffer.MeasTcp.Timestamp.Format(time.RFC3339Nano), p.buffer.MeasTcp.Servertime.Format(time.RFC3339Nano), p.buffer.MeasTcp.Position, p.buffer.MeasTcp.Orientation) - p.buffer.LastMeasTcp = p.buffer.MeasTcp - p.buffer.LastMeasSerial = p.buffer.MeasSerial - - data := map[string]interface{}{} - if p.buffer.MeasTcp.source == SOURCE_TCP { - data[string(SOURCE_TCP)] = p.buffer.MeasTcp - } - if p.buffer.MeasSerial.source == SOURCE_SERIAL { - data[string(SOURCE_SERIAL)] = p.buffer.MeasSerial - } - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - - jdata, err := json.Marshal(data) - //logrus.Println(string(pretty.Pretty(jdata))) - if err != nil { - return err - } - p.publisher.Publish(string(jdata)) - return nil -} - -type pipeBuffer struct { - MeasTcp SensorData - MeasSerial SensorData - LastMeasTcp SensorData - LastMeasSerial SensorData - tcpMutex *sync.Mutex - serialMutex *sync.Mutex -} - -type UnixNanoTime int64 - -type synchronizer struct { - 
tcpSerialDelayMs int64 - mutex *sync.RWMutex - updateTicker *time.Ticker -} - -func (p *pipeline) refreshDelay() error { - logrus.Debugf("refreshing delay...") - p.buffer.serialMutex.Lock() - p.buffer.tcpMutex.Lock() - tcpTime := p.buffer.MeasTcp.Timestamp - serTime := p.buffer.MeasSerial.Timestamp - p.buffer.tcpMutex.Unlock() - p.buffer.serialMutex.Unlock() - - if tcpTime.IsZero() || serTime.IsZero() || tcpTime.UnixNano() == 0 || serTime.UnixNano() == 0 { - return errors.New("sync not possible. zero time value detected") - } - logrus.Debug("TCP", tcpTime.Format(time.RFC3339Nano)) - logrus.Debug("SER", serTime.Format(time.RFC3339Nano)) - - currentDelay := tcpTime.Sub(serTime).Milliseconds() - p.synchroniz.mutex.Lock() - defer p.synchroniz.mutex.Unlock() - logrus.Debugf("old delay-> %vms...", p.synchroniz.tcpSerialDelayMs) - if currentDelay > 5000 || currentDelay < -5000 { - p.synchroniz.tcpSerialDelayMs = 0 - return errors.New("skipping synchronisation! time not properly configured or facing network problems.") - } - p.synchroniz.tcpSerialDelayMs += currentDelay - logrus.Infof("new delay-> %vms", p.synchroniz.tcpSerialDelayMs) - return nil -} - -func (p *pipeline) pushTcpDataToBuffer(data SensorData) { - data.Servertime = time.Now().UTC() - - if p.isPipeRecording() { - p.storer.Put(data) - } - - p.synchroniz.mutex.RLock() - if p.synchroniz.tcpSerialDelayMs > 0 { - time.Sleep(time.Duration(p.synchroniz.tcpSerialDelayMs) * time.Millisecond) - } - p.synchroniz.mutex.RUnlock() - p.buffer.tcpMutex.Lock() - p.buffer.MeasTcp = data - //p.buffer.MeasTcp = p.buffer.MeasTcp.ConsolidateExTime(data) - p.buffer.tcpMutex.Unlock() -} -func (p *pipeline) pushSerialDataToBuffer(data SensorData) { - data.Servertime = time.Now().UTC() - - if p.isPipeRecording() { - p.storer.Put(data) - } - - p.synchroniz.mutex.RLock() - if p.synchroniz.tcpSerialDelayMs < 0 { - time.Sleep(time.Duration(-p.synchroniz.tcpSerialDelayMs) * time.Millisecond) - } - p.synchroniz.mutex.RUnlock() - p.buffer.serialMutex.Lock() - p.buffer.MeasSerial = data - //p.buffer.MeasSerial = p.buffer.MeasSerial.ConsolidateEpochsOnly(data) - p.buffer.serialMutex.Unlock() -} - -func (p *pipeline) Close() { - p.mu.Lock() - p.active = false - p.mu.Unlock() -} diff --git a/core/pipeline_record.go b/core/pipeline_record.go index cb38dda..89501d1 100644 --- a/core/pipeline_record.go +++ b/core/pipeline_record.go @@ -17,19 +17,17 @@ func NewRecordPipeline(p Publisher, s Tracker, netChan chan interface{}, serialC collSer := ext.NewChanSource(serialChan) transNet := flow.NewFlatMap(transformNetFunc, 1) transSer := flow.NewFlatMap(transformSerFunc, 1) - //flowDelay := flow.NewMap(delayFunc(), 1) flowStore := flow.NewMap(storeFunc(s), 1) dataSanitizer := flow.NewMap(replaySanitizeFunc(), 1) - //flowReorder := NewRearranger() flowJson := flow.NewMap(jsonFunc, 1) sinkPub := newPublishSink(p) // wire up and execute demux := flow.Merge(collNet.Via(transNet), collSer.Via(transSer)) - //go demux.Via(flowDelay).Via(flowStore).Via(flowJson).To(sinkPub) go demux.Via(flowStore).Via(dataSanitizer).Via(flowJson).To(sinkPub) + return &pipelineRecord{} } @@ -114,6 +112,9 @@ func transformSerFunc(i interface{}) []interface{} { return append(returnSlice, sd) } +// Publish sink will pass data to dispatcher after flowing through the stream processing +// matches api to use it with github.com/reugn/go-streams/flow + type publishSink struct { in chan interface{} p Publisher diff --git a/core/pipeline_replay.go b/core/pipeline_replay.go index 5dcebbc..1997b38 100644 --- 
a/core/pipeline_replay.go +++ b/core/pipeline_replay.go @@ -39,14 +39,6 @@ func NewReplayPipeline(p Publisher, t *Tracking) *pipelineReplay { return r } -func logFunc() flow.MapFunc { - return func(i interface{}) interface{} { - //s := i.(SensorData) - logrus.Debugln("logfunc", i) - return i - } -} - func (p *pipelineReplay) channelFromTracking(t *Tracking) chan interface{} { ch := make(chan interface{}) sort.Slice(t.Data, func(i, j int) bool { return t.Data[i].Servertime.Before(t.Data[j].Servertime) }) @@ -109,6 +101,8 @@ func replaySanitizeFunc() flow.MapFunc { } } +// The Rearranger is not used but kept, for later experiments. + func NewRearranger() *rearranger { rearran := &rearranger{ queue: &flow.PriorityQueue{}, diff --git a/core/service.go b/core/service.go index 455a893..ba6446e 100644 --- a/core/service.go +++ b/core/service.go @@ -10,6 +10,7 @@ import ( "time" ) +// indicates state of the TrackingService type OpMode uint8 const ( @@ -19,21 +20,7 @@ const ( REPLAY ) -type Service interface { - AllTrackings() ([]TrackingMetadata, error) - StartPipeline(cols ...CollectorType) (string, error) - StartRecord() (string, error) - StopRecord() (*TrackingMetadata, error) - StopAll() (*TrackingMetadata, error) - - LoadTracking(trackingId uuid.UUID, replay bool) (*Tracking, error) - DeleteTracking(trackingId uuid.UUID) - - StartReplay() - PauseReplay() - StopReplay() -} - +// structs implementing this interface are expected to cache sensor data passed by Put(data) while recording and dropping data if not recording type Tracker interface { Put(data SensorData) Recorder @@ -44,7 +31,8 @@ type Recorder interface { IsRecording() bool } -type trackingService struct { +// this struct holds all relevant references for the TrackingService +type TrackingService struct { opMode OpMode tracking *Tracking //pipeline *pipelineRecord @@ -57,9 +45,10 @@ type trackingService struct { mu *sync.RWMutex } -func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService { +// constructor +func NewTrackingService(c *Configuration, s Storer, p Publisher) *TrackingService { t := &Tracking{} - ts := &trackingService{ + ts := &TrackingService{ tracking: t, opMode: STOPPED, collectors: nil, @@ -70,13 +59,14 @@ func TrackingService(c *Configuration, s Storer, p Publisher) *trackingService { publisher: p, } - // first initialize of tcp collector to to open tcp port + // first call to to open tcp port. 
makes app ready to accept connections NewCollector(c, TCP) return ts } -func (t *trackingService) Put(data SensorData) { +// caches sensordata while recording +func (t *TrackingService) Put(data SensorData) { if !t.IsRecording() { return } @@ -87,7 +77,8 @@ func (t *trackingService) Put(data SensorData) { t.mu.Unlock() } -func (t *trackingService) SetRecording(s bool) (ok bool) { +// changes recording state of service +func (t *TrackingService) SetRecording(s bool) (ok bool) { if okay := t.recSem.TryAcquire(1); okay && s { // if i want to turn on and can get semaphore then return success return true @@ -99,7 +90,7 @@ func (t *trackingService) SetRecording(s bool) (ok bool) { return false } -func (t *trackingService) IsRecording() bool { +func (t *TrackingService) IsRecording() bool { if t.recSem.TryAcquire(1) { t.recSem.Release(1) return false @@ -107,8 +98,10 @@ func (t *trackingService) IsRecording() bool { return true } -func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) { +// creates a new Pipeline with requested collectors +func (t *TrackingService) StartPipeline(cols ...CollectorType) (string, error) { logrus.Info("SERVICE: NEW PIPELINE") + // check if state machine is in right state if t.opMode == RECORDING { txt := "trackingservice: please stop recording before resetting pipeline" logrus.Warn(txt) @@ -117,13 +110,12 @@ func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) { if t.opMode == LIVE { txt := "trackingservice: stop tracking running stream before creating new one" logrus.Warnln(txt) - //t.StopAll() - //time.Sleep(1000 * time.Millisecond) return "record already running since: " + t.tracking.TimeCreated.String(), errors.New(txt) } logrus.Debugln("new tracking:", cols) t.opMode = LIVE + // create and start collectors t.collectors = nil var tcp, ser chan interface{} for _, col := range cols { @@ -140,20 +132,22 @@ func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) { t.safelyReplaceTracking(newTracking()) t.tracking.Collectors = cols + // finally create pipeline NewRecordPipeline(t.publisher, t, tcp, ser) t.publisher.SetStreaming(true) - //time.Sleep(3 * time.Second) return "LIVE", nil } -func (t *trackingService) AllTrackings() ([]TrackingMetadata, error) { +// retrieves all trackings. metadata only. +func (t *TrackingService) AllTrackings() ([]TrackingMetadata, error) { logrus.Info("SERVICE: GET ALL TRACKINGS") data, err := t.store.LoadAll() return data, err } -func (t *trackingService) StartRecord() (string, error) { +// starts recording and returns state +func (t *TrackingService) StartRecord() (string, error) { logrus.Info("SERVICE: START RECORD") if t.opMode != LIVE { if t.opMode == RECORDING { @@ -172,7 +166,8 @@ func (t *trackingService) StartRecord() (string, error) { return "record started at: " + t.tracking.TimeCreated.String(), nil } -func (t *trackingService) StopRecord() (*TrackingMetadata, error) { +// stops recording and returns metadata of current tracking if successfully stopped +func (t *TrackingService) StopRecord() (*TrackingMetadata, error) { logrus.Info("SERVICE: STOP RECORD") if t.opMode != RECORDING { txt := "trackingservice: couldn't stop. 
not recording" @@ -187,14 +182,15 @@ func (t *trackingService) StopRecord() (*TrackingMetadata, error) { logrus.Error(err) } t.opMode = LIVE - //time.Sleep(20 * time.Millisecond) + tm := t.tracking.TrackingMetadata t.safelyReplaceTracking(newTracking()) t.tracking.Collectors = tm.Collectors return &tm, err } -func (t *trackingService) StopAll() (*TrackingMetadata, error) { +// stops live tracking and recording. if theres no active recording, no metadata will be returned +func (t *TrackingService) StopAll() (*TrackingMetadata, error) { logrus.Info("SERVICE: STOP ALL") var tm *TrackingMetadata = nil var err error @@ -216,7 +212,9 @@ func (t *trackingService) StopAll() (*TrackingMetadata, error) { return tm, err } -func (t *trackingService) LoadTracking(trackingId uuid.UUID, replay bool) (*Tracking, error) { +// retrieves tracking with all data and starts replay pipeline if desired. +// in that case the application behaves like in live mode. +func (t *TrackingService) LoadTracking(trackingId uuid.UUID, replay bool) (*Tracking, error) { if !(t.opMode == REPLAY || t.opMode == STOPPED || t.opMode == LIVE || t.opMode == RECORDING) { t.StopAll() } @@ -239,23 +237,12 @@ func (t *trackingService) LoadTracking(trackingId uuid.UUID, replay bool) (*Trac return t.tracking, nil } -func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { +func (t *TrackingService) DeleteTracking(trackingId uuid.UUID) { panic("implement me") } -func (t *trackingService) StartReplay() { - panic("implement me") -} - -func (t *trackingService) PauseReplay() { - panic("implement me") -} - -func (t *trackingService) StopReplay() { - panic("implement me") -} - -func (t *trackingService) safelyReplaceTracking(tr Tracking) { +// helper function to replace tracking held by service. makes sure to keep the pointer and change only underlying data. +func (t *TrackingService) safelyReplaceTracking(tr Tracking) { t.recSem.Acquire(context.Background(), 1) *t.tracking = tr t.recSem.Release(1) diff --git a/example_config.yml b/example_config.yml index 872032e..b1ae30e 100644 --- a/example_config.yml +++ b/example_config.yml @@ -10,11 +10,6 @@ collectors: porttcp: ":3010" portserial: "/dev/tty.usbmodem14201" -# processing pipeline configurations -pipeline: - publishIntervalMs: 50 - syncUpdateIntervalMs: 494 - debuglevel: "INFO" #// ErrorLevel level. Logs. Used for errors that should definitely be noted. @@ -29,4 +24,4 @@ debuglevel: "INFO" #InfoLevel #// DebugLevel level. Usually only enabled when debugging. Very verbose logging. -#DebugLevel \ No newline at end of file +#DebugLevel diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..e69de29 diff --git a/storage/kvstore.go b/storage/kvstore.go index 8677e19..0778f41 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -18,6 +18,7 @@ type badgerStore struct { sensordatDb *badger.DB } +// Returns a badgerDB K/V Store instance. Opens database if exist or creates a new one in working directory func NewRepository(c *core.Configuration) *badgerStore { dir, _ := os.Getwd() logrus.Debug(dir) @@ -104,6 +105,7 @@ func (r *badgerStore) Save(tr core.Tracking) error { return nil } +// Retrieves all existing Trackings. Only Metadata. 
If you want actual data of tracking, load a specific one with Load(uuid) func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) { var result []core.TrackingMetadata err := r.trackingsDb.View(func(txn *badger.Txn) error { @@ -132,6 +134,7 @@ func (r *badgerStore) LoadAll() ([]core.TrackingMetadata, error) { return result, nil } +// Retrieves all data of a tracking from disk func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) { logrus.Debugln("try to load from db...", id) if ok := r.isDbClosed(); ok { @@ -190,11 +193,10 @@ func (r *badgerStore) Load(id uuid.UUID) (*core.Tracking, error) { logrus.Error(err) } - // implement retrieval of raw data only if needed - return t, nil } +// helper function to create []byte key from sensordata element func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) []byte { prefix := []byte(uid.String()) var i string @@ -211,17 +213,17 @@ func createRecordKey(uid uuid.UUID, source core.SourceId, timestamp time.Time) [ logrus.Errorln("unable to create key", err) } logrus.Traceln("save as:", string(prefix), string(middle), string(suffix)) - //binary.BigEndian.PutUint64(suffix, uint64(timestamp.UnixNano())) ret := append(prefix, middle...) return append(ret, suffix...) } +// helper function to split []byte key back to actual data func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) { - //logrus.Trace("key len ->", len(key)) + prefix := string(key[:36]) suffix := string(key[37:]) middle := string(key[36:37]) - //logrus.Traceln("load as:", prefix, middle, suffix) + var source core.SourceId switch middle { case "1": @@ -237,8 +239,6 @@ func unmarshalDataKey(key []byte) (uuid.UUID, core.SourceId, time.Time) { if err != nil { logrus.Errorln("corrupted key", err) } - //logrus.Traceln(uid, timestamp) - //timestamp := time.Unix(0, int64(binary.BigEndian.Uint64(suffix))) return uid, source, timestamp } diff --git a/web/http.go b/web/http.go index cfd70dd..29dc37d 100644 --- a/web/http.go +++ b/web/http.go @@ -10,7 +10,7 @@ import ( "github.com/sirupsen/logrus" ) -func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) { +func CreateServer(s *core.TrackingService, sub core.Subscriber, c *core.Configuration) { app := fiber.New(fiber.Config{ Views: fiberTemplateEngine(c), }) @@ -25,29 +25,17 @@ func CreateServer(s core.Service, sub core.Subscriber, c *core.Configuration) { // Tracking persistence controls HTTP JSON RPC API trackings := app.Group("/trackings") - trackings.Get("/", allTrackingsHandler(s, c)) // Get all trackings Metadata - trackings.Post("/", startPipelineHandler(s, c)) // Initialize new tracking, open websocket and prepare for automatic recording. Toggle ?serial=true and ?tcp=true. Returns trackingId - trackings.Patch("/", startRecordingHandler(s, c)) // Starts recording - trackings.Put("/", stopRecordingHandler(s, c)) // Stops current recording. Returns trackingId if record was successful - trackings.Delete("/", stopAllHandler(s, c)) // Stops websocket connection, pipelines and collectors - + trackings.Get("/", allTrackingsHandler(s, c)) // Get all trackings Metadata + trackings.Post("/", startPipelineHandler(s, c)) // Initialize new tracking, open websocket and prepare for automatic recording. Toggle ?serial=true and ?tcp=true. Returns trackingId + trackings.Patch("/", startRecordingHandler(s, c)) // Starts recording + trackings.Put("/", stopRecordingHandler(s, c)) // Stops current recording. 
Returns trackingId if record was successful + trackings.Delete("/", stopAllHandler(s, c)) // Stops websocket connection, pipelines and collectors trackings.Get("/:trackingId", LoadTrackingHandler(s, c)) // Gets Tracking Metadata and loads sensorRecords from storage. - //trackings.Delete("/:trackingId", stubhander()) // Deletes Tracking from storage - // - //trackings.Post("/current", stubhander()) // Starts Replay. - //trackings.Patch("/current", stubhander()) // Pauses Replay. - //trackings.Put("/current", stubhander()) // Stops Replay. logrus.Fatal(app.Listen(c.Webserver.Port)) } -//func stubhander() fiber.Handler { -// return func(ctx *fiber.Ctx) error { -// return nil -// } -//} - -func LoadTrackingHandler(s core.Service, c *core.Configuration) fiber.Handler { +func LoadTrackingHandler(s *core.TrackingService, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { trackId := ctx.Params("trackingId") uid, err := uuid.Parse(trackId) @@ -88,18 +76,14 @@ func LoadTrackingHandler(s core.Service, c *core.Configuration) fiber.Handler { return nil } } -func allTrackingsHandler(s core.Service, c *core.Configuration) fiber.Handler { +func allTrackingsHandler(s *core.TrackingService, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { trackings, err := s.AllTrackings() - if err != nil { - //ctx.Status(500).JSON(err) - //return err - } + prepres := make(map[string]interface{}) prepres["data"] = trackings if err != nil { prepres["error"] = err.Error() - } err2 := ctx.JSON(prepres) if err2 != nil { @@ -110,7 +94,7 @@ func allTrackingsHandler(s core.Service, c *core.Configuration) fiber.Handler { } } -func startPipelineHandler(s core.Service, c *core.Configuration) fiber.Handler { +func startPipelineHandler(s *core.TrackingService, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { var collecs []core.CollectorType ser := ctx.Query("serial", "true") @@ -125,10 +109,7 @@ func startPipelineHandler(s core.Service, c *core.Configuration) fiber.Handler { collecs = append(collecs, core.TCP) } res, err := s.StartPipeline(collecs...) 
- if err != nil { - //ctx.Status(500).JSON(err) - //return err - } + prepres := make(map[string]interface{}) prepres["tracking_state"] = res prepres["data"] = collecs @@ -148,13 +129,10 @@ func startPipelineHandler(s core.Service, c *core.Configuration) fiber.Handler { } } -func startRecordingHandler(s core.Service, c *core.Configuration) fiber.Handler { +func startRecordingHandler(s *core.TrackingService, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { rec, err := s.StartRecord() - if err != nil { - //ctx.Status(500).JSON(err) - //return err - } + prepres := make(map[string]interface{}) prepres["tracking_state"] = "RECORD" prepres["data"] = rec @@ -171,13 +149,10 @@ func startRecordingHandler(s core.Service, c *core.Configuration) fiber.Handler } } -func stopRecordingHandler(s core.Service, c *core.Configuration) fiber.Handler { +func stopRecordingHandler(s *core.TrackingService, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { rec, err := s.StopRecord() - if err != nil { - //ctx.Status(500).JSON(err) - //return err - } + prepres := make(map[string]interface{}) prepres["tracking_state"] = "LIVE" prepres["data"] = rec @@ -194,13 +169,10 @@ func stopRecordingHandler(s core.Service, c *core.Configuration) fiber.Handler { } } -func stopAllHandler(s core.Service, c *core.Configuration) fiber.Handler { +func stopAllHandler(s *core.TrackingService, c *core.Configuration) fiber.Handler { return func(ctx *fiber.Ctx) error { rec, err := s.StopAll() - if err != nil { - //ctx.Status(500).JSON(err) - //return err - } + prepres := make(map[string]interface{}) prepres["tracking_state"] = "STOPPED" prepres["data"] = rec
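Since the route table in web/http.go is the main integration surface of this patch, a short client-side walkthrough may help. This is a sketch only: it assumes the default webserver port ":3011" from the configuration defaults and a server started via cmd/server on the same machine.

package main

import (
	"fmt"
	"io"
	"net/http"
)

// call issues a single request against the tracking API and prints the raw JSON reply.
func call(method, url string) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s %s -> %d\n%s\n", method, url, resp.StatusCode, body)
}

func main() {
	base := "http://localhost:3011/trackings"

	call(http.MethodPost, base+"/?tcp=true&serial=false") // start a new pipeline with the TCP collector only
	call(http.MethodPatch, base+"/")                      // start recording
	call(http.MethodPut, base+"/")                        // stop recording; the reply carries the tracking metadata
	call(http.MethodDelete, base+"/")                     // stop websocket, pipeline and collectors
	call(http.MethodGet, base+"/")                        // list the metadata of all stored trackings
}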
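A second sketch covers the persistence layer on its own and reuses ConfigurationFromFile for the badger paths. The full field sets of TrackingMetadata and SensorData are not visible in these hunks, so only the UUID and creation time are filled in, and it is an assumption here that Save accepts a tracking without sensor data.

package main

import (
	"time"

	"git.timovolkmann.de/gyrogpsc/core"
	"git.timovolkmann.de/gyrogpsc/storage"
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
)

func main() {
	conf := core.ConfigurationFromFile()
	repo := storage.NewRepository(conf)

	// Persist a tracking that carries metadata only (no sensor samples yet).
	tr := core.Tracking{
		TrackingMetadata: core.TrackingMetadata{
			UUID:        uuid.New(),
			TimeCreated: time.Now().UTC(),
		},
	}
	if err := repo.Save(tr); err != nil {
		logrus.Fatal(err)
	}

	// LoadAll returns metadata only; Load fetches the full tracking including its sensor data.
	metas, err := repo.LoadAll()
	if err != nil {
		logrus.Fatal(err)
	}
	for _, m := range metas {
		logrus.Infof("tracking %s created at %s", m.UUID, m.TimeCreated)
	}

	full, err := repo.Load(tr.UUID)
	if err != nil {
		logrus.Fatal(err)
	}
	logrus.Infof("loaded %d sensor records", len(full.Data))
}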