implemented automatic persistence for trackings

This commit is contained in:
Timo Volkmann 2020-12-12 03:23:00 +01:00
parent 09791727a4
commit 1ea6822202
4 changed files with 332 additions and 308 deletions

View File

@ -1,66 +1,68 @@
package main package main
import ( import (
"git.timovolkmann.de/gyrogpsc/core" "git.timovolkmann.de/gyrogpsc/core"
"git.timovolkmann.de/gyrogpsc/storage" "git.timovolkmann.de/gyrogpsc/storage"
"git.timovolkmann.de/gyrogpsc/web" "git.timovolkmann.de/gyrogpsc/web"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
"time" "os"
"time"
) )
func main() { func main() {
conf := configurationFromFile() conf := configurationFromFile()
repo := storage.NewRepository(conf) repo := storage.NewRepository(conf)
disp := core.NewDispatcher() disp := core.NewDispatcher()
service := core.TrackingService(repo, disp, conf) service := core.TrackingService(repo, disp, conf)
go func() { go func() {
service.NewSetup(core.TCP, core.SERIAL) service.NewSetup(core.TCP, core.SERIAL)
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
service.StartRecord() service.StartRecord()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
service.StopRecord() service.StopRecord()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
service.NewSetup(core.TCP) service.NewSetup(core.TCP, core.SERIAL)
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
service.StartRecord() service.StartRecord()
time.Sleep(5 * time.Second) time.Sleep(120 * time.Second)
service.StopRecord() service.StopRecord()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
service.StopAll() service.StopAll()
}() os.Exit(0)
}()
web.CreateServer(service, disp, conf) web.CreateServer(service, disp, conf)
} }
func configurationFromFile() *core.Configuration { func configurationFromFile() *core.Configuration {
viper.SetDefault("collectors.porttcp", ":3010") viper.SetDefault("collectors.porttcp", ":3010")
viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201") viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201")
viper.SetDefault("webserver.port", ":3011") viper.SetDefault("webserver.port", ":3011")
viper.SetDefault("pipeline.publishIntervalMs", 50) viper.SetDefault("pipeline.publishIntervalMs", 50)
viper.SetDefault("pipeline.syncUpdateIntervalMs", 494) viper.SetDefault("pipeline.syncUpdateIntervalMs", 494)
viper.SetDefault("debuglevel", "INFO") viper.SetDefault("debuglevel", "INFO")
viper.SetConfigName("gpsconfig") // name of config file (without extension) viper.SetConfigName("gpsconfig") // name of config file (without extension)
viper.SetConfigType("yaml") viper.SetConfigType("yaml")
viper.AddConfigPath(".") viper.AddConfigPath(".")
viper.AddConfigPath("./../../") viper.AddConfigPath("./../../")
if err := viper.ReadInConfig(); err != nil { if err := viper.ReadInConfig(); err != nil {
logrus.Warn("couldn't find config file. using standard configuration") logrus.Warn("couldn't find config file. using standard configuration")
} }
c := core.Configuration{} c := core.Configuration{}
if err := viper.Unmarshal(&c); err != nil { if err := viper.Unmarshal(&c); err != nil {
logrus.Debug("couldn't load config...") logrus.Debug("couldn't load config...")
logrus.Error(err) logrus.Error(err)
} }
lvl, err := logrus.ParseLevel(c.Debuglevel) lvl, err := logrus.ParseLevel(c.Debuglevel)
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
} }
logrus.SetLevel(lvl) logrus.SetLevel(lvl)
return &c return &c
} }

View File

