package core import ( "errors" "github.com/google/uuid" "github.com/sirupsen/logrus" "time" ) type OpMode uint8 const ( STOPPED OpMode = iota LIVE RECORDING REPLAY ) type Service interface { AllTrackings() ([]TrackingMetadata, error) StartPipeline(cols ...CollectorType) (string, error) StartRecord() (string, error) StopRecord() (*TrackingMetadata, error) StopAll() (*TrackingMetadata, error) LoadTracking(trackingId uuid.UUID) DeleteTracking(trackingId uuid.UUID) StartReplay() PauseReplay() StopReplay() } type trackingService struct { current *Tracking config *Configuration pipe *pipeline repo Repo opMode OpMode collectors []Collector } func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService { t := &Tracking{} ts := &trackingService{ current: t, opMode: STOPPED, config: c, repo: r, pipe: NewPipeline(d, t, c), collectors: nil, } //ts.pipe.Run() return ts } func (t *trackingService) StartPipeline(cols ...CollectorType) (string, error) { logrus.Info("SERVICE: NEW SETUP") if t.opMode == RECORDING { txt := "trackingservice: please stop recording before resetting pipeline" logrus.Warn(txt) return "RECORDING", errors.New(txt) } if t.opMode == LIVE { logrus.Warn("trackingservice: stop current running stream/record before creating new one") t.StopAll() //time.Sleep(1000 * time.Millisecond) } logrus.Debug("new tracking:", cols) t.opMode = LIVE t.collectors = nil for _, col := range cols { t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config)) } t.safelyReplaceTracking(emptyTracking()) t.current.Collectors = cols for _, e := range t.collectors { e.Collect() } t.pipe.Run() //time.Sleep(3 * time.Second) return "LIVE", nil } func (t *trackingService) AllTrackings() ([]TrackingMetadata, error) { panic("implement me") return nil, nil } func (t *trackingService) StartRecord() (string, error) { logrus.Info("SERVICE: START RECORD") if t.opMode != LIVE { if t.opMode == RECORDING { txt := "trackingservice: already recording" logrus.Warn(txt) return "record already running since: " + t.current.TimeCreated.String(), errors.New(txt) } else { txt := "trackingservice: start collector pipeline to record data" logrus.Warn(txt) return "record already running since: " + t.current.TimeCreated.String(), errors.New(txt) } } t.opMode = RECORDING t.current.TimeCreated = time.Now() t.pipe.Record() return "record started at: " + t.current.TimeCreated.String(), nil } func (t *trackingService) StopRecord() (*TrackingMetadata, error) { logrus.Info("SERVICE: STOP RECORD") if t.opMode != RECORDING { txt := "trackingservice: couldn't stop. not recording" logrus.Info(txt) return nil, errors.New(txt) } t.pipe.StopRecord() mRec.Lock() mRaw.Lock() err := t.repo.Save(*t.current) mRaw.Unlock() mRec.Unlock() if err != nil { logrus.Error(err) } t.opMode = LIVE //time.Sleep(20 * time.Millisecond) tm := t.current.TrackingMetadata t.safelyReplaceTracking(emptyTracking()) t.current.Collectors = tm.Collectors return &tm, err } func (t *trackingService) StopAll() (*TrackingMetadata, error) { logrus.Info("SERVICE: STOP ALL") var tm *TrackingMetadata = nil var err error t.pipe.Close() for _, e := range t.collectors { e.Close() } if t.opMode == RECORDING { logrus.Warn("trackingservice: stop recording gracefully") tm, err = t.StopRecord() } t.opMode = STOPPED return tm, err } func (t *trackingService) LoadTracking(trackingId uuid.UUID) { panic("implement me") } func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { panic("implement me") } func (t *trackingService) StartReplay() { panic("implement me") } func (t *trackingService) PauseReplay() { panic("implement me") } func (t *trackingService) StopReplay() { panic("implement me") } func (t *trackingService) safelyReplaceTracking(tr Tracking) { mRec.Lock() mRaw.Lock() *t.current = tr mRaw.Unlock() mRec.Unlock() }