Browse Source

refactor

main
David Haukeness 2 years ago
parent
commit
80605f645e
  1. 112
      chat.go
  2. 83
      cmd.go
  3. 242
      keybase.go
  4. 13
      keybase_test.go
  5. 29
      pkg/ctxreader/ctxreader.go
  6. 16
      ratelimits.go

112
chat.go

@ -0,0 +1,112 @@ @@ -0,0 +1,112 @@
package keybase
import (
"bufio"
"context"
"fmt"
"sync"
"git.hugfreevikings.wtf/keybase/keybase/pkg/ctxreader"
)
type ChatAPI struct {
sync.Mutex
toStdin chan []byte
fromStdout chan []byte
fromListen chan []byte
errors chan error
}
func NewChatAPI(ctx context.Context, opts *Options) (api *ChatAPI, err error) {
// set up the channels
api.toStdin = make(chan []byte, opts.ChannelBufferSize)
api.fromStdout = make(chan []byte, opts.ChannelBufferSize)
api.fromListen = make(chan []byte, opts.ChannelBufferSize)
api.errors = make(chan error, opts.ChannelBufferSize)
// get the basics
kbCmd := opts.locateKeybase()
chatApiArgs := opts.buildArgs("chat", "api")
chatApiListenArgs := opts.buildArgs("chat", "api-listen")
// build the commands
chatApi, err := newApiCmd(ctx, kbCmd, chatApiArgs...)
if err != nil {
return nil, err
}
chatListen, err := newApiCmd(ctx, kbCmd, chatApiListenArgs...)
if err != nil {
return nil, err
}
// create the goroutines
// listen reader
go func() {
cr := ctxreader.NewContextReader(ctx, chatListen.Stdout())
scanner := bufio.NewScanner(cr)
for scanner.Scan() {
api.fromListen <- scanner.Bytes()
}
}()
// writing to stdin
go func() {
for {
select {
case msg := <-api.toStdin:
_, err := chatApi.Stdin().Write(msg)
if err != nil {
api.errors <- err
}
case <-ctx.Done():
return
}
}
}()
// reading from stdout
go func() {
cr := ctxreader.NewContextReader(ctx, chatApi.Stdout())
scanner := bufio.NewScanner(cr)
for scanner.Scan() {
api.fromStdout <- scanner.Bytes()
}
}()
// reading from stderr
go func() {
cr := ctxreader.NewContextReader(ctx, chatApi.Stderr())
scanner := bufio.NewScanner(cr)
for scanner.Scan() {
api.errors <- fmt.Errorf("%v", scanner.Text())
}
}()
// then start the cmds
err = chatApi.Start()
if err != nil {
return nil, err
}
err = chatListen.Start()
if err != nil {
return nil, err
}
return
}
func (c *ChatAPI) Listen() chan []byte {
return c.fromListen
}
func (c *ChatAPI) SendRaw(msg []byte) ([]byte, error) {
c.Lock()
defer c.Unlock()
c.toStdin <- msg
select {
case resp := <-c.fromStdout:
return resp, nil
case err := <-c.errors:
return nil, err
}
}

83
cmd.go

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
package keybase
import (
"context"
"fmt"
"io"
"os/exec"
)
// apiCmd holds the pipes that connect to an executed command
type apiCmd struct {
Stderr io.ReadCloser
Stdin io.WriteCloser
Stdout io.ReadCloser
startFunc func() error
}
type cmd interface {
Start() error
Stdin() io.WriteCloser
Stdout() io.ReadCloser
Stderr() io.ReadCloser
}
type defaultCmd struct {
apiCmd *apiCmd
}
func (c *defaultCmd) Start() error {
return c.apiCmd.startFunc()
}
func (c *defaultCmd) Stdin() io.WriteCloser {
return c.apiCmd.Stdin
}
func (c *defaultCmd) Stdout() io.ReadCloser {
return c.apiCmd.Stdout
}
func (c *defaultCmd) Stderr() io.ReadCloser {
return c.apiCmd.Stderr
}
var _ cmd = &defaultCmd{}
func newApiCmd(ctx context.Context, execCmd string, args ...string) (cmd, error) {
var err error
// create a new apiCmd
cmd := &apiCmd{}
// set up the command
api := exec.CommandContext(ctx, execCmd, args...)
// an empty start func for failures
cmd.startFunc = func() error { return nil }
// hook up the stderr pipe
if cmd.Stderr, err = api.StderrPipe(); err != nil {
return nil, fmt.Errorf("failed to get pipe: %v", err)
}
// hook up the stdout pipe
if cmd.Stdout, err = api.StdoutPipe(); err != nil {
return nil, fmt.Errorf("failed to get pipe: %v", err)
}
// hook up the stdin pipe
if cmd.Stdin, err = api.StdinPipe(); err != nil {
return nil, fmt.Errorf("failed to get pipe: %v", err)
}
// create an anonymous function to start the commands
cmd.startFunc = func() error {
if err := api.Start(); err != nil {
return fmt.Errorf("failed to start command: %v", err)
}
return nil
}
return &defaultCmd{cmd}, nil
}