@ -1,37 +1,49 @@
package main package main
import ( import (
"git.timovolkmann.de/gyrogpsc/core" "git.timovolkmann.de/gyrogpsc/core"
"git.timovolkmann.de/gyrogpsc/storage" "git.timovolkmann.de/gyrogpsc/storage"
"git.timovolkmann.de/gyrogpsc/web" "git.timovolkmann.de/gyrogpsc/web"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
func main() { func main() {
conf := &core.Configuration{} conf := configurationFromFile()
configurationFromFile(conf)
repo := storage.NewRepository(conf) repo := storage.NewRepository(conf)
disp := core.NewDispatcher() disp := core.NewDispatcher()
service := core.TrackingService(repo, disp, conf) service := core.TrackingService(repo, disp, conf)
web.CreateServer(service, disp, conf) web.CreateServer(service, disp, conf)
} }
func configurationFromFile(c *core.Configuration) { func configurationFromFile() *core.Configuration {
viper.SetDefault("TcpCollectorPort", ":3010") viper.SetDefault("collectors.porttcp", ":3010")
viper.SetDefault("SerialCollectorPort", "/dev/tty.usbmodem14201") viper.SetDefault("collectors.portserial", "/dev/tty.usbmodem14201")
viper.SetDefault("HttpPort", "layouts") viper.SetDefault("webserver.port", ":3011")
viper.SetDefault("publishIntervalMs", 50) viper.SetDefault("pipeline.publishIntervalMs", 50)
viper.SetDefault("syncUpdateIntervalMs", 494) viper.SetDefault("pipeline.syncUpdateIntervalMs", 494)
viper.SetDefault("debuglevel", "INFO")
viper.SetConfigName("gpsconfig") // name of config file (without extension) viper.SetConfigName("gpsconfig") // name of config file (without extension)
viper.SetConfigType("yaml") viper.SetConfigType("yaml")
viper.AddConfigPath(".") viper.AddConfigPath(".")
viper.AddConfigPath("./../../") viper.AddConfigPath("./../../")
if err := viper.ReadInConfig(); err != nil {
logrus.Warn("couldn't find config file. using standard configuration")
}
viper.Unmarshal(c) c := core.Configuration{}
logrus.Println(c) if err := viper.Unmarshal(&c); err != nil {
logrus.Debug("couldn't load config...")
logrus.Error(err)
}
lvl, err := logrus.ParseLevel(c.Debuglevel)
if err != nil {
logrus.Error(err)
}
logrus.SetLevel(lvl)
return &c
} }

View File

@ -1,199 +1,210 @@
package core package core
import ( import (
"fmt" "fmt"
"git.timovolkmann.de/gyrogpsc/ublox" "git.timovolkmann.de/gyrogpsc/ublox"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"go.bug.st/serial" "go.bug.st/serial"
"net" "net"
"os" "os"
"sync" "sync"
"time"
) )
type Collector interface { type Collector interface {
Collect() Collect()
Close() Close()
} }
type CollectorType uint8 type CollectorType uint8
const ( const (
SERIAL CollectorType = iota SERIAL CollectorType = iota
TCP TCP
) )
var tcpSingleton *tcpCollector var tcpSingleton *tcpCollector
func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector { func NewCollector(typ CollectorType, proc Pusher, config *Configuration) Collector {
var coll Collector var coll Collector
switch typ { switch typ {
case SERIAL: case SERIAL:
coll = newSerial(proc, config) coll = newSerial(proc, config)
case TCP: case TCP:
if tcpSingleton == nil { if tcpSingleton == nil {
tcpSingleton = newTcp(proc, config) tcpSingleton = newTcp(proc, config)
} else { } else {
tcpSingleton.SetProcessor(proc) tcpSingleton.SetProcessor(proc)
} }
coll = tcpSingleton coll = tcpSingleton
default: default:
panic("selected collector type not implemented") panic("selected collector type not implemented")
} }
return coll return coll
} }
type serialCollector struct { type serialCollector struct {
active bool active bool
proc Pusher proc Pusher
config *Configuration config *Configuration
mu sync.RWMutex mu sync.RWMutex
} }
func (s *serialCollector) isSerialCollActive() bool { func (s *serialCollector) isSerialCollActive() bool {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
return s.active return s.active
} }
func (s *serialCollector) Collect() { func (s *serialCollector) Collect() {
s.mu.Lock() s.mu.Lock()
s.active = true s.active = true
s.mu.Unlock() s.mu.Unlock()
go func() { go func() {
logrus.Println("start serial collector") logrus.Println("start serial collector")
mode := &serial.Mode{ mode := &serial.Mode{
BaudRate: 115200, BaudRate: 115200,
} }
port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode) port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode)
if err != nil { if err != nil {
logrus.Fatalln("can't open serial port:", err.Error()) logrus.Warn("can't open serial port:", err)
} //if e, ok := err.(serial.PortError); ok && e.Code() == serial.PortBusy {
defer port.Close() for i := 3; i < 20; i = i + 2 {
logrus.Warnf("try again in -> %vms", i*i)
time.Sleep(time.Millisecond * time.Duration(i*i))
port, err = serial.Open(s.config.Collectors.SerialCollectorPort, mode)
if err == nil {
break
}
}
if err != nil {
logrus.Fatal(err)
}
//}
}
defer port.Close()
decoder := ublox.NewDecoder(port) decoder := ublox.NewDecoder(port)
for s.isSerialCollActive() { for s.isSerialCollActive() {
meas, err := decoder.Decode() meas, err := decoder.Decode()
if err != nil { if err != nil {
if err.Error() == "NMEA not implemented" { if err.Error() == "NMEA not implemented" {
continue continue
} }
logrus.Println("serial read err:", err) logrus.Println("serial read err:", err)
break break
} }
sd, err := ConvertUbxToSensorData(meas) sd, err := ConvertUbxToSensorData(meas)
if err != nil { if err != nil {
logrus.Println("convert err:", err, meas, sd) logrus.Println("convert err:", err, meas, sd)
continue continue
} }
// skip irrelevant messages // skip irrelevant messages
if sd == nil { if sd == nil {
continue continue
} }
err = s.proc.Push(sd) err = s.proc.Push(sd)
if err != nil { if err != nil {
logrus.Println("process err:", err, *sd) logrus.Println("process err:", err, *sd)
continue continue
} }
} }
logrus.Println("serial collector stopped") logrus.Println("serial collector stopped")
}() }()
} }
func (s *serialCollector) Close() { func (s *serialCollector) Close() {
s.mu.Lock() s.mu.Lock()
s.active = false s.active = false
s.mu.Unlock() s.mu.Unlock()
} }
func newSerial(proc Pusher, config *Configuration) *serialCollector { func newSerial(proc Pusher, config *Configuration) *serialCollector {
return &serialCollector{ return &serialCollector{
active: false, active: false,
proc: proc, proc: proc,
config: config, config: config,
} }
} }
type tcpCollector struct { type tcpCollector struct {
active bool active bool
processor Pusher processor Pusher
//config *Configuration //config *Configuration
} }
func (t *tcpCollector) Collect() { func (t *tcpCollector) Collect() {
t.active = true t.active = true
} }
func (t *tcpCollector) Close() { func (t *tcpCollector) Close() {
t.active = false t.active = false
} }
func (t *tcpCollector) SetProcessor(p Pusher) { func (t *tcpCollector) SetProcessor(p Pusher) {
t.processor = p t.processor = p
} }
func newTcp(proc Pusher, config *Configuration) *tcpCollector { func newTcp(proc Pusher, config *Configuration) *tcpCollector {
logrus.Println("start tcp collector") logrus.Println("start tcp collector")
listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort) listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort)
if err != nil { if err != nil {
fmt.Println("Error listening:", err) fmt.Println("Error listening:", err)
//os.Exit(1) //os.Exit(1)
} }
coll := &tcpCollector{ coll := &tcpCollector{
active: false, active: false,
processor: proc, processor: proc,
} }
go func() { go func() {
for { for {
// Listen for an incoming connection. // Listen for an incoming connection.
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
fmt.Println("Error accepting: ", err.Error()) fmt.Println("Error accepting: ", err.Error())
os.Exit(1) os.Exit(1)
} }
logrus.Println("...new incoming tcp connection...") logrus.Println("...new incoming tcp connection...")
// Handle connections in a new goroutine. // Handle connections in a new goroutine.
go coll.jsonHandler(conn) go coll.jsonHandler(conn)
} }
}() }()
return coll return coll
} }
// handles incoming tcp connections with json payload. // handles incoming tcp connections with json payload.
func (c *tcpCollector) jsonHandler(conn net.Conn) { func (c *tcpCollector) jsonHandler(conn net.Conn) {
defer conn.Close() defer conn.Close()
// TRY reader := bufio.NewReader(conn) OR NewScanner(conn) // TRY reader := bufio.NewReader(conn) OR NewScanner(conn)
buf := make([]byte, 2048) buf := make([]byte, 2048)
for { for {
// Read the incoming connection into the buffer. // Read the incoming connection into the buffer.
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err != nil { if err != nil {
fmt.Println("TCP error - reading from connection:", n, err.Error()) fmt.Println("TCP error - reading from connection:", n, err.Error())
break break
} }
//json := pretty.Pretty(buf[:n]) //json := pretty.Pretty(buf[:n])
//fmt.Println(string(json)) //fmt.Println(string(json))
//fmt.Println(string(buf[:n])) //fmt.Println(string(buf[:n]))
sd, err := ConvertSensorDataPhone(buf[:n]) sd, err := ConvertSensorDataPhone(buf[:n])
if err != nil { if err != nil {
logrus.Println(err) logrus.Println(err)
continue continue
} }
if !c.active { if !c.active {
//time.Sleep(50 * time.Millisecond) //time.Sleep(50 * time.Millisecond)
continue continue
} }
err = c.processor.Push(sd) err = c.processor.Push(sd)
if err != nil { if err != nil {
logrus.Fatalln(err) logrus.Fatalln(err)
} }
} }
} }

