gyrogpsc/core/service.go
2021-01-15 17:37:24 +01:00

280 lines
7.4 KiB
Go

package core
import (
"context"
"errors"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"sync"
"time"
)
// OpMode indicates the operating state of the TrackingService.
type OpMode uint8
// Operating modes stored in TrackingService.opMode.
const (
// STOPPED is the zero value; NewTrackingService starts in this mode and StopAll returns to it.
STOPPED OpMode = iota
// LIVE is entered by StartLivetracking and re-entered by StopRecord.
LIVE
// RECORDING is entered by StartRecord.
RECORDING
// REPLAY is entered by LoadTracking when a replay was requested.
REPLAY
)
// Tracker combines a sink for sensor data with the Recorder switch.
// Implementations are expected to cache sensor data passed to Put while
// recording and to drop data when not recording.
type Tracker interface {
Put(data SensorData)
Recorder
}
// Recorder toggles and reports a recording state.
type Recorder interface {
// SetRecording requests recording state s; ok reports whether the request succeeded.
SetRecording(s bool) (ok bool)
// IsRecording reports whether recording is currently switched on.
IsRecording() bool
}
// TrackingService holds all relevant references for the tracking service:
// its operating mode, the tracking currently held, the running collectors
// and the storage, publisher and configuration it works with.
type TrackingService struct {
// current operating mode (STOPPED, LIVE, RECORDING or REPLAY)
opMode OpMode
// tracking currently held by the service; Put appends sensor data to it
tracking *Tracking
//pipeline *pipelineRecord
// replay pipeline created by LoadTracking; reset to nil by StopAll
replaypipe *pipelineReplay
// collectors created by StartLivetracking and stopped by StopAll
collectors []Collector
// storage used to save, load and list trackings
store Storer
// publisher handed to the pipelines; its streaming flag is switched by the service
publisher Publisher
// configuration passed on to NewCollector
config *Configuration
// weight-1 semaphore used as the recording flag: held while recording is on
recSem *semaphore.Weighted
// guards the appends to tracking made in Put
mu *sync.RWMutex
}
// NewTrackingService builds a TrackingService around the given configuration,
// store and publisher. The service starts in STOPPED mode with an empty
// Tracking, no collectors and a free recording semaphore.
func NewTrackingService(c *Configuration, s Storer, p Publisher) *TrackingService {
	service := &TrackingService{
		opMode:    STOPPED,
		tracking:  &Tracking{},
		store:     s,
		publisher: p,
		config:    c,
		recSem:    semaphore.NewWeighted(1),
		mu:        &sync.RWMutex{},
	}

	// First call opens the TCP port so the app is ready to accept
	// connections; the returned collector itself is not kept.
	NewCollector(c, TCP)

	return service
}
// Put caches a sensor data point in the current tracking while the service
// is recording. Data arriving while not recording is dropped.
func (t *TrackingService) Put(data SensorData) {
	if recording := t.IsRecording(); !recording {
		return
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	current := t.tracking
	current.Data = append(current.Data, data)
	current.Size++
	logrus.Traceln("raw data points: len->", len(current.Data))
}
// SetRecording changes the recording state of the service.
//
// The state is represented by the weight-1 semaphore recSem: the semaphore is
// held while recording is on and free while it is off.
//
// It returns true when the state was actually switched, i.e. recording was
// turned on while off, or turned off while on. It returns false when the
// service already was in the requested state.
func (t *TrackingService) SetRecording(s bool) (ok bool) {
	acquired := t.recSem.TryAcquire(1)

	switch {
	case s && acquired:
		// Turning on and the semaphore was free: keep holding it as the
		// "recording" marker.
		return true
	case s:
		// Turning on but the semaphore is already held: already recording.
		return false
	case acquired:
		// Turning off while not recording: the probe above just took the
		// semaphore, so give it back. Without this release the service
		// would report IsRecording() == true from now on.
		t.recSem.Release(1)
		return false
	default:
		// Turning off while recording: release the semaphore held since
		// recording was switched on.
		t.recSem.Release(1)
		return true
	}
}
// IsRecording reports whether recording is switched on. It probes the
// recording semaphore: if the semaphore can be taken, nobody holds it, so the
// probe is undone and false is returned.
func (t *TrackingService) IsRecording() bool {
	free := t.recSem.TryAcquire(1)
	if free {
		t.recSem.Release(1)
	}
	return !free
}
// StartLivetracking creates a new record pipeline fed by the requested
// collectors and switches the service to LIVE.
//
// It returns an error while the service is RECORDING or already LIVE. A
// running REPLAY is stopped first. On success the status string "LIVE" is
// returned.
func (t *TrackingService) StartLivetracking(cols ...CollectorType) (string, error) {
	logrus.Info("SERVICE: NEW PIPELINE")

	// Make sure the state machine allows starting a new pipeline.
	switch t.opMode {
	case RECORDING:
		msg := "trackingservice: please stop recording before resetting pipeline"
		logrus.Warn(msg)
		return "RECORDING", errors.New(msg)
	case REPLAY:
		// StopAll always leaves the service in STOPPED mode, so setup
		// simply continues below.
		t.StopAll()
	case LIVE:
		msg := "trackingservice: stop tracking running stream before creating new one"
		logrus.Warnln(msg)
		return "record already running since: " + t.tracking.TimeCreated.String(), errors.New(msg)
	}

	logrus.Debugln("new tracking:", cols)
	t.opMode = LIVE

	// Create and start one collector per requested type, remembering the
	// output channels of the TCP and serial collectors for the pipeline.
	var tcp, ser chan interface{}
	t.collectors = nil
	for _, kind := range cols {
		collector := NewCollector(t.config, kind)
		t.collectors = append(t.collectors, collector)
		switch kind {
		case TCP:
			tcp = collector.OutChannel()
		case SERIAL:
			ser = collector.OutChannel()
		}
		collector.Collect()
	}

	// Start from a fresh tracking that remembers which collectors feed it.
	t.safelyReplaceTracking(newTracking())
	t.tracking.Collectors = cols

	// Finally create the pipeline and enable streaming.
	NewRecordPipeline(t.publisher, t, tcp, ser)
	t.publisher.SetStreaming(true)

	logrus.Debugln("current State:", t.opMode)
	return "LIVE", nil
}
// AllTrackings loads the metadata of all stored trackings from the store.
// Only metadata is returned, no sensor data.
func (t *TrackingService) AllTrackings() ([]TrackingMetadata, error) {
	logrus.Info("SERVICE: GET ALL TRACKINGS")

	metas, loadErr := t.store.LoadAll()

	logrus.Debugln("current State:", t.opMode)
	return metas, loadErr
}
// StartRecord starts recording and returns a status string.
//
// Recording can only be started from LIVE mode. If the service is already
// RECORDING, or no live pipeline is running, an error is returned together
// with a status string describing the actual state.
func (t *TrackingService) StartRecord() (string, error) {
	logrus.Info("SERVICE: START RECORD")

	switch t.opMode {
	case LIVE:
		// Only state from which recording may start.
	case RECORDING:
		txt := "trackingservice: already recording"
		logrus.Warn(txt)
		return "record already running since: " + t.tracking.TimeCreated.String(), errors.New(txt)
	default:
		// No live pipeline (STOPPED or REPLAY): nothing is recording, so do
		// not claim a record is "already running".
		txt := "trackingservice: start collector pipeline to record data"
		logrus.Warn(txt)
		return "no live tracking running", errors.New(txt)
	}

	t.opMode = RECORDING
	t.tracking.TimeCreated = time.Now()
	if !t.SetRecording(true) {
		// The recording flag was already set; data is being cached anyway,
		// but the inconsistency is worth a trace in the log.
		logrus.Warn("trackingservice: recording flag was already set")
	}

	logrus.Debugln("current State:", t.opMode)
	return "record started at: " + t.tracking.TimeCreated.String(), nil
}
// StopRecord stops a running recording, saves the recorded tracking to the
// store and switches back to LIVE with a fresh tracking that keeps the same
// collector list.
//
// It returns the metadata of the tracking that was recorded. A save error is
// logged and returned alongside the metadata. If the service is not
// RECORDING, nil and an error are returned.
func (t *TrackingService) StopRecord() (*TrackingMetadata, error) {
	logrus.Info("SERVICE: STOP RECORD")

	if t.opMode != RECORDING {
		msg := "trackingservice: couldn't stop. not recording"
		logrus.Info(msg)
		return nil, errors.New(msg)
	}

	t.SetRecording(false)

	saveErr := t.store.Save(*t.tracking)
	if saveErr != nil {
		logrus.Error(saveErr)
	}

	t.opMode = LIVE

	// Copy the metadata before the held tracking is overwritten.
	meta := t.tracking.TrackingMetadata
	t.safelyReplaceTracking(newTracking())
	t.tracking.Collectors = meta.Collectors

	logrus.Debugln("current State:", t.opMode)
	return &meta, saveErr
}
// StopAll stops live tracking, replay and recording and puts the service into
// STOPPED mode. Metadata is only returned when a recording was active and had
// to be stopped; otherwise the metadata result is nil.
func (t *TrackingService) StopAll() (*TrackingMetadata, error) {
	logrus.Info("SERVICE: STOP ALL")

	for _, collector := range t.collectors {
		collector.Stop()
	}

	if t.replaypipe != nil {
		t.replaypipe.Stop()
		t.replaypipe = nil
	}

	// Let the buffer run empty after the collectors stopped.
	time.Sleep(5 * time.Millisecond)
	t.publisher.SetStreaming(false)

	var (
		meta    *TrackingMetadata
		stopErr error
	)
	if t.opMode == RECORDING {
		logrus.Info("trackingservice: gracefully stop recording ")
		meta, stopErr = t.StopRecord()
	}

	t.opMode = STOPPED
	logrus.Debugln("current State:", t.opMode)
	return meta, stopErr
}
// LoadTracking retrieves the tracking with the given id, including all of its
// data, from the store. If replay is true it also starts a replay pipeline;
// in that case the application behaves like in live mode.
//
// Loading is refused while RECORDING. A running LIVE or REPLAY session is
// stopped before loading. A load error is returned unchanged, and an error is
// also returned when the store yields no tracking.
//
// Without replay the loaded tracking is returned. With replay it replaces the
// tracking held by the service, and that held tracking is returned.
func (t *TrackingService) LoadTracking(trackingId uuid.UUID, replay bool) (*Tracking, error) {
	logrus.Info("SERVICE: LOAD TRACKING")

	if t.opMode == RECORDING {
		txt := "trackingservice: please stop recording before load another tracking"
		logrus.Warn(txt)
		return nil, errors.New(txt)
	}
	if t.opMode == REPLAY || t.opMode == LIVE {
		t.StopAll()
	}

	logrus.Info("LOAD TRACKING from database")
	tracking, err := t.store.Load(trackingId)
	// Check the load result before touching the tracking: on failure the
	// tracking may be nil and must not be dereferenced.
	if err != nil {
		return nil, err
	}
	if tracking == nil {
		return nil, errors.New("trackingservice: store returned no tracking")
	}
	fixSpeedValues(tracking)

	if !replay {
		return tracking, nil
	}

	t.safelyReplaceTracking(*tracking)

	if t.replaypipe != nil {
		// Non-blocking receive on the old pipeline's stop channel; only used
		// to log when it yields.
		select {
		case <-t.replaypipe.stopChan:
			logrus.Warnln("blocking channel closed")
		default:
		}
		t.replaypipe = nil
	}

	t.replaypipe = NewReplayPipeline(t.publisher, t.tracking)
	t.publisher.SetStreaming(true)
	t.opMode = REPLAY

	logrus.Debugln("current State:", t.opMode)
	return t.tracking, nil
}
// DeleteTracking is not implemented yet; calling it panics with "implement me".
func (t *TrackingService) DeleteTracking(trackingId uuid.UUID) {
panic("implement me")
}
// safelyReplaceTracking replaces the tracking held by the service. It keeps
// the pointer and overwrites only the underlying data, so references to
// t.tracking held elsewhere stay valid.
//
// The recording semaphore is held during the swap, so the call blocks while a
// recording is on. Because a held semaphore makes IsRecording report true, a
// concurrent Put may try to append during the swap; the write therefore also
// takes t.mu, the same lock Put uses for its append.
func (t *TrackingService) safelyReplaceTracking(tr Tracking) {
	// Acquire only fails when its context is done, which never happens for
	// context.Background(), so the error can be ignored.
	_ = t.recSem.Acquire(context.Background(), 1)
	defer t.recSem.Release(1)

	t.mu.Lock()
	defer t.mu.Unlock()
	*t.tracking = tr
}
// fixSpeedValues corrects wrong speed values recorded before 12/20/2020.
// Every data point from the serial source with a server time before that
// date and a non-zero speed gets its speed multiplied by 3.6 in place
// (presumably converting m/s to km/h; confirm against the recorder).
// A nil tracking is left untouched.
func fixSpeedValues(tracking *Tracking) {
	logrus.Debugln("fixing speed values")
	if tracking == nil {
		return
	}

	// 1608422400 is 2020-12-20 00:00:00 UTC; computed once for all points.
	cutoff := time.Unix(1608422400, 0)
	for i := range tracking.Data {
		point := &tracking.Data[i]
		if point.Servertime.Before(cutoff) && point.Source() == SOURCE_SERIAL && point.Speed != 0 {
			point.Speed *= 3.6
		}
	}
}