From 56dbf0540e2aaaf26c2154025e566ef39593c848 Mon Sep 17 00:00:00 2001 From: Arsen Musayelyan Date: Sun, 1 May 2022 11:36:28 -0700 Subject: [PATCH] Switch to lrpc and use context to handle signals --- api/api.go | 105 +----------- api/firmware.go | 25 +-- api/fs.go | 67 ++------ api/get.go | 26 ++- api/notify.go | 12 +- api/set.go | 5 +- api/update.go | 8 +- api/watch.go | 103 +++-------- cmd/itctl/main.go | 24 +-- cmd/itctl/watch.go | 118 +++++++------ go.mod | 17 +- go.sum | 35 +--- main.go | 7 - socket.go | 415 ++++++++++++--------------------------------- 14 files changed, 265 insertions(+), 702 deletions(-) diff --git a/api/api.go b/api/api.go index 77c2fab..86cf0eb 100644 --- a/api/api.go +++ b/api/api.go @@ -1,117 +1,30 @@ package api import ( - "context" + "net" - "github.com/smallnest/rpcxlite/client" - "github.com/smallnest/rpcxlite/protocol" - "github.com/vmihailenco/msgpack/v5" - "go.arsenm.dev/infinitime" + "go.arsenm.dev/lrpc/client" + "go.arsenm.dev/lrpc/codec" ) const DefaultAddr = "/tmp/itd/socket" type Client struct { - itdClient client.XClient - itdCh chan *protocol.Message - fsClient client.XClient - fsCh chan *protocol.Message - srvVals map[string]chan interface{} + client *client.Client } func New(sockPath string) (*Client, error) { - d, err := client.NewPeer2PeerDiscovery("unix@"+sockPath, "") + conn, err := net.Dial("unix", sockPath) if err != nil { return nil, err } - out := &Client{} - - out.itdCh = make(chan *protocol.Message, 5) - out.itdClient = client.NewBidirectionalXClient( - "ITD", - client.Failtry, - client.RandomSelect, - d, - client.DefaultOption, - out.itdCh, - ) - - out.fsCh = make(chan *protocol.Message, 5) - out.fsClient = client.NewBidirectionalXClient( - "FS", - client.Failtry, - client.RandomSelect, - d, - client.DefaultOption, - out.fsCh, - ) - - out.srvVals = map[string]chan interface{}{} - - go out.handleMessages(out.itdCh) - go out.handleMessages(out.fsCh) - + out := &Client{ + client: client.New(conn, codec.JSON), + } return out, nil } -func (c *Client) handleMessages(msgCh chan *protocol.Message) { - for msg := range msgCh { - _, ok := c.srvVals[msg.ServicePath] - if !ok { - c.srvVals[msg.ServicePath] = make(chan interface{}, 5) - } - - //fmt.Printf("%+v\n", msg) - - ch := c.srvVals[msg.ServicePath] - - switch msg.ServiceMethod { - case "FSProgress": - var progress FSTransferProgress - msgpack.Unmarshal(msg.Payload, &progress) - ch <- progress - case "DFUProgress": - var progress infinitime.DFUProgress - msgpack.Unmarshal(msg.Payload, &progress) - ch <- progress - case "MotionSample": - var motionVals infinitime.MotionValues - msgpack.Unmarshal(msg.Payload, &motionVals) - ch <- motionVals - case "Done": - close(c.srvVals[msg.ServicePath]) - delete(c.srvVals, msg.ServicePath) - default: - var value interface{} - msgpack.Unmarshal(msg.Payload, &value) - ch <- value - } - } -} - -func (c *Client) done(id string) error { - return c.itdClient.Call( - context.Background(), - "Done", - id, - nil, - ) -} - func (c *Client) Close() error { - err := c.itdClient.Close() - if err != nil { - return err - } - - err = c.fsClient.Close() - if err != nil { - return err - } - - close(c.itdCh) - close(c.fsCh) - - return nil + return c.client.Close() } diff --git a/api/firmware.go b/api/firmware.go index 1335365..52f6088 100644 --- a/api/firmware.go +++ b/api/firmware.go @@ -1,40 +1,23 @@ package api import ( - "context" - "time" - "go.arsenm.dev/infinitime" ) func (c *Client) FirmwareUpgrade(upgType UpgradeType, files ...string) (chan infinitime.DFUProgress, error) { - var id string - err := c.itdClient.Call( - context.Background(), + progressCh := make(chan infinitime.DFUProgress, 5) + err := c.client.Call( + "ITD", "FirmwareUpgrade", FwUpgradeData{ Type: upgType, Files: files, }, - &id, + &progressCh, ) if err != nil { return nil, err } - progressCh := make(chan infinitime.DFUProgress, 5) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - progressCh <- val.(infinitime.DFUProgress) - } - close(progressCh) - }() - return progressCh, nil } diff --git a/api/fs.go b/api/fs.go index 735aeff..3d6474a 100644 --- a/api/fs.go +++ b/api/fs.go @@ -1,13 +1,8 @@ package api -import ( - "context" - "time" -) - func (c *Client) Remove(paths ...string) error { - return c.fsClient.Call( - context.Background(), + return c.client.Call( + "FS", "Remove", paths, nil, @@ -15,17 +10,17 @@ func (c *Client) Remove(paths ...string) error { } func (c *Client) Rename(old, new string) error { - return c.fsClient.Call( - context.Background(), - "Remove", + return c.client.Call( + "FS", + "Rename", [2]string{old, new}, nil, ) } func (c *Client) Mkdir(paths ...string) error { - return c.fsClient.Call( - context.Background(), + return c.client.Call( + "FS", "Mkdir", paths, nil, @@ -33,8 +28,8 @@ func (c *Client) Mkdir(paths ...string) error { } func (c *Client) ReadDir(dir string) (out []FileInfo, err error) { - err = c.fsClient.Call( - context.Background(), + err = c.client.Call( + "FS", "ReadDir", dir, &out, @@ -43,59 +38,31 @@ func (c *Client) ReadDir(dir string) (out []FileInfo, err error) { } func (c *Client) Upload(dst, src string) (chan FSTransferProgress, error) { - var id string - err := c.fsClient.Call( - context.Background(), + progressCh := make(chan FSTransferProgress, 5) + err := c.client.Call( + "FS", "Upload", [2]string{dst, src}, - &id, + progressCh, ) if err != nil { return nil, err } - progressCh := make(chan FSTransferProgress, 5) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - progressCh <- val.(FSTransferProgress) - } - close(progressCh) - }() - return progressCh, nil } func (c *Client) Download(dst, src string) (chan FSTransferProgress, error) { - var id string - err := c.fsClient.Call( - context.Background(), + progressCh := make(chan FSTransferProgress, 5) + err := c.client.Call( + "FS", "Download", [2]string{dst, src}, - &id, + progressCh, ) if err != nil { return nil, err } - progressCh := make(chan FSTransferProgress, 5) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - progressCh <- val.(FSTransferProgress) - } - close(progressCh) - }() - return progressCh, nil } diff --git a/api/get.go b/api/get.go index 9229ef8..438efb2 100644 --- a/api/get.go +++ b/api/get.go @@ -1,14 +1,12 @@ package api import ( - "context" - "go.arsenm.dev/infinitime" ) func (c *Client) HeartRate() (out uint8, err error) { - err = c.itdClient.Call( - context.Background(), + err = c.client.Call( + "ITD", "HeartRate", nil, &out, @@ -17,8 +15,8 @@ func (c *Client) HeartRate() (out uint8, err error) { } func (c *Client) BatteryLevel() (out uint8, err error) { - err = c.itdClient.Call( - context.Background(), + err = c.client.Call( + "ITD", "BatteryLevel", nil, &out, @@ -27,8 +25,8 @@ func (c *Client) BatteryLevel() (out uint8, err error) { } func (c *Client) Motion() (out infinitime.MotionValues, err error) { - err = c.itdClient.Call( - context.Background(), + err = c.client.Call( + "ITD", "Motion", nil, &out, @@ -37,8 +35,8 @@ func (c *Client) Motion() (out infinitime.MotionValues, err error) { } func (c *Client) StepCount() (out uint32, err error) { - err = c.itdClient.Call( - context.Background(), + err = c.client.Call( + "ITD", "StepCount", nil, &out, @@ -47,8 +45,8 @@ func (c *Client) StepCount() (out uint32, err error) { } func (c *Client) Version() (out string, err error) { - err = c.itdClient.Call( - context.Background(), + err = c.client.Call( + "ITD", "Version", nil, &out, @@ -57,8 +55,8 @@ func (c *Client) Version() (out string, err error) { } func (c *Client) Address() (out string, err error) { - err = c.itdClient.Call( - context.Background(), + err = c.client.Call( + "ITD", "Address", nil, &out, diff --git a/api/notify.go b/api/notify.go index ca48c4c..a8ebb6f 100644 --- a/api/notify.go +++ b/api/notify.go @@ -1,17 +1,13 @@ package api -import ( - "context" -) - func (c *Client) Notify(title, body string) error { - return c.itdClient.Call( - context.Background(), + return c.client.Call( + "ITD", "Notify", NotifyData{ Title: title, - Body: body, + Body: body, }, nil, ) -} \ No newline at end of file +} diff --git a/api/set.go b/api/set.go index c6918d6..ea9a66a 100644 --- a/api/set.go +++ b/api/set.go @@ -1,13 +1,12 @@ package api import ( - "context" "time" ) func (c *Client) SetTime(t time.Time) error { - return c.itdClient.Call( - context.Background(), + return c.client.Call( + "ITD", "SetTime", t, nil, diff --git a/api/update.go b/api/update.go index 5ff299c..af1ecf8 100644 --- a/api/update.go +++ b/api/update.go @@ -1,12 +1,10 @@ package api -import "context" - func (c *Client) WeatherUpdate() error { - return c.itdClient.Call( - context.Background(), + return c.client.Call( + "ITD", "WeatherUpdate", nil, nil, ) -} \ No newline at end of file +} diff --git a/api/watch.go b/api/watch.go index 25ffb46..ec863c1 100644 --- a/api/watch.go +++ b/api/watch.go @@ -1,143 +1,80 @@ package api import ( - "context" - "time" - "go.arsenm.dev/infinitime" ) func (c *Client) WatchHeartRate() (<-chan uint8, func(), error) { - var id string - err := c.itdClient.Call( - context.Background(), + outCh := make(chan uint8, 2) + err := c.client.Call( + "ITD", "WatchHeartRate", nil, - &id, + outCh, ) if err != nil { return nil, nil, err } - outCh := make(chan uint8, 2) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - outCh <- val.(uint8) - } - }() - doneFn := func() { - c.done(id) - close(c.srvVals[id]) - delete(c.srvVals, id) + close(outCh) } return outCh, doneFn, nil } func (c *Client) WatchBatteryLevel() (<-chan uint8, func(), error) { - var id string - err := c.itdClient.Call( - context.Background(), + outCh := make(chan uint8, 2) + err := c.client.Call( + "ITD", "WatchBatteryLevel", nil, - &id, + outCh, ) if err != nil { return nil, nil, err } - outCh := make(chan uint8, 2) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - outCh <- val.(uint8) - } - }() - doneFn := func() { - c.done(id) - close(c.srvVals[id]) - delete(c.srvVals, id) + close(outCh) } return outCh, doneFn, nil } func (c *Client) WatchStepCount() (<-chan uint32, func(), error) { - var id string - err := c.itdClient.Call( - context.Background(), + outCh := make(chan uint32, 2) + err := c.client.Call( + "ITD", "WatchStepCount", nil, - &id, + outCh, ) if err != nil { return nil, nil, err } - outCh := make(chan uint32, 2) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - outCh <- val.(uint32) - } - }() - doneFn := func() { - c.done(id) - close(c.srvVals[id]) - delete(c.srvVals, id) + close(outCh) } return outCh, doneFn, nil } func (c *Client) WatchMotion() (<-chan infinitime.MotionValues, func(), error) { - var id string - err := c.itdClient.Call( - context.Background(), + outCh := make(chan infinitime.MotionValues, 2) + err := c.client.Call( + "ITD", "WatchMotion", nil, - &id, + outCh, ) if err != nil { return nil, nil, err } - outCh := make(chan infinitime.MotionValues, 2) - go func() { - srvValCh, ok := c.srvVals[id] - for !ok { - time.Sleep(100 * time.Millisecond) - srvValCh, ok = c.srvVals[id] - } - - for val := range srvValCh { - outCh <- val.(infinitime.MotionValues) - } - }() - doneFn := func() { - c.done(id) - close(c.srvVals[id]) - delete(c.srvVals, id) + close(outCh) } return outCh, doneFn, nil diff --git a/cmd/itctl/main.go b/cmd/itctl/main.go index ba2f1b9..529a3e8 100644 --- a/cmd/itctl/main.go +++ b/cmd/itctl/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "os" "os/signal" "syscall" @@ -16,6 +17,13 @@ var client *api.Client func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + ctx := context.Background() + ctx, _ = signal.NotifyContext( + ctx, + syscall.SIGINT, + syscall.SIGTERM, + ) + app := cli.App{ Name: "itctl", Flags: []cli.Flag{ @@ -236,22 +244,8 @@ func main() { }, } - err := app.Run(os.Args) + err := app.RunContext(ctx, os.Args) if err != nil { log.Fatal().Err(err).Msg("Error while running app") } } - -func catchSignal(fn func()) { - sigCh := make(chan os.Signal, 1) - signal.Notify( - sigCh, - syscall.SIGINT, - syscall.SIGTERM, - ) - go func() { - <-sigCh - fn() - os.Exit(0) - }() -} diff --git a/cmd/itctl/watch.go b/cmd/itctl/watch.go index 1be121a..5df41db 100644 --- a/cmd/itctl/watch.go +++ b/cmd/itctl/watch.go @@ -14,21 +14,23 @@ func watchHeart(c *cli.Context) error { return err } - catchSignal(cancel) - - for heartRate := range heartCh { - if c.Bool("json") { - json.NewEncoder(os.Stdout).Encode( - map[string]uint8{"heartRate": heartRate}, - ) - } else if c.Bool("shell") { - fmt.Printf("HEART_RATE=%d\n", heartRate) - } else { - fmt.Println(heartRate, "BPM") + for { + select { + case heartRate := <-heartCh: + if c.Bool("json") { + json.NewEncoder(os.Stdout).Encode( + map[string]uint8{"heartRate": heartRate}, + ) + } else if c.Bool("shell") { + fmt.Printf("HEART_RATE=%d\n", heartRate) + } else { + fmt.Println(heartRate, "BPM") + } + case <-c.Done(): + cancel() + return nil } } - - return nil } func watchBattLevel(c *cli.Context) error { @@ -37,21 +39,23 @@ func watchBattLevel(c *cli.Context) error { return err } - catchSignal(cancel) - - for battLevel := range battLevelCh { - if c.Bool("json") { - json.NewEncoder(os.Stdout).Encode( - map[string]uint8{"battLevel": battLevel}, - ) - } else if c.Bool("shell") { - fmt.Printf("BATTERY_LEVEL=%d\n", battLevel) - } else { - fmt.Printf("%d%%\n", battLevel) + for { + select { + case battLevel := <-battLevelCh: + if c.Bool("json") { + json.NewEncoder(os.Stdout).Encode( + map[string]uint8{"battLevel": battLevel}, + ) + } else if c.Bool("shell") { + fmt.Printf("BATTERY_LEVEL=%d\n", battLevel) + } else { + fmt.Printf("%d%%\n", battLevel) + } + case <-c.Done(): + cancel() + return nil } } - - return nil } func watchStepCount(c *cli.Context) error { @@ -60,21 +64,23 @@ func watchStepCount(c *cli.Context) error { return err } - catchSignal(cancel) - - for stepCount := range stepCountCh { - if c.Bool("json") { - json.NewEncoder(os.Stdout).Encode( - map[string]uint32{"stepCount": stepCount}, - ) - } else if c.Bool("shell") { - fmt.Printf("STEP_COUNT=%d\n", stepCount) - } else { - fmt.Println(stepCount, "Steps") + for { + select { + case stepCount := <-stepCountCh: + if c.Bool("json") { + json.NewEncoder(os.Stdout).Encode( + map[string]uint32{"stepCount": stepCount}, + ) + } else if c.Bool("shell") { + fmt.Printf("STEP_COUNT=%d\n", stepCount) + } else { + fmt.Println(stepCount, "Steps") + } + case <-c.Done(): + cancel() + return nil } } - - return nil } func watchMotion(c *cli.Context) error { @@ -83,22 +89,24 @@ func watchMotion(c *cli.Context) error { return err } - catchSignal(cancel) - - for motionVals := range motionCh { - if c.Bool("json") { - json.NewEncoder(os.Stdout).Encode(motionVals) - } else if c.Bool("shell") { - fmt.Printf( - "X=%d\nY=%d\nZ=%d\n", - motionVals.X, - motionVals.Y, - motionVals.Z, - ) - } else { - fmt.Println(motionVals) + for { + select { + case motionVals := <-motionCh: + if c.Bool("json") { + json.NewEncoder(os.Stdout).Encode(motionVals) + } else if c.Bool("shell") { + fmt.Printf( + "X=%d\nY=%d\nZ=%d\n", + motionVals.X, + motionVals.Y, + motionVals.Z, + ) + } else { + fmt.Println(motionVals) + } + case <-c.Done(): + cancel() + return nil } } - - return nil } diff --git a/go.mod b/go.mod index b9d6364..e298ef7 100644 --- a/go.mod +++ b/go.mod @@ -7,15 +7,13 @@ require ( github.com/cheggaaa/pb/v3 v3.0.8 github.com/gen2brain/dlgs v0.0.0-20211108104213-bade24837f0b github.com/godbus/dbus/v5 v5.0.6 - github.com/google/uuid v1.3.0 github.com/knadh/koanf v1.4.0 github.com/mattn/go-isatty v0.0.14 github.com/mozillazg/go-pinyin v0.19.0 github.com/rs/zerolog v1.26.1 - github.com/smallnest/rpcxlite v0.0.0-20210907055316-506983e15e7e github.com/urfave/cli/v2 v2.3.0 - github.com/vmihailenco/msgpack/v5 v5.3.5 go.arsenm.dev/infinitime v0.0.0-20220424030849-6c3f1b14c948 + go.arsenm.dev/lrpc v0.0.0-20220501183450-91933b798dbb golang.org/x/text v0.3.7 ) @@ -30,38 +28,31 @@ require ( github.com/fxamacker/cbor/v2 v2.4.0 // indirect github.com/go-gl/gl v0.0.0-20210813123233-e4099ee2221f // indirect github.com/go-gl/glfw/v3.3/glfw v0.0.0-20211024062804-40e447a793be // indirect + github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/goki/freetype v0.0.0-20181231101311-fa8a33aabaff // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect - github.com/hashicorp/golang-lru v0.5.4 // indirect - github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/kavu/go_reuseport v1.5.0 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-runewidth v0.0.12 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect - github.com/mitchellh/mapstructure v1.4.3 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/muka/go-bluetooth v0.0.0-20220219050759-674a63b8741a // indirect github.com/pelletier/go-toml v1.7.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect - github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect github.com/sirupsen/logrus v1.6.0 // indirect - github.com/soheilhy/cmux v0.1.5 // indirect github.com/srwiley/oksvg v0.0.0-20200311192757-870daf9aa564 // indirect github.com/srwiley/rasterx v0.0.0-20200120212402-85cb7272f5e9 // indirect github.com/stretchr/testify v1.7.1 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fastrand v1.1.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/yuin/goldmark v1.4.1 // indirect golang.org/x/image v0.0.0-20200430140353-33d19683fad8 // indirect golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index 5efd50e..5c8020a 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.6 h1:mkgN1ofwASrYnJ5W6U/BxG15eXXXjirgZc7CLqkcaro= github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0= +github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/goki/freetype v0.0.0-20181231101311-fa8a33aabaff h1:W71vTCKoxtdXgnm1ECDFkfQnpdqAO00zzGXLA5yaEX8= github.com/goki/freetype v0.0.0-20181231101311-fa8a33aabaff/go.mod h1:wfqRWLHRBsRgkp5dmbG56SA0DmVtwrF5N3oPdI8t+Aw= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -63,15 +65,10 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -90,8 +87,6 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoIospckxBxk6Q= @@ -104,10 +99,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/josephspurrier/goversioninfo v0.0.0-20200309025242-14b0ab84c6ca/go.mod h1:eJTEwMjXb7kZ633hO3Ln9mBUCOjX2+FlTljvpl9SYdE= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/kavu/go_reuseport v1.5.0 h1:UNuiY2OblcqAtVDE8Gsg1kZz8zbBWg907sP1ceBV+bk= -github.com/kavu/go_reuseport v1.5.0/go.mod h1:CG8Ee7ceMFSMnx/xr25Vm0qXaj2Z4i5PWoUx+JZ5/CU= github.com/knadh/koanf v1.4.0 h1:/k0Bh49SqLyLNfte9r6cvuZWrApOQhglOmhIU3L/zDw= github.com/knadh/koanf v1.4.0/go.mod h1:1cfH5223ZeZUOs8FU2UdTmaNfHpqgtjV0+NHjRO43gs= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= @@ -137,8 +128,8 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= -github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= @@ -165,8 +156,6 @@ github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8d github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= -github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= @@ -178,10 +167,6 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5I github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/smallnest/rpcxlite v0.0.0-20210907055316-506983e15e7e h1:/vXNU7mtgLoU8SHyS0aKFcgH5nzS4gXyPzYAqxqo9TY= -github.com/smallnest/rpcxlite v0.0.0-20210907055316-506983e15e7e/go.mod h1:vKxVAvFR0h8sReZSmUg3PB8cDJXB+m1JtS5BKAITOUE= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -201,12 +186,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/suapapa/go_eddystone v1.3.1/go.mod h1:bXC11TfJOS+3g3q/Uzd7FKd5g62STQEfeEIhcKe4Qy8= github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fastrand v0.0.0-20170531153657-19dd0f0bf014/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= -github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= -github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= -github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= @@ -221,6 +200,8 @@ github.com/yuin/goldmark v1.4.1 h1:/vn0k+RBvwlxEmP5E7SZMqNxPhfMVFEJiykr15/0XKM= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.arsenm.dev/infinitime v0.0.0-20220424030849-6c3f1b14c948 h1:OX1SyEIFz4ae2z468lBQvRTNRvqLEwjfJ8lcssUH5+w= go.arsenm.dev/infinitime v0.0.0-20220424030849-6c3f1b14c948/go.mod h1:1cBQ3fp6QlRbSqu9kEBAHsVThINj31FtqHIYVsQ7wgg= +go.arsenm.dev/lrpc v0.0.0-20220501183450-91933b798dbb h1:5fsGR3Vz6ZMcSwA9/KSdxblh5qbtFf5HNzQptykexvY= +go.arsenm.dev/lrpc v0.0.0-20220501183450-91933b798dbb/go.mod h1:KtIqSuK4mMzHm9OIO2oRtMxSZTooPCnMgtP3Q88N7hw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -240,11 +221,9 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 h1:EN5+DfgmRMvRUrMGERW2gQl3Vc+Z7ZMnI/xdEpPSf0c= golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -254,7 +233,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -270,7 +248,6 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/main.go b/main.go index bc2a1c2..abdf3fe 100644 --- a/main.go +++ b/main.go @@ -26,8 +26,6 @@ import ( "strconv" "time" - rpcxlog "github.com/smallnest/rpcxlite/log" - "github.com/gen2brain/dlgs" "github.com/knadh/koanf" "github.com/mattn/go-isatty" @@ -36,11 +34,6 @@ import ( "go.arsenm.dev/infinitime" ) -func init() { - // Disable rpcxlite logging - rpcxlog.SetDummyLogger() -} - var k = koanf.New(".") //go:embed version.txt diff --git a/socket.go b/socket.go index a4efb14..01070fd 100644 --- a/socket.go +++ b/socket.go @@ -19,26 +19,20 @@ package main import ( - "bytes" - "context" "errors" + "fmt" "io" "net" - "net/http" - "net/url" "os" "path/filepath" - "strings" "time" - "github.com/google/uuid" "github.com/rs/zerolog/log" - "github.com/smallnest/rpcxlite/server" - "github.com/smallnest/rpcxlite/share" - "github.com/vmihailenco/msgpack/v5" "go.arsenm.dev/infinitime" "go.arsenm.dev/infinitime/blefs" "go.arsenm.dev/itd/api" + "go.arsenm.dev/lrpc/codec" + "go.arsenm.dev/lrpc/server" ) // This type signifies an unneeded value. @@ -50,7 +44,6 @@ var ( ErrDFUInvalidFile = errors.New("provided file is invalid for given upgrade type") ErrDFUNotEnoughFiles = errors.New("not enough files provided for given upgrade type") ErrDFUInvalidUpgType = errors.New("invalid upgrade type") - ErrRPCXNoReturnURL = errors.New("bidirectional requests over gateway require a returnURL field in the metadata") ) type DoneMap map[string]chan struct{} @@ -100,13 +93,12 @@ func startSocket(dev *infinitime.Device) error { log.Warn().Err(err).Msg("Error getting BLE filesystem") } - srv := server.NewServer() + srv := server.New() itdAPI := &ITD{ dev: dev, - srv: srv, } - err = srv.Register(itdAPI, "") + err = srv.Register(itdAPI) if err != nil { return err } @@ -114,14 +106,13 @@ func startSocket(dev *infinitime.Device) error { fsAPI := &FS{ dev: dev, fs: fs, - srv: srv, } - err = srv.Register(fsAPI, "") + err = srv.Register(fsAPI) if err != nil { return err } - go srv.ServeListener("unix", ln) + go srv.Serve(ln, codec.JSON) // Log socket start log.Info().Str("path", k.String("socket.path")).Msg("Started control socket") @@ -131,21 +122,16 @@ func startSocket(dev *infinitime.Device) error { type ITD struct { dev *infinitime.Device - srv *server.Server } -func (i *ITD) HeartRate(_ context.Context, _ none, out *uint8) error { - heartRate, err := i.dev.HeartRate() - *out = heartRate - return err +func (i *ITD) HeartRate(_ *server.Context) (uint8, error) { + return i.dev.HeartRate() } -func (i *ITD) WatchHeartRate(ctx context.Context, _ none, out *string) error { - // Get client message sender - msgSender, ok := getMsgSender(ctx, i.srv) - // If user is using gateway, the client connection will not be available - if !ok { - return ErrRPCXNoReturnURL +func (i *ITD) WatchHeartRate(ctx *server.Context) error { + ch, err := ctx.MakeChannel() + if err != nil { + return err } heartRateCh, cancel, err := i.dev.WatchHeartRate() @@ -153,46 +139,32 @@ func (i *ITD) WatchHeartRate(ctx context.Context, _ none, out *string) error { return err } - id := uuid.New().String() go func() { - done.Create(id) // For every heart rate value - for heartRate := range heartRateCh { + for { select { - case <-done[id]: + case <-ctx.Done(): + fmt.Println("ctx done") // Stop notifications if done signal received cancel() - done.Remove(id) return - default: - data, err := msgpack.Marshal(heartRate) - if err != nil { - log.Error().Err(err).Msg("Error encoding heart rate") - continue - } - - // Send response to connection if no done signal received - msgSender.SendMessage(id, "HeartRateSample", nil, data) + case heartRate := <-heartRateCh: + ch <- heartRate } } }() - *out = id return nil } -func (i *ITD) BatteryLevel(_ context.Context, _ none, out *uint8) error { - battLevel, err := i.dev.BatteryLevel() - *out = battLevel - return err +func (i *ITD) BatteryLevel(_ *server.Context) (uint8, error) { + return i.dev.BatteryLevel() } -func (i *ITD) WatchBatteryLevel(ctx context.Context, _ none, out *string) error { - // Get client message sender - msgSender, ok := getMsgSender(ctx, i.srv) - // If user is using gateway, the client connection will not be available - if !ok { - return ErrRPCXNoReturnURL +func (i *ITD) WatchBatteryLevel(ctx *server.Context) error { + ch, err := ctx.MakeChannel() + if err != nil { + return err } battLevelCh, cancel, err := i.dev.WatchBatteryLevel() @@ -200,46 +172,31 @@ func (i *ITD) WatchBatteryLevel(ctx context.Context, _ none, out *string) error return err } - id := uuid.New().String() go func() { - done.Create(id) // For every heart rate value - for battLevel := range battLevelCh { + for { select { - case <-done[id]: + case <-ctx.Done(): // Stop notifications if done signal received cancel() - done.Remove(id) return - default: - data, err := msgpack.Marshal(battLevel) - if err != nil { - log.Error().Err(err).Msg("Error encoding battery level") - continue - } - - // Send response to connection if no done signal received - msgSender.SendMessage(id, "BatteryLevelSample", nil, data) + case battLevel := <-battLevelCh: + ch <- battLevel } } }() - *out = id return nil } -func (i *ITD) Motion(_ context.Context, _ none, out *infinitime.MotionValues) error { - motionVals, err := i.dev.Motion() - *out = motionVals - return err +func (i *ITD) Motion(_ *server.Context) (infinitime.MotionValues, error) { + return i.dev.Motion() } -func (i *ITD) WatchMotion(ctx context.Context, _ none, out *string) error { - // Get client message sender - msgSender, ok := getMsgSender(ctx, i.srv) - // If user is using gateway, the client connection will not be available - if !ok { - return ErrRPCXNoReturnURL +func (i *ITD) WatchMotion(ctx *server.Context) error { + ch, err := ctx.MakeChannel() + if err != nil { + return err } motionValsCh, cancel, err := i.dev.WatchMotion() @@ -247,46 +204,31 @@ func (i *ITD) WatchMotion(ctx context.Context, _ none, out *string) error { return err } - id := uuid.New().String() go func() { - done.Create(id) // For every heart rate value - for motionVals := range motionValsCh { + for { select { - case <-done[id]: + case <-ctx.Done(): // Stop notifications if done signal received cancel() - done.Remove(id) return - default: - data, err := msgpack.Marshal(motionVals) - if err != nil { - log.Error().Err(err).Msg("Error encoding motion values") - continue - } - - // Send response to connection if no done signal received - msgSender.SendMessage(id, "MotionSample", nil, data) + case motionVals := <-motionValsCh: + ch <- motionVals } } }() - *out = id return nil } -func (i *ITD) StepCount(_ context.Context, _ none, out *uint32) error { - stepCount, err := i.dev.StepCount() - *out = stepCount - return err +func (i *ITD) StepCount(_ *server.Context) (uint32, error) { + return i.dev.StepCount() } -func (i *ITD) WatchStepCount(ctx context.Context, _ none, out *string) error { - // Get client message sender - msgSender, ok := getMsgSender(ctx, i.srv) - // If user is using gateway, the client connection will not be available - if !ok { - return ErrRPCXNoReturnURL +func (i *ITD) WatchStepCount(ctx *server.Context) error { + ch, err := ctx.MakeChannel() + if err != nil { + return err } stepCountCh, cancel, err := i.dev.WatchStepCount() @@ -294,60 +236,44 @@ func (i *ITD) WatchStepCount(ctx context.Context, _ none, out *string) error { return err } - id := uuid.New().String() go func() { - done.Create(id) // For every heart rate value - for stepCount := range stepCountCh { + for { select { - case <-done[id]: + case <-ctx.Done(): // Stop notifications if done signal received cancel() - done.Remove(id) return - default: - data, err := msgpack.Marshal(stepCount) - if err != nil { - log.Error().Err(err).Msg("Error encoding step count") - continue - } - - // Send response to connection if no done signal received - msgSender.SendMessage(id, "StepCountSample", nil, data) + case stepCount := <-stepCountCh: + ch <- stepCount } } }() - *out = id return nil } -func (i *ITD) Version(_ context.Context, _ none, out *string) error { - version, err := i.dev.Version() - *out = version - return err +func (i *ITD) Version(_ *server.Context) (string, error) { + return i.dev.Version() } -func (i *ITD) Address(_ context.Context, _ none, out *string) error { - addr := i.dev.Address() - *out = addr - return nil +func (i *ITD) Address(_ *server.Context) string { + return i.dev.Address() } -func (i *ITD) Notify(_ context.Context, data api.NotifyData, _ *none) error { +func (i *ITD) Notify(_ *server.Context, data api.NotifyData) error { return i.dev.Notify(data.Title, data.Body) } -func (i *ITD) SetTime(_ context.Context, t time.Time, _ *none) error { - return i.dev.SetTime(t) +func (i *ITD) SetTime(_ *server.Context, t *time.Time) error { + return i.dev.SetTime(*t) } -func (i *ITD) WeatherUpdate(_ context.Context, _ none, _ *none) error { +func (i *ITD) WeatherUpdate(_ *server.Context) { sendWeatherCh <- struct{}{} - return nil } -func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, out *string) error { +func (i *ITD) FirmwareUpgrade(ctx *server.Context, reqData api.FwUpgradeData) error { i.dev.DFU.Reset() switch reqData.Type { @@ -387,30 +313,22 @@ func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, ou return ErrDFUInvalidUpgType } - id := uuid.New().String() - *out = id - - // Get client message sender - msgSender, ok := getMsgSender(ctx, i.srv) - // If user is using gateway, the client connection will not be available - if ok { - go func() { - // For every progress event - for event := range i.dev.DFU.Progress() { - data, err := msgpack.Marshal(event) - if err != nil { - log.Error().Err(err).Msg("Error encoding DFU progress event") - continue - } - - msgSender.SendMessage(id, "DFUProgress", nil, data) - } - - firmwareUpdating = false - msgSender.SendMessage(id, "Done", nil, nil) - }() + ch, err := ctx.MakeChannel() + if err != nil { + return err } + go func() { + // For every progress event + for event := range i.dev.DFU.Progress() { + ch <- event + } + + firmwareUpdating = false + // Send zero object to signal completion + close(ch) + }() + // Set firmwareUpdating firmwareUpdating = true @@ -427,18 +345,12 @@ func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, ou return nil } -func (i *ITD) Done(_ context.Context, id string, _ *none) error { - done.Done(id) - return nil -} - type FS struct { dev *infinitime.Device fs *blefs.FS - srv *server.Server } -func (fs *FS) Remove(_ context.Context, paths []string, _ *none) error { +func (fs *FS) Remove(_ *server.Context, paths []string) error { fs.updateFS() for _, path := range paths { err := fs.fs.Remove(path) @@ -449,12 +361,12 @@ func (fs *FS) Remove(_ context.Context, paths []string, _ *none) error { return nil } -func (fs *FS) Rename(_ context.Context, paths [2]string, _ *none) error { +func (fs *FS) Rename(_ *server.Context, paths [2]string) error { fs.updateFS() return fs.fs.Rename(paths[0], paths[1]) } -func (fs *FS) Mkdir(_ context.Context, paths []string, _ *none) error { +func (fs *FS) Mkdir(_ *server.Context, paths []string) error { fs.updateFS() for _, path := range paths { err := fs.fs.Mkdir(path) @@ -465,18 +377,18 @@ func (fs *FS) Mkdir(_ context.Context, paths []string, _ *none) error { return nil } -func (fs *FS) ReadDir(_ context.Context, dir string, out *[]api.FileInfo) error { +func (fs *FS) ReadDir(_ *server.Context, dir string) ([]api.FileInfo, error) { fs.updateFS() entries, err := fs.fs.ReadDir(dir) if err != nil { - return err + return nil, err } var fileInfo []api.FileInfo for _, entry := range entries { info, err := entry.Info() if err != nil { - return err + return nil, err } fileInfo = append(fileInfo, api.FileInfo{ Name: info.Name(), @@ -485,11 +397,10 @@ func (fs *FS) ReadDir(_ context.Context, dir string, out *[]api.FileInfo) error }) } - *out = fileInfo - return nil + return fileInfo, nil } -func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error { +func (fs *FS) Upload(ctx *server.Context, paths [2]string) error { fs.updateFS() localFile, err := os.Open(paths[1]) @@ -507,31 +418,22 @@ func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error { return err } - id := uuid.New().String() - *out = id - - // Get client message sender - msgSender, ok := getMsgSender(ctx, fs.srv) - // If user is using gateway, the client connection will not be available - if ok { - go func() { - // For every progress event - for sent := range remoteFile.Progress() { - data, err := msgpack.Marshal(api.FSTransferProgress{ - Total: remoteFile.Size(), - Sent: sent, - }) - if err != nil { - log.Error().Err(err).Msg("Error encoding filesystem transfer progress event") - continue - } - - msgSender.SendMessage(id, "FSProgress", nil, data) - } - - msgSender.SendMessage(id, "Done", nil, nil) - }() + ch, err := ctx.MakeChannel() + if err != nil { + return err } + go func() { + // For every progress event + for sent := range remoteFile.Progress() { + ch <- api.FSTransferProgress{ + Total: remoteFile.Size(), + Sent: sent, + } + } + + // Send zero object to signal completion + close(ch) + }() go func() { io.Copy(remoteFile, localFile) @@ -542,7 +444,7 @@ func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error { return nil } -func (fs *FS) Download(ctx context.Context, paths [2]string, out *string) error { +func (fs *FS) Download(ctx *server.Context, paths [2]string) error { fs.updateFS() localFile, err := os.Create(paths[0]) @@ -555,33 +457,24 @@ func (fs *FS) Download(ctx context.Context, paths [2]string, out *string) error return err } - id := uuid.New().String() - *out = id - - // Get client message sender - msgSender, ok := getMsgSender(ctx, fs.srv) - // If user is using gateway, the client connection will not be available - if ok { - go func() { - // For every progress event - for rcvd := range remoteFile.Progress() { - data, err := msgpack.Marshal(api.FSTransferProgress{ - Total: remoteFile.Size(), - Sent: rcvd, - }) - if err != nil { - log.Error().Err(err).Msg("Error encoding filesystem transfer progress event") - continue - } - - msgSender.SendMessage(id, "FSProgress", nil, data) - } - - msgSender.SendMessage(id, "Done", nil, nil) - localFile.Close() - remoteFile.Close() - }() + ch, err := ctx.MakeChannel() + if err != nil { + return err } + go func() { + // For every progress event + for sent := range remoteFile.Progress() { + ch <- api.FSTransferProgress{ + Total: remoteFile.Size(), + Sent: sent, + } + } + + // Send zero object to signal completion + close(ch) + localFile.Close() + remoteFile.Close() + }() go io.Copy(localFile, remoteFile) @@ -602,87 +495,3 @@ func (fs *FS) updateFS() { } } } - -// cleanPaths runs strings.TrimSpace and filepath.Clean -// on all inputs, and returns the updated slice -func cleanPaths(paths []string) []string { - for index, path := range paths { - newPath := strings.TrimSpace(path) - paths[index] = filepath.Clean(newPath) - } - return paths -} - -func getMsgSender(ctx context.Context, srv *server.Server) (MessageSender, bool) { - // Get client message sender - clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn) - // If the connection exists, use rpcMsgSender - if ok { - return &rpcMsgSender{srv, clientConn}, true - } else { - // Get metadata if it exists - metadata, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string) - if !ok { - return nil, false - } - // Get returnURL field from metadata if it exists - returnURL, ok := metadata["returnURL"] - if !ok { - return nil, false - } - // Use httpMsgSender - return &httpMsgSender{returnURL}, true - } - -} - -// The MessageSender interface sends messages to the client -type MessageSender interface { - SendMessage(servicePath, serviceMethod string, metadata map[string]string, data []byte) error -} - -// rpcMsgSender sends messages using RPCX, for clients that support it -type rpcMsgSender struct { - srv *server.Server - conn net.Conn -} - -// SendMessage uses the server to send an RPCX message back to the client -func (r *rpcMsgSender) SendMessage(servicePath, serviceMethod string, metadata map[string]string, data []byte) error { - return r.srv.SendMessage(r.conn, servicePath, serviceMethod, metadata, data) -} - -// httpMsgSender sends messages to the given return URL, for clients that provide it -type httpMsgSender struct { - url string -} - -// SendMessage uses HTTP to send a message back to the client -func (h *httpMsgSender) SendMessage(servicePath, serviceMethod string, metadata map[string]string, data []byte) error { - // Create new POST request with provided URL - req, err := http.NewRequest(http.MethodPost, h.url, bytes.NewReader(data)) - if err != nil { - return err - } - - // Set service path and method headers - req.Header.Set("X-RPCX-ServicePath", servicePath) - req.Header.Set("X-RPCX-ServiceMethod", serviceMethod) - - // Create new URL query values - query := url.Values{} - // Transfer values from metadata to query - for k, v := range metadata { - query.Set(k, v) - } - // Set metadata header by encoding query values - req.Header.Set("X-RPCX-Meta", query.Encode()) - - // Perform request - res, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - // Close body - return res.Body.Close() -}