View File

@ -1,41 +1,41 @@
package core package core
import ( import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"time" "time"
) )
type OpMode uint8 type OpMode uint8
const ( const (
STOPPED OpMode = iota STOPPED OpMode = iota
LIVE LIVE
RECORDING RECORDING
REPLAY REPLAY
) )
type trackingService struct { type trackingService struct {
current *Tracking current *Tracking
config *Configuration config *Configuration
pipe *pipeline pipe *pipeline
repo Repo repo Repo
opMode OpMode opMode OpMode
collectors []Collector collectors []Collector
} }
func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService { func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService {
t := &Tracking{} t := &Tracking{}
ts := &trackingService{ ts := &trackingService{
current: t, current: t,
opMode: STOPPED, opMode: STOPPED,
config: c, config: c,
repo: r, repo: r,
pipe: NewPipeline(d, t, c), pipe: NewPipeline(d, t, c),
collectors: nil, collectors: nil,
} }
//ts.pipe.Run() //ts.pipe.Run()
return ts return ts
} }
//const( //const(
@ -43,107 +43,106 @@ func TrackingService(r Repo, d Publisher, c *Configuration) *trackingService {
//) //)
func (t *trackingService) AllTrackings() { func (t *trackingService) AllTrackings() {
panic("implement me") panic("implement me")
} }
func (t *trackingService) NewSetup(cols ...CollectorType) { func (t *trackingService) NewSetup(cols ...CollectorType) {
logrus.Info("SERVICE: NEW SETUP") logrus.Info("SERVICE: NEW SETUP")
if t.opMode == RECORDING { if t.opMode == RECORDING {
logrus.Println("trackingservice: no reset while recording") logrus.Println("trackingservice: no reset while recording")
return return
} }
if t.opMode == LIVE { if t.opMode == LIVE {
logrus.Println("trackingservice: stop currently running setup before creating new one") logrus.Println("trackingservice: stop currently running setup before creating new one")
t.StopAll() t.StopAll()
} time.Sleep(20 * time.Millisecond)
logrus.Debug("new tracking:", cols) }
t.opMode = LIVE logrus.Debug("new tracking:", cols)
t.collectors = nil t.opMode = LIVE
for _, col := range cols { t.collectors = nil
t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config)) for _, col := range cols {
} t.collectors = append(t.collectors, NewCollector(col, t.pipe, t.config))
t.safelyReplaceTracking(emptyTracking()) }
t.current.Collectors = cols t.safelyReplaceTracking(emptyTracking())
for _, e := range t.collectors { t.current.Collectors = cols
e.Collect() for _, e := range t.collectors {
} e.Collect()
t.pipe.Run() }
//time.Sleep(3 * time.Second) t.pipe.Run()
//time.Sleep(3 * time.Second)
} }
func (t *trackingService) StartRecord() { func (t *trackingService) StartRecord() {
logrus.Info("SERVICE: START RECORD") logrus.Info("SERVICE: START RECORD")
if t.opMode != LIVE { if t.opMode != LIVE {
logrus.Println("trackingservice: wrong mode of operation") logrus.Println("trackingservice: wrong mode of operation")
return return
} }
t.opMode = RECORDING t.opMode = RECORDING
t.current.TimeCreated = time.Now() t.current.TimeCreated = time.Now()
t.pipe.Record() t.pipe.Record()
} }
func (t *trackingService) StopRecord() { func (t *trackingService) StopRecord() {
logrus.Info("SERVICE: STOP RECORD") logrus.Info("SERVICE: STOP RECORD")
if t.opMode != RECORDING { if t.opMode != RECORDING {
logrus.Println("trackingservice: couldn't stop. not recording") logrus.Println("trackingservice: couldn't stop. not recording")
return return
} }
t.opMode = LIVE t.opMode = LIVE
t.pipe.StopRecord() t.pipe.StopRecord()
m1.Lock() m1.Lock()
m2.Lock() m2.Lock()
err := t.repo.Save(*t.current) err := t.repo.Save(*t.current)
m2.Unlock() m2.Unlock()
m1.Unlock() m1.Unlock()
if err != nil { if err != nil {
logrus.Println(err) logrus.Println(err)
} }
t.safelyReplaceTracking(emptyTracking()) t.safelyReplaceTracking(emptyTracking())
} }
func (t *trackingService) StopAll() { func (t *trackingService) StopAll() {
logrus.Info("SERVICE: STOP ALL") logrus.Info("SERVICE: STOP ALL")
if t.opMode == RECORDING { if t.opMode == RECORDING {
logrus.Println("trackingservice: stop recording gracefully") logrus.Println("trackingservice: stop recording gracefully")
t.StopRecord() t.StopRecord()
} }
t.opMode = STOPPED t.opMode = STOPPED
t.pipe.Close() t.pipe.Close()
for _, e := range t.collectors { for _, e := range t.collectors {
e.Close() e.Close()
} }
t.collectors = nil t.collectors = nil
t.safelyReplaceTracking(emptyTracking()) t.safelyReplaceTracking(emptyTracking())
} }
func (t *trackingService) DeleteTracking(trackingId uuid.UUID) { func (t *trackingService) DeleteTracking(trackingId uuid.UUID) {
panic("implement me") panic("implement me")
} }
func (t *trackingService) StartReplay() { func (t *trackingService) StartReplay() {
panic("implement me") panic("implement me")
} }
func (t *trackingService) PauseReplay() { func (t *trackingService) PauseReplay() {
panic("implement me") panic("implement me")
} }
func (t *trackingService) StopReplay() { func (t *trackingService) StopReplay() {
panic("implement me") panic("implement me")
} }
func (t *trackingService) LoadTracking(trackingId uuid.UUID) { func (t *trackingService) LoadTracking(trackingId uuid.UUID) {
} }
func (t *trackingService) safelyReplaceTracking(tr Tracking) { func (t *trackingService) safelyReplaceTracking(tr Tracking) {
m1.Lock() m1.Lock()
m2.Lock() m2.Lock()
*t.current = tr *t.current = tr
m2.Unlock() m2.Unlock()
m1.Unlock() m1.Unlock()
} }