242
keybase.go

@ -1,15 +1,8 @@ @@ -1,15 +1,8 @@
package keybase
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"os/exec"
"sync"
)
// Options holds... run... options...
@ -46,7 +39,7 @@ func (opt *Options) locateKeybase() string { @@ -46,7 +39,7 @@ func (opt *Options) locateKeybase() string {
}
// buildBaseCommand adds the homedirectory before the args, when required
func (opt *Options) buildBaseCommand(args ...string) []string {
func (opt *Options) buildArgs(args ...string) []string {
var cmd []string
if opt.HomeDir != "" {
cmd = append(cmd, "--home", opt.HomeDir)
@ -57,236 +50,3 @@ func (opt *Options) buildBaseCommand(args ...string) []string { @@ -57,236 +50,3 @@ func (opt *Options) buildBaseCommand(args ...string) []string {
cmd = append(cmd, args...)
return cmd
}
type ChatAPI struct {
sync.Mutex
opts *Options
Capacity int
Reset int
Gas int
inChan chan []byte
outChan chan []byte
errChan chan error
}
type KeybaseAPIResponse struct {
Result struct {
Message string `json:"message"`
ID int `json:"id"`
Ratelimits []struct {
Tank string `json:"tank"`
Capacity int `json:"capacity"`
Reset int `json:"reset"`
Gas int `json:"gas"`
} `json:"ratelimits"`
} `json:"result"`
}
// NewAPI returns a new instance of a ChatAPI after applying the given options
func NewAPI(options *Options) *ChatAPI {
return &ChatAPI{
opts: options,
inChan: make(chan []byte, options.ChannelBufferSize),
outChan: make(chan []byte, options.ChannelBufferSize),
errChan: make(chan error, options.ChannelBufferSize),
}
}
// Start starts the message handlers and connects the pipes to the long-running keybase
// api commands
func (a *ChatAPI) Start(ctx context.Context) error {
// setup api commands and start the pipes
kbLoc := a.opts.locateKeybase()
chatAPICmd := a.opts.buildBaseCommand("chat", "api")
chatAPIListenCmd := a.opts.buildBaseCommand("chat", "api-listen")
pipes, startPipes, err := getPipes(ctx, kbLoc, chatAPICmd, chatAPIListenCmd)
if err != nil {
return fmt.Errorf("failed to get pipes: %v", err)
}
a.initPipes(ctx, pipes)
if err = startPipes(); err != nil {
return fmt.Errorf("failed to start pipes: %v", err)
}
return nil
}
// SendRaw sends raw JSON bytes to the API without helper functions
func (a *ChatAPI) SendRaw(msg []byte) ([]byte, error) {
a.Lock()
defer a.Unlock()
a.inChan <- msg
select {
case err := <-a.errChan:
return nil, err
case resp := <-a.outChan:
return resp, nil
}
}
// CmdPipe holds the pipes that connect to the long-running keybase api commands
type CmdPipe struct {
Stderr io.ReadCloser
Stdin io.WriteCloser
Stdout io.ReadCloser
}
// Pipes is an interface that holds pointers to the necessary CmdPipes
type Pipes interface {
// ChatAPI returns a pointer to the ChatAPI pipe used to send outgoing requests to the
// keybase chat api
ChatAPI() *CmdPipe
// APIListen returns a pointer to the APIListen pipe used to receive incoming messages
// from the keybase chat api
APIListen() *CmdPipe
}
// defaultPipes satisfies the Pipes interface, and provides standard CmdPipes that should
// be used during normal operation
type defaultPipes struct {
chatAPI *CmdPipe
chatAPIListen *CmdPipe
}
var _ Pipes = &defaultPipes{}
// ChatAPI returns a pointer to the ChatAPI pipe used to send outgoing requests to the
// keybase chat api
func (p *defaultPipes) ChatAPI() *CmdPipe {
return p.chatAPI
}
// APIListen returns a pointer to the APIListen pipe used to receive incoming messages from
// the keybase chat api
func (p *defaultPipes) APIListen() *CmdPipe {
return p.chatAPIListen
}
// getPipes sets up and returns the default pipes
func getPipes(ctx context.Context, kbLoc string, chatAPICmd, chatAPIListenCmd []string) (Pipes, func() error, error) {
var err error
// set up the commands
chatAPI := exec.CommandContext(ctx, kbLoc, chatAPIListenCmd...)
chatAPIListen := exec.CommandContext(ctx, kbLoc, chatAPICmd...)
// create the pipes
emptyStartFunc := func() error { return nil }
chatAPIPipe := CmdPipe{}
if chatAPIPipe.Stderr, err = chatAPI.StderrPipe(); err != nil {
return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err)
}
if chatAPIPipe.Stdin, err = chatAPI.StdinPipe(); err != nil {
return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err)
}
if chatAPIPipe.Stdout, err = chatAPI.StdoutPipe(); err != nil {
return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err)
}
chatAPIListenPipe := CmdPipe{}
if chatAPIListenPipe.Stderr, err = chatAPIListen.StderrPipe(); err != nil {
return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err)
}
if chatAPIListenPipe.Stdin, err = chatAPIListen.StdinPipe(); err != nil {
return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err)
}
if chatAPIListenPipe.Stdout, err = chatAPIListen.StdoutPipe(); err != nil {
return nil, emptyStartFunc, fmt.Errorf("failed to get pipe: %v", err)
}
// create an anonymous function to start the commands
startFunc := func() error {
if err := chatAPI.Start(); err != nil {
return fmt.Errorf("failed to start chat api command: %v", err)
}
if err := chatAPIListen.Start(); err != nil {
return fmt.Errorf("failed to start chat api listen command: %v", err)
}
return nil
}
return &defaultPipes{chatAPI: &chatAPIPipe, chatAPIListen: &chatAPIListenPipe}, startFunc, nil
}
// initPipes starts the goroutines that handle getting messages from the apis and sending
// them into the appropriate channels
func (a *ChatAPI) initPipes(ctx context.Context, pipes Pipes) {
var (
keyIn = pipes.ChatAPI().Stdin
keyResp = pipes.ChatAPI().Stdout
keyOut = pipes.APIListen().Stdout
)
// now start read and write goroutines
// reader
go func() {
r := bufio.NewReader(keyOut)
var buf bytes.Buffer
for {
select {
case <-ctx.Done():
return
default:
line, isPrefix, err := r.ReadLine()
if err != nil {
a.errChan <- err
}
if isPrefix {
// if its not a complete line, write it to the buffer only so we loop again
buf.Write(line)
} else {
// if the line is complete, write it to the buffer
buf.Write(line)
// then write the entire buffer to the channel
a.outChan <- buf.Bytes()
// then reset the buffer so it can be re-used
buf.Reset()
}
}
}
}()
// writer
go func() {
for {
select {
case msg := <-a.inChan:
_, err := keyIn.Write(msg)
if err != nil {
a.errChan <- err
}
case <-ctx.Done():
return
}
}
}()
// writer responses to fill up gas limits
go func() {
r := bufio.NewReader(keyResp)
for {
select {
case <-ctx.Done():
return
default:
line, err := r.ReadBytes('\n')
if err != nil {
a.errChan <- err
}
var resp *KeybaseAPIResponse
err = json.Unmarshal(line, resp)
if err != nil {
a.errChan <- err
} else {
for _, limit := range resp.Result.Ratelimits {
if limit.Tank == "chat" {
a.Capacity = limit.Capacity
a.Gas = limit.Gas
a.Reset = limit.Reset
}
}
}
}
}
}()
}

