From 1ea68222029e831316f57de6ca72acde740387a7 Mon Sep 17 00:00:00 2001 From: Timo Volkmann Date: Sat, 12 Dec 2020 03:23:00 +0100 Subject: [PATCH] implemented automatic persistence for trackings --- cmd/autostart/autostart.go | 102 ++++++------- cmd/server/server.go | 58 +++++--- core/collectors.go | 293 +++++++++++++++++++------------------ core/service.go | 187 ++++++++++++----------- 4 files changed, 332 insertions(+), 308 deletions(-) diff --git a/cmd/autostart/autostart.go b/cmd/autostart/autostart.go index 8be4e32..6d57603 100644 --- a/cmd/autostart/autostart.go +++ b/cmd/autostart/autostart.go @@ -1,66 +1,68 @@ package main import ( - "git.timovolkmann.de/gyrogpsc/core" - "git.timovolkmann.de/gyrogpsc/storage" - "git.timovolkmann.de/gyrogpsc/web" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" - "time" + "git.timovolkmann.de/gyrogpsc/core" + "git.timovolkmann.de/gyrogpsc/storage" + "git.timovolkmann.de/gyrogpsc/web" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "os" + "time" ) func main() { - conf := configurationFromFile() + conf := configurationFromFile() - repo := storage.NewRepository(conf) - disp := core.NewDispatcher() + repo := storage.NewRepository(conf) + disp := core.NewDispatcher() - service := core.TrackingService(repo, disp, conf) + service := core.TrackingService(repo, disp, conf) - go func() { - service.NewSetup(core.TCP, core.SERIAL) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(5 * time.Second) - service.StopRecord() - time.Sleep(5 * time.Second) - service.NewSetup(core.TCP) - time.Sleep(5 * time.Second) - service.StartRecord() - time.Sleep(5 * time.Second) - service.StopRecord() - time.Sleep(5 * time.Second) - service.StopAll() - }() + go func() { + service.NewSetup(core.TCP, core.SERIAL) + time.Sleep(5 * time.Second) + service.StartRecord() + time.Sleep(5 * time.Second) + service.StopRecord() + time.Sleep(5 * time.Second) + service.NewSetup(core.TCP, core.SERIAL) + time.Sleep(5 * time.Second) + service.StartRecord() + time.Sleep(120 * time.Second) + service.StopRecord() + time.Sleep(5 * time.Second) + service.StopAll() + os.Exit(0) + }() - web.CreateServer(service, disp, conf) + web.CreateServer(service, disp, conf) } func configurationFromFile() *core.Configuration { - viper.SetDefault("collectors.porttcp", ":3010") - viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") - viper.SetDefault("webserver.port", ":3011") - viper.SetDefault("pipeline.publishIntervalMs", 50) - viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) - viper.SetDefault("debuglevel", "INFO") + viper.SetDefault("collectors.porttcp", ":3010") + viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") + viper.SetDefault("webserver.port", ":3011") + viper.SetDefault("pipeline.publishIntervalMs", 50) + viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) + viper.SetDefault("debuglevel", "INFO") - viper.SetConfigName("gpsconfig") // name of config file (without extension) - viper.SetConfigType("yaml") - viper.AddConfigPath(".") - viper.AddConfigPath("./../../") - if err := viper.ReadInConfig(); err != nil { - logrus.Warn("couldn't find config file. using standard configuration") - } + viper.SetConfigName("gpsconfig") // name of config file (without extension) + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + viper.AddConfigPath("./../../") + if err := viper.ReadInConfig(); err != nil { + logrus.Warn("couldn't find config file. using standard configuration") + } - c := core.Configuration{} - if err := viper.Unmarshal(&c); err != nil { - logrus.Debug("couldn't load config...") - logrus.Error(err) - } - lvl, err := logrus.ParseLevel(c.Debuglevel) - if err != nil { - logrus.Error(err) - } - logrus.SetLevel(lvl) - return &c + c := core.Configuration{} + if err := viper.Unmarshal(&c); err != nil { + logrus.Debug("couldn't load config...") + logrus.Error(err) + } + lvl, err := logrus.ParseLevel(c.Debuglevel) + if err != nil { + logrus.Error(err) + } + logrus.SetLevel(lvl) + return &c } diff --git a/cmd/server/server.go b/cmd/server/server.go index b7ebf58..f69458e 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -1,37 +1,49 @@ package main import ( - "git.timovolkmann.de/gyrogpsc/core" - "git.timovolkmann.de/gyrogpsc/storage" - "git.timovolkmann.de/gyrogpsc/web" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" + "git.timovolkmann.de/gyrogpsc/core" + "git.timovolkmann.de/gyrogpsc/storage" + "git.timovolkmann.de/gyrogpsc/web" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" ) func main() { - conf := &core.Configuration{} - configurationFromFile(conf) + conf := configurationFromFile() - repo := storage.NewRepository(conf) - disp := core.NewDispatcher() + repo := storage.NewRepository(conf) + disp := core.NewDispatcher() - service := core.TrackingService(repo, disp, conf) + service := core.TrackingService(repo, disp, conf) - web.CreateServer(service, disp, conf) + web.CreateServer(service, disp, conf) } -func configurationFromFile(c *core.Configuration) { - viper.SetDefault("TcpCollectorPort", ":3010") - viper.SetDefault("SerialCollectorPort", "/dev/tty.usbmodem14201") - viper.SetDefault("HttpPort", "layouts") - viper.SetDefault("publishIntervalMs", 50) - viper.SetDefault("syncUpdateIntervalMs", 494) +func configurationFromFile() *core.Configuration { + viper.SetDefault("collectors.porttcp", ":3010") + viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") + viper.SetDefault("webserver.port", ":3011") + viper.SetDefault("pipeline.publishIntervalMs", 50) + viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) + viper.SetDefault("debuglevel", "INFO") - viper.SetConfigName("gpsconfig") // name of config file (without extension) - viper.SetConfigType("yaml") - viper.AddConfigPath(".") - viper.AddConfigPath("./../../") + viper.SetConfigName("gpsconfig") // name of config file (without extension) + viper.SetConfigType("yaml") + viper.AddConfigPath(".") + viper.AddConfigPath("./../../") + if err := viper.ReadInConfig(); err != nil { + logrus.Warn("couldn't find config file. using standard configuration") + } - viper.Unmarshal(c) - logrus.Println(c) + c := core.Configuration{} + if err := viper.Unmarshal(&c); err != nil { + logrus.Debug("couldn't load config...") + logrus.Error(err) + } + lvl, err := logrus.ParseLevel(c.Debuglevel) + if err != nil { + logrus.Error(err) + } + logrus.SetLevel(lvl) + return &c } diff --git a/core/collectors.go b/core/collectors.go index cddf112..f85be11 100644 --- a/core/collectors.go +++ b/core/collectors.go @@ -1,199 +1,210 @@ package core import ( - "fmt" - "git.timovolkmann.de/gyrogpsc/ublox" - "github.com/sirupsen/logrus" - "go.bug.st/serial" - "net" - "os" - "sync" + "fmt" + "git.timovolkmann.de/gyrogpsc/ublox" + "github.com/sirupsen/logrus" + "go.bug.st/serial" + "net" + "os" + "sync" + "time" ) type Collector interface { - Collect() - Close() + Collect() + Close() } type CollectorType uint8 const ( - SERIAL CollectorType = iota - TCP + SERIAL CollectorType = iota + TCP ) var tcpSingleton *tcpCollector func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector { - var coll Collector - switch typ { - case SERIAL: - coll = newSerial(proc, config) - case TCP: - if tcpSingleton == nil { - tcpSingleton = newTcp(proc, config) - } else { - tcpSingleton.SetProcessor(proc) - } - coll = tcpSingleton - default: - panic("selected collector type not implemented") - } - return coll + var coll Collector + switch typ { + case SERIAL: + coll = newSerial(proc, config) + case TCP: + if tcpSingleton == nil { + tcpSingleton = newTcp(proc, config) + } else { + tcpSingleton.SetProcessor(proc) + } + coll = tcpSingleton + default: + panic("selected collector type not implemented") + } + return coll } type serialCollector struct { - active bool - proc Pusher - config *Configuration - mu sync.RWMutex + active bool + proc Pusher + config *Configuration + mu sync.RWMutex } func (s *serialCollector) isSerialCollActive() bool { - s.mu.RLock() - defer s.mu.RUnlock() - return s.active + s.mu.RLock() + defer s.mu.RUnlock() + return s.active } - func (s *serialCollector) Collect() { - s.mu.Lock() - s.active = true - s.mu.Unlock() - go func() { - logrus.Println("start serial collector") - mode := &serial.Mode{ - BaudRate: 115200, - } - port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode) - if err != nil { - logrus.Fatalln("can't open serial port:", err.Error()) - } - defer port.Close() + s.mu.Lock() + s.active = true + s.mu.Unlock() + go func() { + logrus.Println("start serial collector") + mode := &serial.Mode{ + BaudRate: 115200, + } + port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode) + if err != nil { + logrus.Warn("can't open serial port:", err) + //if e, ok := err.(serial.PortError); ok && e.Code() == serial.PortBusy { + for i := 3; i < 20; i = i + 2 { + logrus.Warnf("try again in -> %vms", i*i) + time.Sleep(time.Millisecond * time.Duration(i*i)) + port, err = serial.Open(s.config.Collectors.SerialCollectorPort, mode) + if err == nil { + break + } + } + if err != nil { + logrus.Fatal(err) + } + //} + } + defer port.Close() - decoder := ublox.NewDecoder(port) + decoder := ublox.NewDecoder(port) - for s.isSerialCollActive() { - meas, err := decoder.Decode() - if err != nil { - if err.Error() == "NMEA not implemented" { - continue - } - logrus.Println("serial read err:", err) - break - } - sd, err := ConvertUbxToSensorData(meas) - if err != nil { - logrus.Println("convert err:", err, meas, sd) - continue - } - // skip irrelevant messages - if sd == nil { - continue - } + for s.isSerialCollActive() { + meas, err := decoder.Decode() + if err != nil { + if err.Error() == "NMEA not implemented" { + continue + } + logrus.Println("serial read err:", err) + break + } + sd, err := ConvertUbxToSensorData(meas) + if err != nil { + logrus.Println("convert err:", err, meas, sd) + continue + } + // skip irrelevant messages + if sd == nil { + continue + } - err = s.proc.Push(sd) - if err != nil { - logrus.Println("process err:", err, *sd) - continue - } - } - logrus.Println("serial collector stopped") - }() + err = s.proc.Push(sd) + if err != nil { + logrus.Println("process err:", err, *sd) + continue + } + } + logrus.Println("serial collector stopped") + }() } - - func (s *serialCollector) Close() { - s.mu.Lock() - s.active = false - s.mu.Unlock() + s.mu.Lock() + s.active = false + s.mu.Unlock() } func newSerial(proc Pusher, config *Configuration) *serialCollector { - return &serialCollector{ - active: false, - proc: proc, - config: config, - } + return &serialCollector{ + active: false, + proc: proc, + config: config, + } } type tcpCollector struct { - active bool - processor Pusher - //config *Configuration + active bool + processor Pusher + //config *Configuration } func (t *tcpCollector) Collect() { - t.active = true + t.active = true } func (t *tcpCollector) Close() { - t.active = false + t.active = false } func (t *tcpCollector) SetProcessor(p Pusher) { - t.processor = p + t.processor = p } func newTcp(proc Pusher, config *Configuration) *tcpCollector { - logrus.Println("start tcp collector") + logrus.Println("start tcp collector") - listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort) - if err != nil { - fmt.Println("Error listening:", err) - //os.Exit(1) - } - coll := &tcpCollector{ - active: false, - processor: proc, - } - go func() { - for { - // Listen for an incoming connection. - conn, err := listener.Accept() - if err != nil { - fmt.Println("Error accepting: ", err.Error()) - os.Exit(1) - } - logrus.Println("...new incoming tcp connection...") + listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort) + if err != nil { + fmt.Println("Error listening:", err) + //os.Exit(1) + } + coll := &tcpCollector{ + active: false, + processor: proc, + } + go func() { + for { + // Listen for an incoming connection. + conn, err := listener.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + os.Exit(1) + } + logrus.Println("...new incoming tcp connection...") - // Handle connections in a new goroutine. - go coll.jsonHandler(conn) - } - }() - return coll + // Handle connections in a new goroutine. + go coll.jsonHandler(conn) + } + }() + return coll } // handles incoming tcp connections with json payload. func (c *tcpCollector) jsonHandler(conn net.Conn) { - defer conn.Close() + defer conn.Close() - // TRY reader := bufio.NewReader(conn) OR NewScanner(conn) - buf := make([]byte, 2048) - for { - // Read the incoming connection into the buffer. - n, err := conn.Read(buf) - if err != nil { - fmt.Println("TCP error - reading from connection:", n, err.Error()) - break - } - //json := pretty.Pretty(buf[:n]) - //fmt.Println(string(json)) - //fmt.Println(string(buf[:n])) - sd, err := ConvertSensorDataPhone(buf[:n]) - if err != nil { - logrus.Println(err) - continue - } - if !c.active { - //time.Sleep(50 * time.Millisecond) - continue - } - err = c.processor.Push(sd) - if err != nil { - logrus.Fatalln(err) - } - } + // TRY reader := bufio.NewReader(conn) OR NewScanner(conn) + buf := make([]byte, 2048) + for { + // Read the incoming connection into the buffer. + n, err := conn.Read(buf) + if err != nil { + fmt.Println("TCP error - reading from connection:", n, err.Error()) + break + } + //json := pretty.Pretty(buf[:n]) + //fmt.Println(string(json)) + //fmt.Println(string(buf[:n])) + sd, err := ConvertSensorDataPhone(buf[:n]) + if err != nil { + logrus.Println(err) + continue + } + if !c.active { + //time.Sleep(50 * time.Millisecond) + continue + } + err = c.processor.Push(sd) + if err != nil { + logrus.Fatalln(err) + } + } } diff --git a/core/service.go b/core/service.go index d67c702..d650cd9 100644 --- a/core/service.go +++ b/core/service.go @@ -1,41 +1,41 @@ package core import ( - "github.com/google/uuid" - "github.com/sirupsen/logrus" - "time" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "time" ) type OpMode uint8 const ( - STOPPED OpMode = iota - LIVE - RECORDING - REPLAY + STOPPED OpMode = iota + LIVE + RECORDING + REPLAY ) type trackingService struct { - current *Tracking - config *Configuration - pipe *pipeline - repo Repo - opMode OpMode - collectors []Collector + current *Tracking + config *Configuration + pipe *pipeline + repo Repo + opMode OpMode + collectors []Collector } func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService { - t := &Tracking{} - ts := &trackingService{ - current: t, - opMode: STOPPED, - config: c, - repo: r, - pipe: NewPipeline(d, t, c), - collectors: nil, - } - //ts.pipe.Run() - return ts + t := &Tracking{} + ts := &trackingService{ + current: t, + opMode: STOPPED, + config: c, + repo: r, + pipe: NewPipeline(d, t, c), + collectors: nil, + } + //ts.pipe.Run() + return ts } //const( @@ -43,107 +43,106 @@ func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService { //) func (t *trackingService) AllTrackings() { - panic("implement me") + panic("implement me") } func (t *trackingService) NewSetup(cols ...CollectorType) { - logrus.Info("SERVICE: NEW SETUP") - if t.opMode == RECORDING { - logrus.Println("trackingservice: no reset while recording") - return - } - if t.opMode == LIVE { - logrus.Println("trackingservice: stop currently running setup before creating new one") - t.StopAll() - } - logrus.Debug("new tracking:", cols) - t.opMode = LIVE - t.collectors = nil - for _, col := range cols { - t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config)) - } - t.safelyReplaceTracking(emptyTracking()) - t.current.Collectors = cols - for _, e := range t.collectors { - e.Collect() - } - t.pipe.Run() - //time.Sleep(3 * time.Second) + logrus.Info("SERVICE: NEW SETUP") + if t.opMode == RECORDING { + logrus.Println("trackingservice: no reset while recording") + return + } + if t.opMode == LIVE { + logrus.Println("trackingservice: stop currently running setup before creating new one") + t.StopAll() + time.Sleep(20 * time.Millisecond) + } + logrus.Debug("new tracking:", cols) + t.opMode = LIVE + t.collectors = nil + for _, col := range cols { + t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config)) + } + t.safelyReplaceTracking(emptyTracking()) + t.current.Collectors = cols + for _, e := range t.collectors { + e.Collect() + } + t.pipe.Run() + //time.Sleep(3 * time.Second) } func (t *trackingService) StartRecord() { - logrus.Info("SERVICE: START RECORD") - if t.opMode != LIVE { - logrus.Println("trackingservice: wrong mode of operation") - return - } - t.opMode = RECORDING - t.current.TimeCreated = time.Now() - t.pipe.Record() + logrus.Info("SERVICE: START RECORD") + if t.opMode != LIVE { + logrus.Println("trackingservice: wrong mode of operation") + return + } + t.opMode = RECORDING + t.current.TimeCreated = time.Now() + t.pipe.Record() } func (t *trackingService) StopRecord() { - logrus.Info("SERVICE: STOP RECORD") - if t.opMode != RECORDING { - logrus.Println("trackingservice: couldn't stop. not recording") - return - } - t.opMode = LIVE - t.pipe.StopRecord() + logrus.Info("SERVICE: STOP RECORD") + if t.opMode != RECORDING { + logrus.Println("trackingservice: couldn't stop. not recording") + return + } + t.opMode = LIVE + t.pipe.StopRecord() - m1.Lock() - m2.Lock() - err := t.repo.Save(*t.current) - m2.Unlock() - m1.Unlock() + m1.Lock() + m2.Lock() + err := t.repo.Save(*t.current) + m2.Unlock() + m1.Unlock() - if err != nil { - logrus.Println(err) - } - t.safelyReplaceTracking(emptyTracking()) + if err != nil { + logrus.Println(err) + } + t.safelyReplaceTracking(emptyTracking()) } func (t *trackingService) StopAll() { - logrus.Info("SERVICE: STOP ALL") - if t.opMode == RECORDING { - logrus.Println("trackingservice: stop recording gracefully") - t.StopRecord() - } - t.opMode = STOPPED - t.pipe.Close() - for _, e := range t.collectors { - e.Close() - } - t.collectors = nil - t.safelyReplaceTracking(emptyTracking()) + logrus.Info("SERVICE: STOP ALL") + if t.opMode == RECORDING { + logrus.Println("trackingservice: stop recording gracefully") + t.StopRecord() + } + t.opMode = STOPPED + t.pipe.Close() + for _, e := range t.collectors { + e.Close() + } + t.collectors = nil + t.safelyReplaceTracking(emptyTracking()) } func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { - panic("implement me") + panic("implement me") } func (t *trackingService) StartReplay() { - panic("implement me") + panic("implement me") } func (t *trackingService) PauseReplay() { - panic("implement me") + panic("implement me") } func (t *trackingService) StopReplay() { - panic("implement me") + panic("implement me") } func (t *trackingService) LoadTracking(trackingId uuid.UUID) { } - func (t *trackingService) safelyReplaceTracking(tr Tracking) { - m1.Lock() - m2.Lock() - *t.current = tr - m2.Unlock() - m1.Unlock() + m1.Lock() + m2.Lock() + *t.current = tr + m2.Unlock() + m1.Unlock() } -