[WIP] add functionality
This commit is contained in:
parent
dfde29ed10
commit
2a9c3a1153
6
.gitignore
vendored
6
.gitignore
vendored
@ -155,4 +155,8 @@ Network Trash Folder
|
||||
Temporary Items
|
||||
.apdisk
|
||||
|
||||
# End of https://www.toptal.com/developers/gitignore/api/macos,go,intellij
|
||||
# End of https://www.toptal.com/developers/gitignore/api/macos,go,intellij
|
||||
|
||||
# CUSTOM
|
||||
.env
|
||||
config.yml
|
||||
@ -3,6 +3,8 @@ package main
|
||||
import (
|
||||
"git.timovolkmann.de/gyrogpsc/core"
|
||||
"log"
|
||||
"os"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -12,14 +14,19 @@ const (
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.Println("GOROOT:", runtime.GOROOT())
|
||||
wd, _ := os.Getwd()
|
||||
log.Println("WorkingDir:", wd)
|
||||
|
||||
log.Println("setup dispatcher")
|
||||
dispatcher := core.NewDispatcher()
|
||||
log.Println("initialize processing pipeline")
|
||||
processor := core.NewPipeline(dispatcher, 50, 494)
|
||||
processor.Run()
|
||||
collectRoutines(processor)
|
||||
//collectRoutines(processor)
|
||||
log.Println("start http server")
|
||||
core.HttpListenAndServe(dispatcher, HTTP_PORT)
|
||||
core.FiberListenAndServe(dispatcher)
|
||||
//core.HttpListenAndServe(dispatcher, HTTP_PORT)
|
||||
}
|
||||
|
||||
func collectRoutines(proc core.Processor) {
|
||||
|
||||
@ -104,3 +104,7 @@ func SerialCollector(proc Processor, serialPort string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func replayCollector(storage string, proc Processor, trackingId int) {
|
||||
|
||||
}
|
||||
|
||||
11
core/config.go
Normal file
11
core/config.go
Normal file
@ -0,0 +1,11 @@
|
||||
package core
|
||||
|
||||
type Configuration struct {
|
||||
TcpCollectorPort string
|
||||
SerialCollectorPort string
|
||||
HttpPort string
|
||||
}
|
||||
|
||||
func LoadConfigYaml() error {
|
||||
return nil
|
||||
}
|
||||
57
core/http.go
57
core/http.go
@ -2,6 +2,8 @@ package core
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
ws "github.com/gofiber/websocket/v2"
|
||||
"github.com/gorilla/websocket"
|
||||
"html/template"
|
||||
"log"
|
||||
@ -69,3 +71,58 @@ func HttpListenAndServe(sub Subscriber, httpPort string) {
|
||||
log.Println("start server")
|
||||
log.Fatal(http.ListenAndServe(httpPort, nil))
|
||||
}
|
||||
|
||||
func FiberListenAndServe(sub Subscriber) {
|
||||
app := fiber.New()
|
||||
app.Static("/static", "static")
|
||||
|
||||
// Application Main Page
|
||||
app.Get("/", fiberHomeHandler)
|
||||
|
||||
// Websocket
|
||||
app.Get("/ws", ws.New(fiberWebsocketHandler))
|
||||
|
||||
// Tracking persistence controls
|
||||
trackings := app.Group("/trackings")
|
||||
trackings.Get("/") // Get all trackings Metadata
|
||||
trackings.Post("/") // Initialize new tracking by websocket and prepare for automatic recording. Toggle ?serial=true and ?tcp=true. Returns trackingId
|
||||
trackings.Patch("/") // Starts recording
|
||||
trackings.Put("/") // Stops current recording. Returns trackingId if record was successful
|
||||
trackings.Delete("/") // Stops websocket connection, pipelines and collectors
|
||||
|
||||
trackings.Get("/:trackingId") // Gets Tracking Metadata by Id.
|
||||
trackings.Post("/:trackingId") // Starts Replay.
|
||||
trackings.Patch("/:trackingId") // Pauses Replay.
|
||||
trackings.Put("/:trackingId") // Stops Replay.
|
||||
trackings.Delete("/:trackingId") // Deletes Tracking from database
|
||||
|
||||
log.Fatal(app.Listen(":3011"))
|
||||
}
|
||||
|
||||
func fiberWebsocketHandler(c *ws.Conn) {
|
||||
for {
|
||||
mt, msg, err := c.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("read:", err)
|
||||
break
|
||||
}
|
||||
log.Printf("recv: %s", msg)
|
||||
err = c.WriteMessage(mt, msg)
|
||||
if err != nil {
|
||||
log.Println("write:", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func fiberHomeHandler(c *fiber.Ctx) error {
|
||||
tpl, err := template.ParseFiles("static/index.html")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tpl.Execute(c, "ws://"+c.Hostname()+"/ws")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -13,19 +13,25 @@ type Processor interface {
|
||||
Process(data *Sensordata) error
|
||||
}
|
||||
|
||||
type Storer interface {
|
||||
Persist()
|
||||
}
|
||||
|
||||
type pipeline struct {
|
||||
syn synchronizer
|
||||
agr aggregator
|
||||
pub Publisher
|
||||
syn synchronizer
|
||||
agr aggregator
|
||||
pub Publisher
|
||||
//stor Storer
|
||||
publishTicker *time.Ticker
|
||||
}
|
||||
|
||||
func NewPipeline(d Publisher, publishIntervalMs int, delayUpdateIntervalMs int) *pipeline {
|
||||
func NewPipeline(d Publisher, publishIntervalMs int, syncUpdateIntervalMs int) *pipeline {
|
||||
// TODO: replace timing params with config struct
|
||||
return &pipeline{
|
||||
synchronizer{
|
||||
//bufferSize: 100,
|
||||
mutex: &sync.Mutex{},
|
||||
updateTicker: time.NewTicker(time.Duration(delayUpdateIntervalMs) * time.Millisecond),
|
||||
updateTicker: time.NewTicker(time.Duration(syncUpdateIntervalMs) * time.Millisecond),
|
||||
},
|
||||
aggregator{
|
||||
tcpMutex: &sync.Mutex{},
|
||||
@ -143,6 +149,7 @@ func (p *pipeline) Process(data *Sensordata) error {
|
||||
return errors.New("nil processing not allowed")
|
||||
}
|
||||
//log.Println(string(data.SourceId))
|
||||
// TODO: persist data here with current timestamp
|
||||
switch data.SourceId {
|
||||
case SOURCE_TCP:
|
||||
go p.pushTcpDataToBuffer(*data)
|
||||
|
||||
12
example_config.yml
Normal file
12
example_config.yml
Normal file
@ -0,0 +1,12 @@
|
||||
# Server configurations
|
||||
server:
|
||||
port: 8000
|
||||
|
||||
collectors:
|
||||
port_tcp:
|
||||
port_serial:
|
||||
|
||||
# Database credentials
|
||||
database:
|
||||
user: "admin"
|
||||
pass: "super-pedro-1980"
|
||||
7
go.mod
7
go.mod
@ -3,10 +3,11 @@ module git.timovolkmann.de/gyrogpsc
|
||||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/gofiber/fiber/v2 v2.2.4
|
||||
github.com/gofiber/websocket/v2 v2.0.2
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/m7shapan/njson v1.0.1
|
||||
github.com/stretchr/testify v1.6.1 // indirect
|
||||
github.com/tidwall/gjson v1.6.0
|
||||
github.com/tidwall/pretty v1.0.2
|
||||
github.com/tidwall/pretty v1.0.2 // indirect
|
||||
go.bug.st/serial v1.1.1
|
||||
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf // indirect
|
||||
)
|
||||
|
||||
43
go.sum
43
go.sum
@ -1,13 +1,25 @@
|
||||
github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4=
|
||||
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
|
||||
github.com/creack/goselect v0.1.1 h1:tiSSgKE1eJtxs1h/VgGQWuXUP0YS4CDIFMp6vaI1ls0=
|
||||
github.com/creack/goselect v0.1.1/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY=
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/fasthttp/websocket v1.4.3 h1:qjhRJ/rTy4KB8oBxljEC00SDt6HUY9jLRfM601SUdS4=
|
||||
github.com/fasthttp/websocket v1.4.3/go.mod h1:5r4oKssgS7W6Zn6mPWap3NWzNPJNzUUh3baWTOhcYQk=
|
||||
github.com/gofiber/fiber/v2 v2.1.0/go.mod h1:aG+lMkwy3LyVit4CnmYUbUdgjpc3UYOltvlJZ78rgQ0=
|
||||
github.com/gofiber/fiber/v2 v2.2.4 h1:t2V2SxlbQGdt8+SS/Mo+tQB0pDQn7OajKdA72qHcBVw=
|
||||
github.com/gofiber/fiber/v2 v2.2.4/go.mod h1:Aso7/M+EQOinVkWp4LUYjdlTpKTBoCk2Qo4djnMsyHE=
|
||||
github.com/gofiber/websocket/v2 v2.0.2 h1:UA/6NpyG+vmPGlvJvW8MJPJpRFuS7abinZ5HbLuV8u0=
|
||||
github.com/gofiber/websocket/v2 v2.0.2/go.mod h1:7VBnzEVRK0K0eTIVc5GbXPF1JWUFnllY0X4cRtG2v78=
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/m7shapan/njson v1.0.1 h1:s+odQrPkzcCCGRTp46cD0XCVYN3pvdoaVwbFVmjAvys=
|
||||
github.com/m7shapan/njson v1.0.1/go.mod h1:4sidL3oRZO1KV5FkclRBPI7nqFzlIq3BwdxHRMlOa9U=
|
||||
github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg=
|
||||
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c h1:2nF5+FZ4/qp7pZVL7fR6DEaSTzuDmNaFTyqp92/hwF8=
|
||||
github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
@ -20,11 +32,34 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0
|
||||
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU=
|
||||
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE=
|
||||
github.com/valyala/fasthttp v1.16.0/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA=
|
||||
github.com/valyala/fasthttp v1.17.0 h1:P8/koH4aSnJ4xbd0cUUFEGQs3jQqIxoDDyRQrUiAkqg=
|
||||
github.com/valyala/fasthttp v1.17.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A=
|
||||
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc=
|
||||
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
|
||||
go.bug.st/serial v1.1.1 h1:5J1DpaIaSIruBi7jVnKXnhRS+YQ9+2PLJMtIZKoIgnc=
|
||||
go.bug.st/serial v1.1.1/go.mod h1:VmYBeyJWp5BnJ0tw2NUJHZdJTGl2ecBGABHlzRK1knY=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf h1:kt3wY1Lu5MJAnKTfoMR52Cu4gwvna4VTzNOiT8tY73s=
|
||||
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
|
||||
Loading…
Reference in New Issue
Block a user