diff --git a/client/client.go b/client/client.go index 9ac1d64..409d6c3 100644 --- a/client/client.go +++ b/client/client.go @@ -47,7 +47,7 @@ type Client struct { conn io.ReadWriteCloser codec codec.Codec - chMtx sync.Mutex + chMtx *sync.Mutex chs map[string]chan *types.Response } @@ -57,6 +57,7 @@ func New(conn io.ReadWriteCloser, cf codec.CodecFunc) *Client { conn: conn, codec: cf(conn), chs: map[string]chan *types.Response{}, + chMtx: &sync.Mutex{}, } go out.handleConn() @@ -92,7 +93,10 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{}, } // Get response from channel - resp := <-c.chs[idStr] + c.chMtx.Lock() + respCh := c.chs[idStr] + c.chMtx.Unlock() + resp := <-respCh // Close and delete channel c.chMtx.Lock() @@ -210,11 +214,14 @@ func (c *Client) handleConn() { continue } + c.chMtx.Lock() // Get channel from map, skip if it doesn't exist ch, ok := c.chs[resp.ID] if !ok { + c.chMtx.Unlock() continue } + c.chMtx.Unlock() // Send response to channel ch <- resp diff --git a/server/server.go b/server/server.go index 6a06843..033f632 100644 --- a/server/server.go +++ b/server/server.go @@ -280,6 +280,8 @@ func (s *Server) ServeWS(addr string, cf codec.CodecFunc) (err error) { // handleConn handles a listener connection func (s *Server) handleConn(c codec.Codec) { + codecMtx := &sync.Mutex{} + for { var call types.Request // Read request using codec @@ -322,11 +324,13 @@ func (s *Server) handleConn(c codec.Codec) { go func() { // For every value received from channel for val := range ctx.channel { + codecMtx.Lock() // Encode response using codec c.Encode(types.Response{ ID: ctx.channelID, Return: val, }) + codecMtx.Unlock() } // Cancel context @@ -336,15 +340,19 @@ func (s *Server) handleConn(c codec.Codec) { delete(s.contexts, ctx.channelID) s.contextsMtx.Unlock() + codecMtx.Lock() c.Encode(types.Response{ ID: ctx.channelID, ChannelDone: true, }) + codecMtx.Unlock() }() } // Encode response using codec + codecMtx.Lock() c.Encode(res) + codecMtx.Unlock() } } }