gyrogpsc/core/collectors.go

237 lines
4.6 KiB
Go

package core
import (
"bufio"
"fmt"
"git.timovolkmann.de/gyrogpsc/ublox"
"github.com/sirupsen/logrus"
"go.bug.st/serial"
"net"
"os"
"time"
)
// Collector is the contract shared by the data sources of this package
// (the serial and the TCP collector).
type Collector interface {
	// OutChannel returns the channel on which collected values are delivered.
	OutChannel() chan interface{}
	// Collect starts collecting. The serial implementation spawns a reader
	// goroutine; the TCP implementation only logs, because its listener is
	// already started when the collector is constructed.
	Collect()
	// Stop drains and closes the current output channel and replaces it with
	// a fresh one.
	Stop()
}
// CollectorType names a kind of collector that NewCollector can build.
type CollectorType string

// Collector kinds understood by NewCollector.
const (
	// SERIAL selects the collector that reads from a serial port.
	SERIAL = CollectorType("SERIAL_COLLECTOR")
	// TCP selects the collector that accepts data over TCP connections.
	TCP = CollectorType("TCP_COLLECTOR")
)
// tcpSingleton holds the one TCP collector that is shared by every
// NewCollector(…, TCP) call; it is created on first use.
var tcpSingleton *tcpCollector

