From 3bcca849088b7268c411dfbb44a2a78236873c97 Mon Sep 17 00:00:00 2001 From: Sam Date: Sat, 29 Feb 2020 00:24:40 -0500 Subject: [PATCH] Allow different types of notifications to come through separate channels --- chat.go | 162 +++++++++++++++++++++++++++++++++++++++++++------------ types.go | 1 - 2 files changed, 129 insertions(+), 34 deletions(-) diff --git a/chat.go b/chat.go index 0545e8c..ac23043 100644 --- a/chat.go +++ b/chat.go @@ -11,8 +11,44 @@ import ( "time" "samhofi.us/x/keybase/types/chat1" + "samhofi.us/x/keybase/types/stellar1" ) +type SubscriptionType struct { + Type string `json:"type"` +} + +type SubscriptionMessage struct { + Message chat1.MsgSummary + Conversation chat1.ConvSummary +} + +type SubscriptionConversation struct { + Conversation chat1.ConvSummary +} + +type SubscriptionWalletEvent struct { + Payment stellar1.PaymentDetailsLocal +} + +type PaymentHolder struct { + Payment stellar1.PaymentDetailsLocal `json:"notification"` +} + +type Handlers struct { + ChatHandler *func(SubscriptionMessage) + ConversationHandler *func(SubscriptionConversation) + WalletHandler *func(SubscriptionWalletEvent) + ErrorHandler *func(error) +} + +type SubscriptionChannels struct { + chat chan SubscriptionMessage + conversation chan SubscriptionConversation + wallet chan SubscriptionWalletEvent + error chan error +} + // Returns a string representation of a message id suitable for use in a // pagination struct func getID(id uint) string { @@ -62,7 +98,7 @@ func createFiltersString(channels []chat1.ChatChannel) string { } // Run `keybase chat api-listen` to get new messages coming into keybase and send them into the channel -func getNewMessages(k *Keybase, c chan<- ChatAPI, execOptions []string) { +func getNewMessages(k *Keybase, subs *SubscriptionChannels, execOptions []string) { execString := []string{"chat", "api-listen"} if len(execOptions) > 0 { execString = append(execString, execOptions...) @@ -72,24 +108,60 @@ func getNewMessages(k *Keybase, c chan<- ChatAPI, execOptions []string) { stdOut, _ := execCmd.StdoutPipe() execCmd.Start() scanner := bufio.NewScanner(stdOut) - go func(scanner *bufio.Scanner, c chan<- ChatAPI) { - for scanner.Scan() { - var jsonData ChatAPI - json.Unmarshal([]byte(scanner.Text()), &jsonData) - if jsonData.ErrorRaw != nil { - var errorListen = string(*jsonData.ErrorRaw) - jsonData.ErrorListen = &errorListen + go func(scanner *bufio.Scanner, subs *SubscriptionChannels) { + for { + scanner.Scan() + var subType SubscriptionType + t := scanner.Text() + json.Unmarshal([]byte(t), &subType) + switch subType.Type { + case "chat": + var notification chat1.MsgNotification + if err := json.Unmarshal([]byte(t), ¬ification); err != nil { + subs.error <- err + break + } + if notification.Msg != nil { + subscriptionMessage := SubscriptionMessage{ + Message: *notification.Msg, + Conversation: chat1.ConvSummary{ + Id: notification.Msg.ConvID, + Channel: notification.Msg.Channel, + }, + } + subs.chat <- subscriptionMessage + } + case "chat_conv": + var notification chat1.ConvNotification + if err := json.Unmarshal([]byte(t), ¬ification); err != nil { + subs.error <- err + break + } + if notification.Conv != nil { + subscriptionConv := SubscriptionConversation{ + Conversation: *notification.Conv, + } + subs.conversation <- subscriptionConv + } + case "wallet": + var holder PaymentHolder + if err := json.Unmarshal([]byte(t), &holder); err != nil { + subs.error <- err + break + } + subscriptionPayment := SubscriptionWalletEvent(holder) + subs.wallet <- subscriptionPayment + default: + continue } - c <- jsonData } - }(scanner, c) + }(scanner, subs) execCmd.Wait() } } // Run runs `keybase chat api-listen`, and passes incoming messages to the message handler func -func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) { - var heartbeatFreq int64 +func (k *Keybase) Run(handlers Handlers, options ...RunOptions) { var channelCapacity = 100 runOptions := make([]string, 0) @@ -97,8 +169,8 @@ func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) { if options[0].Capacity > 0 { channelCapacity = options[0].Capacity } - if options[0].Heartbeat > 0 { - heartbeatFreq = options[0].Heartbeat + if options[0].Wallet { + runOptions = append(runOptions, "--wallet") } if options[0].Local { runOptions = append(runOptions, "--local") @@ -119,28 +191,52 @@ func (k *Keybase) Run(handler func(ChatAPI), options ...RunOptions) { runOptions = append(runOptions, createFilterString(options[0].FilterChannel)) } } - c := make(chan ChatAPI, channelCapacity) - defer close(c) - if heartbeatFreq > 0 { - go heartbeat(c, time.Duration(heartbeatFreq)*time.Minute) - } - go getNewMessages(k, c, runOptions) - for { - go handler(<-c) - } -} -// heartbeat sends a message through the channel with a message type of `heartbeat` -func heartbeat(c chan<- ChatAPI, freq time.Duration) { - m := ChatAPI{ - Type: "heartbeat", + chatCh := make(chan SubscriptionMessage, channelCapacity) + convCh := make(chan SubscriptionConversation, channelCapacity) + walletCh := make(chan SubscriptionWalletEvent, channelCapacity) + errorCh := make(chan error, channelCapacity) + + subs := &SubscriptionChannels{ + chat: chatCh, + conversation: convCh, + wallet: walletCh, + error: errorCh, } - count := 0 + + defer close(subs.chat) + defer close(subs.conversation) + defer close(subs.wallet) + defer close(subs.error) + + go getNewMessages(k, subs, runOptions) for { - time.Sleep(freq) - m.Msg.ID = count - c <- m - count++ + select { + case chatMsg := <-subs.chat: + if handlers.ChatHandler == nil { + continue + } + chatHandler := *handlers.ChatHandler + go chatHandler(chatMsg) + case walletMsg := <-subs.wallet: + if handlers.WalletHandler == nil { + continue + } + walletHandler := *handlers.WalletHandler + go walletHandler(walletMsg) + case newConv := <-subs.conversation: + if handlers.ConversationHandler == nil { + continue + } + convHandler := *handlers.ConversationHandler + go convHandler(newConv) + case errMsg := <-subs.error: + if handlers.ErrorHandler == nil { + continue + } + errHandler := *handlers.ErrorHandler + go errHandler(errMsg) + } } } diff --git a/types.go b/types.go index aaa4dfc..9ad10f8 100644 --- a/types.go +++ b/types.go @@ -12,7 +12,6 @@ import ( // RunOptions holds a set of options to be passed to Run type RunOptions struct { Capacity int // Channel capacity for the buffered channel that holds messages. Defaults to 100 if not set - Heartbeat int64 // Send a heartbeat through the channel every X minutes (0 = off) Local bool // Subscribe to local messages HideExploding bool // Ignore exploding messages Dev bool // Subscribe to dev channel messages