From 4537919579a3de41a1ecec455811504da9b94848 Mon Sep 17 00:00:00 2001 From: David Haukeness Date: Mon, 7 Mar 2022 10:53:15 -0700 Subject: [PATCH] merge patch from kf5grd --- libkeybase.go | 172 ++++++++++++++++++++++++++++++++------------- libkeybase_test.go | 7 +- 2 files changed, 125 insertions(+), 54 deletions(-) diff --git a/libkeybase.go b/libkeybase.go index 1d5a357..ede5a4e 100644 --- a/libkeybase.go +++ b/libkeybase.go @@ -4,6 +4,8 @@ import ( "bufio" "context" "encoding/json" + "fmt" + "io" "log" "os/exec" ) @@ -56,13 +58,15 @@ func (r *Options) buildBaseCommand(args ...string) []string { type ChatAPI struct { opts *Options - ctx context.Context Capacity int Reset int Gas int + inChan chan string + outChan chan string + errChan chan error } -type KeybaseApiResponse struct { +type KeybaseAPIResponse struct { Result struct { Message string `json:"message"` ID int `json:"id"` @@ -75,68 +79,142 @@ type KeybaseApiResponse struct { } `json:"result"` } -func NewApiWithContext(ctx context.Context, options *Options) *ChatAPI { +// NewAPI returns a new instance of a ChatAPI after applying the given options +func NewAPI(options *Options) *ChatAPI { return &ChatAPI{ - opts: options, - ctx: ctx, + opts: options, + inChan: make(chan string, 10), + outChan: make(chan string, 10), + errChan: make(chan error, 10), } } -func (a *ChatAPI) Start() (in, out chan string, err chan error) { - in = make(chan string, 10) - out = make(chan string, 10) - err = make(chan error, 10) - a._run(in, out, err) - return +// Start starts the message handlers and connects the pipes to the long-running keybase +// api commands +func (a *ChatAPI) Start(ctx context.Context) error { + // setup api commands and start the pipes + kbLoc := a.opts.locateKeybase() + chatAPICmd := a.opts.buildBaseCommand("chat", "api") + chatAPIListenCmd := a.opts.buildBaseCommand("chat", "api-listen") + pipes, startPipes, err := getPipes(ctx, kbLoc, chatAPICmd, chatAPIListenCmd) + if err != nil { + return fmt.Errorf("failed to get pipes: %v", err) + } + a.initPipes(ctx, pipes) + if err = startPipes(); err != nil { + return fmt.Errorf("failed to start pipes: %v", err) + } + return nil +} + +// CmdPipe holds the pipes that connect to the long-running keybase api commands +type CmdPipe struct { + Stderr io.ReadCloser + Stdin io.WriteCloser + Stdout io.ReadCloser +} + +// Pipes is an interface that holds pointers to the necessary CmdPipes +type Pipes interface { + // ChatAPI returns a pointer to the ChatAPI pipe used to send outgoing requests to the + // keybase chat api + ChatAPI() *CmdPipe + + // APIListen returns a pointer to the APIListen pipe used to receive incoming messages + // from the keybase chat api + APIListen() *CmdPipe +} + +// defaultPipes satisfies the Pipes interface, and provides standard CmdPipes that should +// be used during normal operation +type defaultPipes struct { + chatAPI *CmdPipe + chatAPIListen *CmdPipe +} + +var _ Pipes = &defaultPipes{} + +// ChatAPI returns a pointer to the ChatAPI pipe used to send outgoing requests to the +// keybase chat api +func (p *defaultPipes) ChatAPI() *CmdPipe { + return p.chatAPI +} + +// APIListen returns a pointer to the APIListen pipe used to receive incoming messages from +// the keybase chat api +func (p *defaultPipes) APIListen() *CmdPipe { + return p.chatAPIListen } -func (a *ChatAPI) _run(in, out chan string, e chan error) { - // build the base command, add homedir and args - readCmd := a.opts.buildBaseCommand("chat", "api-listen") - writeCmd := a.opts.buildBaseCommand("chat", "api") +// getPipes sets up and returns the default pipes +func getPipes(ctx context.Context, kbLoc string, chatAPICmd, chatAPIListenCmd []string) (Pipes, func() error, error) { + var err error // set up the commands - cmd_reader := exec.CommandContext(a.ctx, a.opts.locateKeybase(), readCmd...) - cmd_writer := exec.CommandContext(a.ctx, a.opts.locateKeybase(), writeCmd...) + chatAPI := exec.CommandContext(ctx, kbLoc, chatAPIListenCmd...) + chatAPIListen := exec.CommandContext(ctx, kbLoc, chatAPICmd...) - // grab the stdout pipe from the api-listen command - // it does not take stdin so only one pipe for this one - keyOut, err := cmd_reader.StdoutPipe() - if err != nil { - e <- err - return + // create the pipes + emptyStartFunc := func() error { return nil } + chatAPIPipe := CmdPipe{} + if chatAPIPipe.Stderr, err = chatAPI.StderrPipe(); err != nil { + return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) + } + if chatAPIPipe.Stdin, err = chatAPI.StdinPipe(); err != nil { + return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) + } + if chatAPIPipe.Stdout, err = chatAPI.StdoutPipe(); err != nil { + return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) } - // grab the stdin pipe for the writer - // this is to be able to send chat commands - keyIn, err := cmd_writer.StdinPipe() - if err != nil { - e <- err - return + chatAPIListenPipe := CmdPipe{} + if chatAPIListenPipe.Stderr, err = chatAPIListen.StderrPipe(); err != nil { + return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) + } + if chatAPIListenPipe.Stdin, err = chatAPIListen.StdinPipe(); err != nil { + return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) + } + if chatAPIListenPipe.Stdout, err = chatAPIListen.StdoutPipe(); err != nil { + return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) } - // grab the stdout pipe from the writer - // for now we'll just JSON decode the rate limite responses - keyResp, err := cmd_writer.StdoutPipe() - if err != nil { - e <- err - return + // create an anonymous function to start the commands + startFunc := func() error { + if err := chatAPI.Start(); err != nil { + return fmt.Errorf("failed to start chat api command: %v", err) + } + if err := chatAPIListen.Start(); err != nil { + return fmt.Errorf("failed to start chat api listen command: %v", err) + } + return nil } + return &defaultPipes{chatAPI: &chatAPIPipe, chatAPIListen: &chatAPIListenPipe}, startFunc, nil +} + +// initPipes starts the goroutines that handle getting messages from the apis and sending +// them into the appropriate channels +func (a *ChatAPI) initPipes(ctx context.Context, pipes Pipes) { + var ( + keyIn = pipes.ChatAPI().Stdin + keyResp = pipes.ChatAPI().Stdout + keyOut = pipes.APIListen().Stdout + ) + // now start read and write goroutines // reader go func() { r := bufio.NewReader(keyOut) for { select { - case <-a.ctx.Done(): + case <-ctx.Done(): return default: line, err := r.ReadString('\n') if err != nil { - e <- err + a.errChan <- err } else { - out <- line + a.outChan <- line } } } @@ -146,12 +224,12 @@ func (a *ChatAPI) _run(in, out chan string, e chan error) { go func() { for { select { - case msg := <-in: + case msg := <-a.inChan: _, err := keyIn.Write([]byte(msg)) if err != nil { - e <- err + a.errChan <- err } - case <-a.ctx.Done(): + case <-ctx.Done(): return } } @@ -162,17 +240,17 @@ func (a *ChatAPI) _run(in, out chan string, e chan error) { r := bufio.NewReader(keyResp) for { select { - case <-a.ctx.Done(): + case <-ctx.Done(): return default: line, err := r.ReadBytes('\n') if err != nil { - e <- err + a.errChan <- err } - var resp *KeybaseApiResponse + var resp *KeybaseAPIResponse err = json.Unmarshal(line, resp) if err != nil { - e <- err + a.errChan <- err } else { for _, limit := range resp.Result.Ratelimits { if limit.Tank == "chat" { @@ -185,8 +263,4 @@ func (a *ChatAPI) _run(in, out chan string, e chan error) { } } }() - - // now start the process - cmd_reader.Start() - cmd_writer.Start() } diff --git a/libkeybase_test.go b/libkeybase_test.go index 0ce3ca0..2fa87a7 100644 --- a/libkeybase_test.go +++ b/libkeybase_test.go @@ -1,7 +1,6 @@ package libkeybase import ( - "context" "reflect" "strings" "testing" @@ -44,14 +43,12 @@ func TestBaseCommand(t *testing.T) { } } -func TestNewApiWithContext(t *testing.T) { - ctx := context.Background() +func TestNewAPI(t *testing.T) { opts := NewOptions() want := &ChatAPI{ opts: opts, - ctx: ctx, } - got := NewApiWithContext(ctx, opts) + got := NewAPI(opts) if !reflect.DeepEqual(want, got) { t.Errorf("NewApiWithContext returned incorrect struct") }