From 80605f645eab7c7655a843aa713172039cb31090 Mon Sep 17 00:00:00 2001 From: David Haukeness Date: Thu, 10 Mar 2022 14:42:38 -0700 Subject: [PATCH] refactor --- chat.go | 112 +++++++++++++++++ cmd.go | 83 +++++++++++++ keybase.go | 242 +------------------------------------ keybase_test.go | 13 +- pkg/ctxreader/ctxreader.go | 29 +++++ ratelimits.go | 16 +++ 6 files changed, 242 insertions(+), 253 deletions(-) create mode 100644 chat.go create mode 100644 cmd.go create mode 100644 pkg/ctxreader/ctxreader.go create mode 100644 ratelimits.go diff --git a/chat.go b/chat.go new file mode 100644 index 0000000..a33f09f --- /dev/null +++ b/chat.go @@ -0,0 +1,112 @@ +package keybase + +import ( + "bufio" + "context" + "fmt" + "sync" + + "git.hugfreevikings.wtf/keybase/keybase/pkg/ctxreader" +) + +type ChatAPI struct { + sync.Mutex + toStdin chan []byte + fromStdout chan []byte + fromListen chan []byte + errors chan error +} + +func NewChatAPI(ctx context.Context, opts *Options) (api *ChatAPI, err error) { + // set up the channels + api.toStdin = make(chan []byte, opts.ChannelBufferSize) + api.fromStdout = make(chan []byte, opts.ChannelBufferSize) + api.fromListen = make(chan []byte, opts.ChannelBufferSize) + api.errors = make(chan error, opts.ChannelBufferSize) + + // get the basics + kbCmd := opts.locateKeybase() + chatApiArgs := opts.buildArgs("chat", "api") + chatApiListenArgs := opts.buildArgs("chat", "api-listen") + + // build the commands + chatApi, err := newApiCmd(ctx, kbCmd, chatApiArgs...) + if err != nil { + return nil, err + } + chatListen, err := newApiCmd(ctx, kbCmd, chatApiListenArgs...) + if err != nil { + return nil, err + } + + // create the goroutines + // listen reader + go func() { + cr := ctxreader.NewContextReader(ctx, chatListen.Stdout()) + scanner := bufio.NewScanner(cr) + for scanner.Scan() { + api.fromListen <- scanner.Bytes() + } + }() + + // writing to stdin + go func() { + for { + select { + case msg := <-api.toStdin: + _, err := chatApi.Stdin().Write(msg) + if err != nil { + api.errors <- err + } + case <-ctx.Done(): + return + } + } + }() + + // reading from stdout + go func() { + cr := ctxreader.NewContextReader(ctx, chatApi.Stdout()) + scanner := bufio.NewScanner(cr) + for scanner.Scan() { + api.fromStdout <- scanner.Bytes() + } + }() + + // reading from stderr + go func() { + cr := ctxreader.NewContextReader(ctx, chatApi.Stderr()) + scanner := bufio.NewScanner(cr) + for scanner.Scan() { + api.errors <- fmt.Errorf("%v", scanner.Text()) + } + }() + + // then start the cmds + err = chatApi.Start() + if err != nil { + return nil, err + } + err = chatListen.Start() + if err != nil { + return nil, err + } + + return +} + +func (c *ChatAPI) Listen() chan []byte { + return c.fromListen +} + +func (c *ChatAPI) SendRaw(msg []byte) ([]byte, error) { + c.Lock() + defer c.Unlock() + c.toStdin <- msg + select { + case resp := <-c.fromStdout: + return resp, nil + case err := <-c.errors: + return nil, err + } +} diff --git a/cmd.go b/cmd.go new file mode 100644 index 0000000..8b14e8a --- /dev/null +++ b/cmd.go @@ -0,0 +1,83 @@ +package keybase + +import ( + "context" + "fmt" + "io" + "os/exec" +) + +// apiCmd holds the pipes that connect to an executed command +type apiCmd struct { + Stderr io.ReadCloser + Stdin io.WriteCloser + Stdout io.ReadCloser + startFunc func() error +} + +type cmd interface { + Start() error + Stdin() io.WriteCloser + Stdout() io.ReadCloser + Stderr() io.ReadCloser +} + +type defaultCmd struct { + apiCmd *apiCmd +} + +func (c *defaultCmd) Start() error { + return c.apiCmd.startFunc() +} + +func (c *defaultCmd) Stdin() io.WriteCloser { + return c.apiCmd.Stdin +} + +func (c *defaultCmd) Stdout() io.ReadCloser { + return c.apiCmd.Stdout +} + +func (c *defaultCmd) Stderr() io.ReadCloser { + return c.apiCmd.Stderr +} + +var _ cmd = &defaultCmd{} + +func newApiCmd(ctx context.Context, execCmd string, args ...string) (cmd, error) { + var err error + + // create a new apiCmd + cmd := &apiCmd{} + + // set up the command + api := exec.CommandContext(ctx, execCmd, args...) + + // an empty start func for failures + cmd.startFunc = func() error { return nil } + + // hook up the stderr pipe + if cmd.Stderr, err = api.StderrPipe(); err != nil { + return nil, fmt.Errorf("failed to get pipe: %v", err) + } + + // hook up the stdout pipe + if cmd.Stdout, err = api.StdoutPipe(); err != nil { + return nil, fmt.Errorf("failed to get pipe: %v", err) + } + + // hook up the stdin pipe + if cmd.Stdin, err = api.StdinPipe(); err != nil { + return nil, fmt.Errorf("failed to get pipe: %v", err) + } + + // create an anonymous function to start the commands + cmd.startFunc = func() error { + if err := api.Start(); err != nil { + return fmt.Errorf("failed to start command: %v", err) + } + return nil + } + + return &defaultCmd{cmd}, nil +} diff --git a/keybase.go b/keybase.go index da193f7..df484b3 100644 --- a/keybase.go +++ b/keybase.go @@ -1,15 +1,8 @@ package keybase import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "io" "log" "os/exec" - "sync" ) // Options holds... run... options... @@ -46,7 +39,7 @@ func (opt *Options) locateKeybase() string { } // buildBaseCommand adds the homedirectory before the args, when required -func (opt *Options) buildBaseCommand(args ...string) []string { +func (opt *Options) buildArgs(args ...string) []string { var cmd []string if opt.HomeDir != "" { cmd = append(cmd, "--home", opt.HomeDir) @@ -57,236 +50,3 @@ func (opt *Options) buildBaseCommand(args ...string) []string { cmd = append(cmd, args...) return cmd } - -type ChatAPI struct { - sync.Mutex - opts *Options - Capacity int - Reset int - Gas int - inChan chan []byte - outChan chan []byte - errChan chan error -} - -type KeybaseAPIResponse struct { - Result struct { - Message string `json:"message"` - ID int `json:"id"` - Ratelimits []struct { - Tank string `json:"tank"` - Capacity int `json:"capacity"` - Reset int `json:"reset"` - Gas int `json:"gas"` - } `json:"ratelimits"` - } `json:"result"` -} - -// NewAPI returns a new instance of a ChatAPI after applying the given options -func NewAPI(options *Options) *ChatAPI { - return &ChatAPI{ - opts: options, - inChan: make(chan []byte, options.ChannelBufferSize), - outChan: make(chan []byte, options.ChannelBufferSize), - errChan: make(chan error, options.ChannelBufferSize), - } -} - -// Start starts the message handlers and connects the pipes to the long-running keybase -// api commands -func (a *ChatAPI) Start(ctx context.Context) error { - // setup api commands and start the pipes - kbLoc := a.opts.locateKeybase() - chatAPICmd := a.opts.buildBaseCommand("chat", "api") - chatAPIListenCmd := a.opts.buildBaseCommand("chat", "api-listen") - pipes, startPipes, err := getPipes(ctx, kbLoc, chatAPICmd, chatAPIListenCmd) - if err != nil { - return fmt.Errorf("failed to get pipes: %v", err) - } - a.initPipes(ctx, pipes) - if err = startPipes(); err != nil { - return fmt.Errorf("failed to start pipes: %v", err) - } - return nil -} - -// SendRaw sends raw JSON bytes to the API without helper functions -func (a *ChatAPI) SendRaw(msg []byte) ([]byte, error) { - a.Lock() - defer a.Unlock() - a.inChan <- msg - select { - case err := <-a.errChan: - return nil, err - case resp := <-a.outChan: - return resp, nil - } -} - -// CmdPipe holds the pipes that connect to the long-running keybase api commands -type CmdPipe struct { - Stderr io.ReadCloser - Stdin io.WriteCloser - Stdout io.ReadCloser -} - -// Pipes is an interface that holds pointers to the necessary CmdPipes -type Pipes interface { - // ChatAPI returns a pointer to the ChatAPI pipe used to send outgoing requests to the - // keybase chat api - ChatAPI() *CmdPipe - - // APIListen returns a pointer to the APIListen pipe used to receive incoming messages - // from the keybase chat api - APIListen() *CmdPipe -} - -// defaultPipes satisfies the Pipes interface, and provides standard CmdPipes that should -// be used during normal operation -type defaultPipes struct { - chatAPI *CmdPipe - chatAPIListen *CmdPipe -} - -var _ Pipes = &defaultPipes{} - -// ChatAPI returns a pointer to the ChatAPI pipe used to send outgoing requests to the -// keybase chat api -func (p *defaultPipes) ChatAPI() *CmdPipe { - return p.chatAPI -} - -// APIListen returns a pointer to the APIListen pipe used to receive incoming messages from -// the keybase chat api -func (p *defaultPipes) APIListen() *CmdPipe { - return p.chatAPIListen -} - -// getPipes sets up and returns the default pipes -func getPipes(ctx context.Context, kbLoc string, chatAPICmd, chatAPIListenCmd []string) (Pipes, func() error, error) { - var err error - - // set up the commands - chatAPI := exec.CommandContext(ctx, kbLoc, chatAPIListenCmd...) - chatAPIListen := exec.CommandContext(ctx, kbLoc, chatAPICmd...) - - // create the pipes - emptyStartFunc := func() error { return nil } - chatAPIPipe := CmdPipe{} - if chatAPIPipe.Stderr, err = chatAPI.StderrPipe(); err != nil { - return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) - } - if chatAPIPipe.Stdin, err = chatAPI.StdinPipe(); err != nil { - return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) - } - if chatAPIPipe.Stdout, err = chatAPI.StdoutPipe(); err != nil { - return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) - } - - chatAPIListenPipe := CmdPipe{} - if chatAPIListenPipe.Stderr, err = chatAPIListen.StderrPipe(); err != nil { - return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) - } - if chatAPIListenPipe.Stdin, err = chatAPIListen.StdinPipe(); err != nil { - return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) - } - if chatAPIListenPipe.Stdout, err = chatAPIListen.StdoutPipe(); err != nil { - return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err) - } - - // create an anonymous function to start the commands - startFunc := func() error { - if err := chatAPI.Start(); err != nil { - return fmt.Errorf("failed to start chat api command: %v", err) - } - if err := chatAPIListen.Start(); err != nil { - return fmt.Errorf("failed to start chat api listen command: %v", err) - } - return nil - } - - return &defaultPipes{chatAPI: &chatAPIPipe, chatAPIListen: &chatAPIListenPipe}, startFunc, nil -} - -// initPipes starts the goroutines that handle getting messages from the apis and sending -// them into the appropriate channels -func (a *ChatAPI) initPipes(ctx context.Context, pipes Pipes) { - var ( - keyIn = pipes.ChatAPI().Stdin - keyResp = pipes.ChatAPI().Stdout - keyOut = pipes.APIListen().Stdout - ) - - // now start read and write goroutines - // reader - go func() { - r := bufio.NewReader(keyOut) - var buf bytes.Buffer - for { - select { - case <-ctx.Done(): - return - default: - line, isPrefix, err := r.ReadLine() - if err != nil { - a.errChan <- err - } - if isPrefix { - // if its not a complete line, write it to the buffer only so we loop again - buf.Write(line) - } else { - // if the line is complete, write it to the buffer - buf.Write(line) - // then write the entire buffer to the channel - a.outChan <- buf.Bytes() - // then reset the buffer so it can be re-used - buf.Reset() - } - } - } - }() - - // writer - go func() { - for { - select { - case msg := <-a.inChan: - _, err := keyIn.Write(msg) - if err != nil { - a.errChan <- err - } - case <-ctx.Done(): - return - } - } - }() - - // writer responses to fill up gas limits - go func() { - r := bufio.NewReader(keyResp) - for { - select { - case <-ctx.Done(): - return - default: - line, err := r.ReadBytes('\n') - if err != nil { - a.errChan <- err - } - var resp *KeybaseAPIResponse - err = json.Unmarshal(line, resp) - if err != nil { - a.errChan <- err - } else { - for _, limit := range resp.Result.Ratelimits { - if limit.Tank == "chat" { - a.Capacity = limit.Capacity - a.Gas = limit.Gas - a.Reset = limit.Reset - } - } - } - } - } - }() -} diff --git a/keybase_test.go b/keybase_test.go index 1a87727..17b8229 100644 --- a/keybase_test.go +++ b/keybase_test.go @@ -35,19 +35,8 @@ func TestBaseCommand(t *testing.T) { opts := NewOptions() opts.HomeDir = "/home/foo" want := []string{"--home", "/home/foo", "--enable-bot-lite-mode", "arg"} - got := opts.buildBaseCommand("arg") + got := opts.buildArgs("arg") if !reflect.DeepEqual(want, got) { t.Errorf("buildBaseCommand returned invalid command set") } } - -func TestNewAPI(t *testing.T) { - opts := NewOptions() - want := &ChatAPI{ - opts: opts, - } - got := NewAPI(opts) - if !reflect.DeepEqual(want, got) { - t.Errorf("NewApiWithContext returned incorrect struct") - } -} diff --git a/pkg/ctxreader/ctxreader.go b/pkg/ctxreader/ctxreader.go new file mode 100644 index 0000000..fa83e0b --- /dev/null +++ b/pkg/ctxreader/ctxreader.go @@ -0,0 +1,29 @@ +package ctxreader + +import ( + "context" + "io" +) + +type ContextReader struct { + ctx context.Context + r io.ReadCloser +} + +func (cr *ContextReader) Read(p []byte) (n int, err error) { + if err := cr.ctx.Err(); err != nil { + return 0, err + } + return cr.r.Read(p) +} + +func (cr *ContextReader) Close() error { + return cr.r.Close() +} + +func NewContextReader(ctx context.Context, r io.ReadCloser) io.ReadCloser { + return &ContextReader{ + ctx: ctx, + r: r, + } +} diff --git a/ratelimits.go b/ratelimits.go new file mode 100644 index 0000000..96c0ad5 --- /dev/null +++ b/ratelimits.go @@ -0,0 +1,16 @@ +package keybase + +type RateLimit struct { + Tank string `json:"tank"` + Capacity int `json:"capacity"` + Reset int `json:"reset"` + Gas int `json:"gas"` +} + +type KeybaseApiResponse struct { + Result struct { + Message string `json:"message"` + ID int `json:"id"` + RateLimits []RateLimit `json:"ratelimits"` + } `json:"result"` +}