package core

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"time"

	"git.timovolkmann.de/gyrogpsc/ublox"
	"github.com/sirupsen/logrus"
	"go.bug.st/serial"
)

// Collector is a data source that publishes collected messages on a channel.
type Collector interface {
	// Collect starts collecting.
	Collect()
	// Stop ends collecting and resets the output channel.
	Stop()
	// OutChannel returns the channel collected messages are published on.
	OutChannel() chan interface{}
}

// CollectorType selects the Collector implementation built by NewCollector.
type CollectorType string

const (
	SERIAL CollectorType = "SERIAL_COLLECTOR"
	TCP    CollectorType = "TCP_COLLECTOR"
)

// tcpSingleton holds the single tcp collector instance; it is created lazily
// by the first NewCollector(config, TCP) call and reused afterwards.
var tcpSingleton *tcpCollector

// NewCollector returns a Collector of the requested type.
//
// A SERIAL request yields a fresh serial collector on every call. A TCP
// request yields the package-wide tcp collector, creating it on first use.
// Any other type panics.
func NewCollector(config *Configuration, typ CollectorType) Collector {
	var coll Collector
	switch typ {
	case SERIAL:
		coll = newSerial(config)
	case TCP:
		if tcpSingleton == nil {
			tcpSingleton = newTcp(config)
		}
		coll = tcpSingleton
	default:
		panic("selected collector type not implemented")
	}
	return coll
}

// drainCollectorOut discards buffered messages from ch without blocking and
// returns as soon as nothing more is immediately available.
func drainCollectorOut(ch chan interface{}) {
	for {
		select {
		case <-ch:
			logrus.Debugln("waiting")
		default:
			logrus.Debugln("breaking")
			return
		}
	}
}

// serialCollector reads decoded ublox messages from a serial port.
type serialCollector struct {
	out    chan interface{}
	config *Configuration
}

// OutChannel returns the channel the serial collector currently publishes on.
func (s *serialCollector) OutChannel() chan interface{} {
	return s.out
}

// Collect starts a goroutine that opens the configured serial port at
// 115200 baud, decodes incoming messages and forwards them to the output
// channel without blocking.
//
// If the port cannot be opened it is retried nine times with growing delays
// (9ms up to 361ms); if it still cannot be opened the goroutine gives up.
// The goroutine also ends on a read error, after more than ten consecutive
// messages could not be delivered, or when the channel it was started with
// has been closed by Stop.
func (s *serialCollector) Collect() {
	go func(ch chan interface{}) {
		logrus.Println("start serial collector")
		mode := &serial.Mode{
			BaudRate: 115200,
		}
		port, err := serial.Open(s.config.Collectors.SerialCollectorPort, mode)
		if err != nil {
			logrus.Warn("can't open serial port:", err)
			// Retry on any open error; the delay grows quadratically with i.
			for i := 3; i < 20; i = i + 2 {
				logrus.Warnf("try again in -> %vms", i*i)
				time.Sleep(time.Millisecond * time.Duration(i*i))
				port, err = serial.Open(s.config.Collectors.SerialCollectorPort, mode)
				if err == nil {
					break
				}
			}
			if err != nil {
				logrus.Errorln(err)
				// Without an open port there is nothing to decode or to
				// close; continuing would call methods on a nil port.
				return
			}
		}
		decoder := ublox.NewDecoder(port)
		defer func() {
			// Sending on ch after Stop has closed it panics; recovering here
			// turns that into a regular shutdown of this goroutine.
			if r := recover(); r != nil {
				logrus.Infoln("stopped collecting channel: serial")
			}
			logrus.Infoln("close serial port")
			port.Close()
		}()
		maxSkip := 0
	loop:
		for {
			meas, err2 := decoder.Decode()
			if err2 != nil {
				if err2.Error() == "NMEA not implemented" {
					continue
				}
				logrus.Println("serial read err:", err2)
				break
			}
			select {
			case ch <- meas:
				maxSkip = 0
			default:
				// Channel is full: drop the message instead of blocking.
				logrus.Traceln("skip collecting serial messages")
				if maxSkip >= 10 {
					break loop
				}
				maxSkip++
			}
		}
		logrus.Println("serial collector stopped")
	}(s.out)
}

// Stop drains the output channel, closes it and installs a fresh buffered
// channel. A panic raised while doing so (for example closing an already
// closed channel) is recovered and logged; in that case the channel is not
// replaced.
func (s *serialCollector) Stop() {
	defer func() {
		if recover() != nil {
			logrus.Debugln("channel already closed")
		}
	}()
	drainCollectorOut(s.out)
	close(s.out)
	s.out = make(chan interface{}, 16)
}

// newSerial builds a serial collector with a 16-element output buffer.
func newSerial(config *Configuration) *serialCollector {
	return &serialCollector{
		out:    make(chan interface{}, 16),
		config: config,
	}
}

// tcpCollector publishes lines received over incoming tcp connections.
type tcpCollector struct {
	out    chan interface{}
	config *Configuration
}

// OutChannel returns the channel the tcp collector currently publishes on.
func (t *tcpCollector) OutChannel() chan interface{} {
	return t.out
}

// Collect only logs that the tcp collector is starting; connections are
// already being accepted by the goroutine launched in newTcp.
func (t *tcpCollector) Collect() {
	logrus.Info("starting tcp collector")
}

// Stop drains the output channel, closes it and installs a fresh buffered
// channel. A panic raised while doing so (for example closing an already
// closed channel) is recovered and logged; in that case the channel is not
// replaced.
func (t *tcpCollector) Stop() {
	defer func() {
		if recover() != nil {
			logrus.Debugln("channel already closed")
		}
	}()
	drainCollectorOut(t.out)
	close(t.out)
	t.out = make(chan interface{}, 16)
}

// newTcp builds a tcp collector with a 16-element output buffer and starts
// listening on the configured address. Every accepted connection is served
// by its own goroutine. An accept error terminates the process.
//
// If listening fails the error is printed and the collector is returned
// without an accept loop, so it never produces data.
func newTcp(config *Configuration) *tcpCollector {
	logrus.Println("listen for tcp connections on port", config.Collectors.TcpCollectorPort)
	c := &tcpCollector{
		out:    make(chan interface{}, 16),
		config: config,
	}
	listener, err := net.Listen("tcp", config.Collectors.TcpCollectorPort)
	if err != nil {
		fmt.Println("Error listening:", err)
		// listener is nil here; starting the accept loop would dereference it.
		return c
	}
	go func() {
		for {
			// Listen for an incoming connection.
			conn, acceptErr := listener.Accept()
			if acceptErr != nil {
				logrus.Errorln("Error accepting: ", acceptErr.Error())
				os.Exit(1)
			}
			logrus.Infoln("new incoming tcp connection...")
			// Handle connections in a new goroutine.
			go c.connectionHandler(conn)
		}
	}()
	return c
}

// handles incoming tcp connections.
func (t *tcpCollector) connectionHandler(conn net.Conn) { defer func() { if r := recover(); r != nil { logrus.Infoln("stopped collecting tcp: panic") } logrus.Infoln("close connection") conn.Close() }() sc := bufio.NewScanner(conn) skipped := 0 //loop: for sc.Scan() { // Read the incoming connection into the buffer. res := append([]byte{}, sc.Bytes()...) if err2 := sc.Err(); err2 != nil { logrus.Warn("lost tcp link:", err2) break } select { case t.out <- res: skipped = 0 default: logrus.Traceln("skip collecting tcp messages") //if skipped >= 10 { // break loop //} skipped++ } } }