gyrogpsc/core/service.go
2020-12-13 03:23:18 +01:00

177 lines
4.0 KiB
Go

package core
import (
"errors"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"time"
)
// OpMode identifies the operating mode the tracking service is currently in.
type OpMode uint8

// Operating modes, numbered consecutively from zero via iota.
const (
	// STOPPED is the zero value of OpMode.
	STOPPED OpMode = iota
	// LIVE follows STOPPED (value 1).
	LIVE
	// RECORDING follows LIVE (value 2).
	RECORDING
	// REPLAY follows RECORDING (value 3).
	REPLAY
)
// Service is the set of operations for controlling trackings: starting the
// collector pipeline, recording, and managing or replaying stored trackings.
type Service interface {
	// AllTrackings returns the metadata of the stored trackings.
	AllTrackings() ([]TrackingMetadata, error)

	// StartPipeline starts a pipeline fed by the given collector types and
	// returns a status string.
	StartPipeline(cols ...CollectorType) (string, error)

	// StartRecord begins recording and returns a status string.
	StartRecord() (string, error)

	// StopRecord ends the running recording and returns its metadata.
	StopRecord() (*TrackingMetadata, error)

	// StopAll shuts everything down; the returned metadata may be nil.
	StopAll() (*TrackingMetadata, error)

	// LoadTracking selects the tracking with the given id.
	LoadTracking(trackingId uuid.UUID)

	// DeleteTracking removes the tracking with the given id.
	DeleteTracking(trackingId uuid.UUID)

	// StartReplay, PauseReplay and StopReplay control replay.
	StartReplay()
	PauseReplay()
	StopReplay()
}
// trackingService holds the state shared by the tracking service methods.
type trackingService struct {
	current    *Tracking      // tracking in progress; the constructor hands the same pointer to the pipeline
	config     *Configuration // passed on to NewPipeline and NewCollector
	pipe       *pipeline      // pipeline that is run, recorded and closed by the methods
	repo       Repo           // storage accessed through LoadAll and Save
	opMode     OpMode         // current operating mode
	collectors []Collector    // created by StartPipeline, closed by StopAll
}
// TrackingService builds a tracking service that starts out in the STOPPED
// mode with no collectors.
//
// r becomes the repository and c the configuration; d is handed to
// NewPipeline together with c. The service and its pipeline receive the same
// *Tracking, so both refer to one tracking value. The pipeline is only
// constructed here; its Run method is not called.
func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService {
	shared := &Tracking{}
	return &trackingService{
		current: shared,
		config:  c,
		pipe:    NewPipeline(d, shared, c),
		repo:    r,
		opMode:  STOPPED,
	}
}
// StartPipeline sets up a fresh live pipeline fed by one collector per entry
// in cols.
//
// While a recording is running the request is refused: the status "RECORDING"
// is returned together with an error. If the service is already LIVE, the
// running setup is stopped through StopAll first. Otherwise the collectors are
// rebuilt, the current tracking is replaced by an empty one, every collector
// is started and the pipeline is run. On success the status "LIVE" and a nil
// error are returned.
func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) {
	logrus.Info("SERVICE: NEW SETUP")

	switch t.opMode {
	case RECORDING:
		msg := "trackingservice: please stop recording before resetting pipeline"
		logrus.Warn(msg)
		return "RECORDING", errors.New(msg)
	case LIVE:
		logrus.Warn("trackingservice: stop current running stream/record before creating new one")
		// StopAll only produces a result or error when it has to stop a
		// recording, which cannot be the case in LIVE mode.
		_, _ = t.StopAll()
	}

	logrus.Debug("new tracking:", cols)
	t.opMode = LIVE

	// Rebuild the collector list from scratch for the requested types.
	t.collectors = nil
	for _, kind := range cols {
		t.collectors = append(t.collectors, NewCollector(kind, t.pipe, t.config))
	}

	t.safelyReplaceTracking(emptyTracking())
	t.current.Collectors = cols

	for _, c := range t.collectors {
		c.Collect()
	}
	t.pipe.Run()

	return "LIVE", nil
}
// AllTrackings logs the request and returns whatever the repository's LoadAll
// yields, passing its result and error through unchanged.
func (t *trackingService) AllTrackings() ([]TrackingMetadata, error) {
	logrus.Info("SERVICE: GET ALL TRACKINGS")
	return t.repo.LoadAll()
}
// StartRecord switches a LIVE service into RECORDING mode.
//
// It stamps the current tracking with the present time, tells the pipeline to
// record, and returns a status string naming the start time.
//
// An error is returned, and nothing is changed, when
//   - a recording is already running (the status string names its start time), or
//   - the service is in any other non-LIVE mode, i.e. there is no live
//     pipeline to record from.
func (t *trackingService) StartRecord() (string, error) {
	logrus.Info("SERVICE: START RECORD")

	if t.opMode == RECORDING {
		txt := "trackingservice: already recording"
		logrus.Warn(txt)
		return "record already running since: " + t.current.TimeCreated.String(), errors.New(txt)
	}
	if t.opMode != LIVE {
		txt := "trackingservice: start collector pipeline to record data"
		logrus.Warn(txt)
		// No recording exists in this state, so the status must not claim one
		// is running or quote a start time.
		return "no pipeline running", errors.New(txt)
	}

	t.opMode = RECORDING
	t.current.TimeCreated = time.Now()
	t.pipe.Record()
	return "record started at: " + t.current.TimeCreated.String(), nil
}
// StopRecord ends the running recording and stores it.
//
// If the service is not in RECORDING mode, nil and an error are returned.
// Otherwise the pipeline is told to stop recording, a copy of the current
// tracking is saved to the repository while both mRec and mRaw are held, and
// the service goes back to LIVE. The current tracking is then replaced by an
// empty one that keeps the previous collector list.
//
// The returned metadata is a copy taken before the reset. A save error is
// logged and returned alongside that metadata; the reset happens regardless.
func (t *trackingService) StopRecord() (*TrackingMetadata, error) {
	logrus.Info("SERVICE: STOP RECORD")

	if t.opMode != RECORDING {
		msg := "trackingservice: couldn't stop. not recording"
		logrus.Info(msg)
		return nil, errors.New(msg)
	}

	t.pipe.StopRecord()

	// Hold both locks while the tracking is copied and saved.
	mRec.Lock()
	mRaw.Lock()
	saveErr := t.repo.Save(*t.current)
	mRaw.Unlock()
	mRec.Unlock()
	if saveErr != nil {
		logrus.Error(saveErr)
	}

	t.opMode = LIVE

	// Copy the metadata before the tracking is wiped.
	meta := t.current.TrackingMetadata
	t.safelyReplaceTracking(emptyTracking())
	t.current.Collectors = meta.Collectors

	return &meta, saveErr
}
// StopAll closes the pipeline and every collector and puts the service into
// STOPPED mode.
//
// If a recording is running at the time, it is ended through StopRecord after
// the pipeline and collectors have been closed, and StopRecord's metadata and
// error are returned. Otherwise both results are nil.
func (t *trackingService) StopAll() (*TrackingMetadata, error) {
	logrus.Info("SERVICE: STOP ALL")

	t.pipe.Close()
	for _, c := range t.collectors {
		c.Close()
	}

	var (
		meta *TrackingMetadata
		err  error
	)
	if t.opMode == RECORDING {
		logrus.Warn("trackingservice: stop recording gracefully")
		meta, err = t.StopRecord()
	}

	t.opMode = STOPPED
	return meta, err
}
// LoadTracking is not implemented yet; calling it panics with "implement me".
func (t *trackingService) LoadTracking(trackingId uuid.UUID) {
	panic("implement me")
}
// DeleteTracking is not implemented yet; calling it panics with "implement me".
func (t *trackingService) DeleteTracking(trackingId uuid.UUID) {
	panic("implement me")
}
// StartReplay is not implemented yet; calling it panics with "implement me".
func (t *trackingService) StartReplay() {
	panic("implement me")
}
// PauseReplay is not implemented yet; calling it panics with "implement me".
func (t *trackingService) PauseReplay() {
	panic("implement me")
}
// StopReplay is not implemented yet; calling it panics with "implement me".
func (t *trackingService) StopReplay() {
	panic("implement me")
}
// safelyReplaceTracking overwrites the value behind t.current with tr while
// holding both mRec and mRaw. The pointer itself is left unchanged, so anything
// else holding that pointer sees the new contents.
//
// The locks are taken in the order mRec, mRaw and released in reverse.
func (t *trackingService) safelyReplaceTracking(tr Tracking) {
	mRec.Lock()
	defer mRec.Unlock()
	mRaw.Lock()
	defer mRaw.Unlock()

	*t.current = tr
}