// NewCollector returns a collector of the requested type.
//
// SERIAL yields a new serial collector on every call. TCP yields the shared
// TCP collector, which is built from config the first time it is requested
// (config is ignored on later TCP requests). Any other type panics.
func NewCollector(config *Configuration, typ CollectorType) Collector {
	if typ == SERIAL {
		return newSerial(config)
	}
	if typ != TCP {
		panic("selected collector type not implemented")
	}
	// Lazily create the shared TCP collector.
	if tcpSingleton == nil {
		tcpSingleton = newTcp(config)
	}
	return tcpSingleton
}
// serialCollector forwards messages decoded from a serial port to a channel.
type serialCollector struct {
	// out is the channel decoded messages are published on; Stop closes it
	// and installs a new one.
	out chan interface{}
	// config supplies the serial port to open (Collectors.SerialCollectorPort).
	config *Configuration
}
// OutChannel returns the channel the collector currently publishes on.
// Stop swaps this channel for a new one, so the result should be fetched
// again after a Stop.
func (s *serialCollector) OutChannel() chan interface{} {
	current := s.out
	return current
}
// Collect starts a goroutine that opens the configured serial port at
// 115200 baud, decodes messages from it with the ublox decoder and forwards
// each decoded message to the output channel.
//
// The goroutine works on the channel that was current when Collect was
// called. It ends when
//   - the port cannot be opened, even after the retries,
//   - decoding fails with anything other than "NMEA not implemented",
//   - more than ten consecutive messages could not be delivered because the
//     channel was full, or
//   - the channel is closed by Stop (the resulting send panic is recovered).
//
// The serial port is closed whenever the goroutine ends after a successful
// open.
func (s *serialCollector) Collect() {
	go func(ch chan interface{}) {
		logrus.Println("start serial collector")
		mode := &serial.Mode{
			BaudRate: 115200,
		}
		portName := s.config.Collectors.SerialCollectorPort

		port, err := serial.Open(portName, mode)
		if err != nil {
			logrus.Warn("can't open serial port:", err)
			// Retry with growing pauses (9ms, 25ms, 49ms, ... 361ms).
			for i := 3; i < 20 && err != nil; i += 2 {
				logrus.Warnf("try again in -> %vms", i*i)
				time.Sleep(time.Millisecond * time.Duration(i*i))
				port, err = serial.Open(portName, mode)
			}
		}
		if err != nil {
			// Without a port there is nothing to read. Continuing would
			// hand a nil port to the decoder and to Close, which panics
			// inside the deferred cleanup and takes the process down.
			logrus.Errorln(err)
			return
		}
		// Registered right after the successful open so the port is
		// released on every exit path, including a panicking send on a
		// channel that Stop has closed.
		defer func() {
			if r := recover(); r != nil {
				logrus.Infoln("stopped collecting channel: serial")
			}
			logrus.Infoln("close serial port")
			if cerr := port.Close(); cerr != nil {
				logrus.Warnln("closing serial port:", cerr)
			}
		}()

		decoder := ublox.NewDecoder(port)
		// skipped counts consecutive messages dropped because ch was full.
		const maxSkipped = 10
		skipped := 0
	loop:
		for {
			meas, decodeErr := decoder.Decode()
			if decodeErr != nil {
				// NMEA sentences are not supported by the decoder; ignore
				// them and keep reading.
				if decodeErr.Error() == "NMEA not implemented" {
					continue
				}
				logrus.Println("serial read err:", decodeErr)
				break loop
			}
			select {
			case ch <- meas:
				skipped = 0
			default:
				// Nobody is consuming fast enough: drop the message, and
				// give up once too many were dropped in a row.
				logrus.Traceln("skip collecting serial messages")
				if skipped >= maxSkipped {
					break loop
				}
				skipped++
			}
		}
		logrus.Println("serial collector stopped")
	}(s.out)
}
// Stop discards everything still buffered in the output channel, closes it
// and installs a fresh channel with room for 16 values.
//
// A panic raised while doing so (for example closing a nil channel) is
// recovered and only logged; in that case the channel is not replaced.
func (s *serialCollector) Stop() {
	defer func() {
		if r := recover(); r != nil {
			logrus.Debugln("channel already closed")
		}
	}()

	// Non-blocking drain: keep receiving until nothing is immediately
	// available.
	for drained := false; !drained; {
		select {
		case <-s.out:
			logrus.Debugln("waiting")
		default:
			logrus.Debugln("breaking")
			drained = true
		}
	}

	close(s.out)
	s.out = make(chan interface{}, 16)
}
// newSerial builds a serial collector that uses config and publishes on a
// new channel buffered for 16 values. Nothing is opened or started here.
func newSerial(config *Configuration) *serialCollector {
	collector := new(serialCollector)
	collector.config = config
	collector.out = make(chan interface{}, 16)
	return collector
}
// tcpCollector forwards lines received over TCP connections to a channel.
type tcpCollector struct {
	// out is the channel received lines are published on (as []byte); Stop
	// closes it and installs a new one.
	out chan interface{}
	// config supplies the listen address (Collectors.TcpCollectorPort).
	config *Configuration
}
// OutChannel returns the channel the collector currently publishes on.
// Stop swaps this channel for a new one, so the result should be fetched
// again after a Stop.
func (t *tcpCollector) OutChannel() chan interface{} {
	current := t.out
	return current
}
// Collect only logs that the TCP collector is starting; it launches nothing
// itself. Connections are accepted by the goroutine started in newTcp.
func (t *tcpCollector) Collect() {
	const startMessage = "starting tcp collector"
	logrus.Info(startMessage)
}
// Stop discards everything still buffered in the output channel, closes it
// and installs a fresh channel with room for 16 values.
//
// A panic raised while doing so (for example closing a nil channel) is
// recovered and only logged; in that case the channel is not replaced.
func (t *tcpCollector) Stop() {
	defer func() {
		if r := recover(); r != nil {
			logrus.Debugln("channel already closed")
		}
	}()

	// Non-blocking drain: receive until nothing is immediately available.
	pending := true
	for pending {
		select {
		case <-t.out:
			logrus.Debugln("waiting")
		default:
			logrus.Debugln("breaking")
			pending = false
		}
	}

	close(t.out)
	t.out = make(chan interface{}, 16)
}
// newTcp builds the TCP collector and starts listening on
// config.Collectors.TcpCollectorPort. Every accepted connection is served by
// connectionHandler in its own goroutine.
//
// If listening fails the error is printed and the collector is returned
// without an accept loop, so it never produces data. Starting the loop
// anyway would call Accept on a nil listener and crash the program with an
// unrecovered panic.
//
// An error from Accept terminates the process with exit status 1.
func newTcp(config *Configuration) *tcpCollector {
	c := &tcpCollector{
		out:    make(chan interface{}, 16),
		config: config,
	}

	logrus.Println("listen for tcp connections on port", config.Collectors.TcpCollectorPort)
	listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort)
	if err != nil {
		fmt.Println("Error listening:", err)
		// No listener, hence nothing to accept from.
		return c
	}

	go func() {
		for {
			// Wait for the next incoming connection.
			conn, err := listener.Accept()
			if err != nil {
				logrus.Errorln("Error accepting: ", err.Error())
				os.Exit(1)
			}
			logrus.Infoln("new incoming tcp connection...")
			// Serve each connection concurrently.
			go c.connectionHandler(conn)
		}
	}()
	return c
}
// connectionHandler serves one incoming TCP connection: it reads the stream
// line by line and publishes a copy of every line (as []byte) on the
// collector's output channel.
//
// Lines are dropped, not queued, when the channel is full. The handler ends
// when the stream ends or fails; a read or scan failure is logged as a lost
// link. A panic (for example sending on a channel that Stop has just
// closed) is recovered. The connection is closed in every case.
func (t *tcpCollector) connectionHandler(conn net.Conn) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Infoln("stopped collecting tcp: panic")
		}
		logrus.Infoln("close connection")
		conn.Close()
	}()

	sc := bufio.NewScanner(conn)
	for sc.Scan() {
		// After a read error Scan may still hand out the bytes that were
		// buffered so far; such a truncated line is not forwarded.
		if sc.Err() != nil {
			break
		}
		// The scanner reuses its buffer, so publish a private copy.
		res := append([]byte{}, sc.Bytes()...)
		select {
		case t.out <- res:
		default:
			logrus.Traceln("skip collecting tcp messages")
		}
	}
	// Scan returning false hides the reason; Err tells a failure (broken
	// link, over-long line) apart from a regular end of stream.
	if err := sc.Err(); err != nil {
		logrus.Warn("lost tcp link:", err)
	}
}