13
keybase_test.go

@ -35,19 +35,8 @@ func TestBaseCommand(t *testing.T) { @@ -35,19 +35,8 @@ func TestBaseCommand(t *testing.T) {
opts := NewOptions()
opts.HomeDir = "/home/foo"
want := []string{"--home", "/home/foo", "--enable-bot-lite-mode", "arg"}
got := opts.buildBaseCommand("arg")
got := opts.buildArgs("arg")
if !reflect.DeepEqual(want, got) {
t.Errorf("buildBaseCommand returned invalid command set")
}
}
func TestNewAPI(t *testing.T) {
opts := NewOptions()
want := &ChatAPI{
opts: opts,
}
got := NewAPI(opts)
if !reflect.DeepEqual(want, got) {
t.Errorf("NewApiWithContext returned incorrect struct")
}
}

29
pkg/ctxreader/ctxreader.go

@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
package ctxreader
import (
"context"
"io"
)
type ContextReader struct {
ctx context.Context
r io.ReadCloser
}
func (cr *ContextReader) Read(p []byte) (n int, err error) {
if err := cr.ctx.Err(); err != nil {
return 0, err
}
return cr.r.Read(p)
}
func (cr *ContextReader) Close() error {
return cr.r.Close()
}
func NewContextReader(ctx context.Context, r io.ReadCloser) io.ReadCloser {
return &ContextReader{
ctx: ctx,
r: r,
}
}

16
ratelimits.go

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
package keybase
type RateLimit struct {
Tank string `json:"tank"`
Capacity int `json:"capacity"`
Reset int `json:"reset"`
Gas int `json:"gas"`
}
type KeybaseApiResponse struct {
Result struct {
Message string `json:"message"`
ID int `json:"id"`
RateLimits []RateLimit `json:"ratelimits"`
} `json:"result"`
}
Loading…
Cancel
Save