Browse Source

Allow different types of notifications to come through separate channels

main
Sam 4 years ago
parent
commit
3bcca84908
  1. 162
      chat.go
  2. 1
      types.go

162
chat.go

@ -11,8 +11,44 @@ import ( @@ -11,8 +11,44 @@ import (
"time"
"samhofi.us/x/keybase/types/chat1"
"samhofi.us/x/keybase/types/stellar1"
)
type SubscriptionType struct {
Type string `json:"type"`
}
type SubscriptionMessage struct {
Message chat1.MsgSummary
Conversation chat1.ConvSummary
}
type SubscriptionConversation struct {
Conversation chat1.ConvSummary
}
type SubscriptionWalletEvent struct {
Payment stellar1.PaymentDetailsLocal
}
type PaymentHolder struct {
Payment stellar1.PaymentDetailsLocal `json:"notification"`
}
type Handlers struct {
ChatHandler *func(SubscriptionMessage)
ConversationHandler *func(SubscriptionConversation)
WalletHandler *func(SubscriptionWalletEvent)
ErrorHandler *func(error)
}
type SubscriptionChannels struct {
chat chan SubscriptionMessage
conversation chan SubscriptionConversation
wallet chan SubscriptionWalletEvent
error chan error
}
// Returns a string representation of a message id suitable for use in a
// pagination struct
func getID(id uint) string {
@ -62,7 +98,7 @@ func createFiltersString(channels []chat1.ChatChannel) string { @@ -62,7 +98,7 @@ func createFiltersString(channels []chat1.ChatChannel) string {
}
// Run `keybase chat api-listen` to get new messages coming into keybase and send them into the channel
func getNewMessages(k *Keybase, c chan<- ChatAPI, execOptions []string) {
func getNewMessages(k *Keybase, subs *SubscriptionChannels, execOptions []string) {
execString := []string{"chat", "api-listen"}
if len(execOptions) > 0 {
execString = append(execString, execOptions...)
@ -72,24 +108,60 @@ func getNewMessages(k *Keybase, c chan<- ChatAPI, execOptions []string) { @@ -72,24 +108,60 @@ func getNewMessages(k *Keybase, c chan<- ChatAPI, execOptions []string) {
stdOut, _ := execCmd.StdoutPipe()
execCmd.Start()
scanner := bufio.NewScanner(stdOut)
go func(scanner *bufio.Scanner, c chan<- ChatAPI) {
for scanner.Scan() {
var jsonData ChatAPI
json.Unmarshal([]byte(scanner.Text()), &jsonData)
if jsonData.ErrorRaw != nil {
var errorListen = string(*jsonData.ErrorRaw)
jsonData.ErrorListen = &errorListen
go func(scanner *bufio.Scanner, subs *SubscriptionChannels) {
for {
scanner.Scan()
var subType SubscriptionType
t := scanner.Text()
json.Unmarshal([]byte(t), &subType)
switch subType.Type {
case "chat":
var notification chat1.MsgNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
subs.error <- err
break
}
if notification.Msg != nil {
subscriptionMessage := SubscriptionMessage{
Message: *notification.Msg,
Conversation: chat1.ConvSummary{
Id: notification.Msg.ConvID,
Channel: notification.Msg.Channel,
},
}
subs.chat <- subscriptionMessage
}
case "chat_conv":
var notification chat1.ConvNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
subs.error <- err
break
}
if notification.Conv != nil {
subscriptionConv := SubscriptionConversation{
Conversation: *notification.Conv,
}
subs.conversation <- subscriptionConv
}
case "wallet":
var holder PaymentHolder
if err := json.Unmarshal([]byte(t), &holder); err != nil {
subs.error <- err
break
}
subscriptionPayment := SubscriptionWalletEvent(holder)
subs.wallet <- subscriptionPayment
default:
continue
}
c <- jsonData
}
}(scanner, c)
}(scanner, subs)
execCmd.Wait()
}
}
// Run runs `keybase chat api-listen`, and passes incoming messages to the message handler func
func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) {
var heartbeatFreq int64
func (k *Keybase) Run(handlers Handlers, options ...RunOptions) {
var channelCapacity = 100
runOptions := make([]string, 0)
@ -97,8 +169,8 @@ func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) { @@ -97,8 +169,8 @@ func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) {
if options[0].Capacity > 0 {
channelCapacity = options[0].Capacity
}
if options[0].Heartbeat > 0 {
heartbeatFreq = options[0].Heartbeat
if options[0].Wallet {
runOptions = append(runOptions, "--wallet")
}
if options[0].Local {
runOptions = append(runOptions, "--local")
@ -119,28 +191,52 @@ func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) { @@ -119,28 +191,52 @@ func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) {
runOptions = append(runOptions, createFilterString(options[0].FilterChannel))
}
}
c := make(chan ChatAPI, channelCapacity)
defer close(c)
if heartbeatFreq > 0 {
go heartbeat(c, time.Duration(heartbeatFreq)*time.Minute)
}
go getNewMessages(k, c, runOptions)
for {
go handler(<-c)
}
}
// heartbeat sends a message through the channel with a message type of `heartbeat`
func heartbeat(c chan<- ChatAPI, freq time.Duration) {
m := ChatAPI{
Type: "heartbeat",
chatCh := make(chan SubscriptionMessage, channelCapacity)
convCh := make(chan SubscriptionConversation, channelCapacity)
walletCh := make(chan SubscriptionWalletEvent, channelCapacity)
errorCh := make(chan error, channelCapacity)
subs := &SubscriptionChannels{
chat: chatCh,
conversation: convCh,
wallet: walletCh,
error: errorCh,
}
count := 0
defer close(subs.chat)
defer close(subs.conversation)
defer close(subs.wallet)
defer close(subs.error)
go getNewMessages(k, subs, runOptions)
for {
time.Sleep(freq)
m.Msg.ID = count
c <- m
count++
select {
case chatMsg := <-subs.chat:
if handlers.ChatHandler == nil {
continue
}
chatHandler := *handlers.ChatHandler
go chatHandler(chatMsg)
case walletMsg := <-subs.wallet:
if handlers.WalletHandler == nil {
continue
}
walletHandler := *handlers.WalletHandler
go walletHandler(walletMsg)
case newConv := <-subs.conversation:
if handlers.ConversationHandler == nil {
continue
}
convHandler := *handlers.ConversationHandler
go convHandler(newConv)
case errMsg := <-subs.error:
if handlers.ErrorHandler == nil {
continue
}
errHandler := *handlers.ErrorHandler
go errHandler(errMsg)
}
}
}

1
types.go

@ -12,7 +12,6 @@ import ( @@ -12,7 +12,6 @@ import (
// RunOptions holds a set of options to be passed to Run
type RunOptions struct {
Capacity int // Channel capacity for the buffered channel that holds messages. Defaults to 100 if not set
Heartbeat int64 // Send a heartbeat through the channel every X minutes (0 = off)
Local bool // Subscribe to local messages
HideExploding bool // Ignore exploding messages
Dev bool // Subscribe to dev channel messages

Loading…
Cancel
Save