package core import ( "errors" "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" ) // dispatcher is responsible to distribute messages to subscribed listeners type dispatcher struct { listeners map[int16]chan string counter int16 sem *semaphore.Weighted } // Returns initialized dispatcher. func NewDispatcher() *dispatcher { return &dispatcher{ listeners: make(map[int16]chan string), counter: 0, sem: semaphore.NewWeighted(1), } } // disable or enable streaming without removing all listeners from dispatcher func (d *dispatcher) SetStreaming(s bool) bool { if ok := d.sem.TryAcquire(1); s && ok { // if you want to turn on and can get semaphore then return success return true } else if !s && !ok { // if you want to turn off and cant get semaphore, you can safely turn off by releasing semaphore and return success d.sem.Release(1) return true } return false } // if closed, dispatcher will not forward published messages and drops them. func (d *dispatcher) IsClosed() bool { if d.sem.TryAcquire(1) { d.sem.Release(1) return true } return false } // publishes message to all subscribed listeners. // if dispatcher closed, dispatcher will not forward published messages and drops them. func (d *dispatcher) Publish(message string) { if d.IsClosed() { return } logrus.Tracef("publishing to %v listeners\n", len(d.listeners)) for _, ch := range d.listeners { select { case ch <- message: default: logrus.Traceln("dispatcher: skip closed channel") } } } // Registers new client as listener and returns his id and a channel which is used to receive all messages. func (d *dispatcher) Subscribe() (id int16, receiver <-chan string) { key := d.counter d.counter++ rec := make(chan string) d.listeners[key] = rec return key, rec } // Unsubscribes Listener with given ID. // if listener with given ID exists, it will be deleted and no error will be returned. func (d *dispatcher) Unsubscribe(id int16) error { receiver, ok := d.listeners[id] if !ok { return errors.New("no subscription with id") } delete(d.listeners, id) close(receiver) return nil }