package core

import (
	"fmt"
	"log"
	"net"
	"os"
	"time"

	"git.timovolkmann.de/gyrogpsc/ublox"
	"github.com/google/uuid"
	"go.bug.st/serial"
)

// Collector is a data source that can be started with Collect and halted
// with Stop.
type Collector interface {
	Collect()
	Stop()
}

// CollectorType selects which Collector implementation NewCollector returns.
type CollectorType uint8

const (
	// SERIAL selects a collector reading from the configured serial port.
	SERIAL CollectorType = iota
	// TCP selects the shared collector fed by incoming TCP connections.
	TCP
)

// tcpSingleton is the one TCP collector shared by every NewCollector(TCP, ...)
// call; it is created on first use because newTcp opens the listening socket.
var tcpSingleton *tcpCollector

// NewCollector returns a Collector of the requested type that pushes its data
// to proc.
//
// For SERIAL a fresh collector is created on every call. For TCP the
// package-wide singleton is created on the first call and reused afterwards,
// with its processor replaced by proc.
//
// It panics if typ is not a known CollectorType.
func NewCollector(typ CollectorType, proc Processor, config *Configuration) Collector {
	var coll Collector
	switch typ {
	case SERIAL:
		coll = newSerial(proc, config)
	case TCP:
		if tcpSingleton == nil {
			tcpSingleton = newTcp(proc, config)
		} else {
			tcpSingleton.SetProcessor(proc)
		}
		coll = tcpSingleton
	//case DATABASE:
	//	if len(trackingId) != 1 {
	//		panic("only 1 tracking per collector")
	//	}
	//	coll = newDbReplay(proc, config, trackingId[0])
	default:
		panic("selected collector type not implemented")
	}
	return coll
}

// serialCollector reads ublox messages from a serial port and pushes the
// converted sensor data to proc.
type serialCollector struct {
	// active is the run flag polled by the read loop started in Collect.
	// NOTE(review): it is read and written from different goroutines without
	// synchronization.
	active bool
	proc   Processor
	config *Configuration
}

// Collect marks the collector as active and starts a goroutine that opens
// the configured serial port at 115200 baud, decodes incoming messages and
// pushes the converted sensor data to the processor.
//
// The goroutine ends when the run flag is cleared by Stop or when decoding
// fails with anything other than the "NMEA not implemented" error. Failing
// to open the port terminates the process via log.Fatalln.
func (s *serialCollector) Collect() {
	// The read loop below is conditioned on s.active, so the flag has to be
	// raised before the goroutine starts; otherwise the loop would exit
	// immediately and nothing would ever be collected.
	s.active = true
	go func() {
		log.Println("start serial collectors")
		mode := &serial.Mode{
			BaudRate: 115200,
		}
		port, err := serial.Open(s.config.SerialCollectorPort, mode)
		if err != nil {
			log.Fatalln(err.Error())
		}
		defer port.Close()

		decoder := ublox.NewDecoder(port)

		for s.active {
			meas, err := decoder.Decode()
			if err != nil {
				// Messages the decoder reports as unimplemented NMEA are skipped.
				if err.Error() == "NMEA not implemented" {
					continue
				}
				log.Println("serial read err:", err)
				break
			}
			sd, err := ConvertUbxToSensorData(meas)
			if err != nil {
				log.Println("convert err:", err, meas)
				continue
			}
			// The conversion may yield no sensor data without an error.
			if sd == nil {
				continue
			}

			err = s.proc.Push(sd)
			if err != nil {
				log.Println("process err:", err, *sd)
				continue
			}
		}
	}()
}

// Stop clears the run flag so the read loop started by Collect exits the next
// time it evaluates its loop condition; the port is then closed by the
// goroutine's deferred Close.
func (s *serialCollector) Stop() {
	s.active = false
}

// newSerial returns an inactive serialCollector that will push to proc and
// read its port name from config.
func newSerial(proc Processor, config *Configuration) *serialCollector {
	return &serialCollector{
		active: false,
		proc:   proc,
		config: config,
	}
}

// tcpCollector forwards sensor data received over TCP connections to its
// processor while it is active.
type tcpCollector struct {
	// active gates forwarding in jsonHandler; connections are read and parsed
	// regardless of its value.
	active    bool
	processor Processor
	//config *Configuration
}

// Collect enables forwarding of received data to the processor.
func (t *tcpCollector) Collect() {
	t.active = true
}

// Stop disables forwarding of received data; the listener and open
// connections are left untouched.
func (t *tcpCollector) Stop() {
	t.active = false
}

// SetProcessor replaces the processor that received data is pushed to.
func (t *tcpCollector) SetProcessor(p Processor) {
	t.processor = p
}

// newTcp creates an inactive tcpCollector, starts listening on
// config.TcpCollectorPort and accepts connections in a background goroutine,
// handling each connection in its own goroutine.
//
// If listening fails the error is printed and the collector is returned
// without an accept loop. An error from Accept terminates the process.
func newTcp(proc Processor, config *Configuration) *tcpCollector {
	log.Println("start tcp collector")

	coll := &tcpCollector{
		active:    false,
		processor: proc,
	}

	listener, err := net.Listen("tcp", config.TcpCollectorPort)
	if err != nil {
		fmt.Println("Error listening:", err.Error())
		//os.Exit(1)
		// Without a listener there is nothing to accept on; starting the
		// accept loop anyway would call Accept on a nil Listener and panic.
		return coll
	}

	go func() {
		for {
			// Listen for an incoming connection.
			conn, err := listener.Accept()
			if err != nil {
				fmt.Println("Error accepting: ", err.Error())
				os.Exit(1)
			}
			log.Println("...new incoming tcp connection...")

			// Handle connections in a new goroutine.
			go coll.jsonHandler(conn)
		}
	}()

	return coll
}

// jsonHandler handles an incoming TCP connection with JSON payload.
//
// Each successful Read of up to 2048 bytes is passed as one payload to
// ConvertSensorDataPhone. Payloads that fail to convert are logged and
// dropped. While the collector is inactive, converted data is discarded after
// a 50 ms pause. The connection is closed when a read fails.
//
// NOTE(review): a failing Push terminates the whole process via log.Fatalln —
// confirm this is intended.
func (t *tcpCollector) jsonHandler(conn net.Conn) {
	defer conn.Close()

	// TRY reader := bufio.NewReader(conn) OR NewScanner(conn)
	buf := make([]byte, 2048)
	for {
		// Read the incoming connection into the buffer.
		n, err := conn.Read(buf)
		if err != nil {
			fmt.Println("TCP error - reading from connection:", n, err.Error())
			break
		}
		//json := pretty.Pretty(buf[:n])
		//fmt.Println(string(json))
		//fmt.Println(string(buf[:n]))
		sd, err := ConvertSensorDataPhone(buf[:n])
		if err != nil {
			log.Println(err)
			continue
		}
		if !t.active {
			// Keep draining the connection, but throttle while nothing is forwarded.
			time.Sleep(50 * time.Millisecond)
			continue
		}
		err = t.processor.Push(sd)
		if err != nil {
			log.Fatalln(err)
		}
	}
}

// replayCollector is a placeholder for a collector replaying stored data;
// none of its methods are implemented yet.
type replayCollector struct{}

// Collect is not implemented and panics.
func (r *replayCollector) Collect() {
	panic("implement me")
}

// Stop is not implemented and panics.
func (r *replayCollector) Stop() {
	panic("implement me")
}

// newDbReplay is a stub that ignores its arguments and returns nil.
func newDbReplay(proc Processor, config *Configuration, trackingId uuid.UUID) *replayCollector {
	return nil
}