From 7592eae31810693b5e696ccb4370c11d79dabdaf Mon Sep 17 00:00:00 2001 From: Arsen Musayelyan Date: Sat, 6 Aug 2022 22:52:58 -0700 Subject: [PATCH] Handle requests concurrently --- lrpc_test.go | 2 +- server/server.go | 140 ++++++++++++++++++++++++----------------------- 2 files changed, 72 insertions(+), 70 deletions(-) diff --git a/lrpc_test.go b/lrpc_test.go index 586a048..72cb76a 100644 --- a/lrpc_test.go +++ b/lrpc_test.go @@ -122,7 +122,7 @@ func TestCodecs(t *testing.T) { if err != nil { t.Errorf("codec/%s: %v", name, err) } - + if add != 4 { t.Errorf("codec/%s: add: expected 4, got %d", name, add) } diff --git a/server/server.go b/server/server.go index f630ce7..f2e2af0 100644 --- a/server/server.go +++ b/server/server.go @@ -130,9 +130,8 @@ func (s *Server) execute(pCtx context.Context, typ string, name string, data []b if err != nil { return nil, nil, err } - + arg = argVal.Elem().Interface() - ctx = newContext(pCtx, c) // Get reflect value of context @@ -304,87 +303,90 @@ func (s *Server) handleConn(pCtx context.Context, c codec.Codec) { continue } - // Execute decoded call - val, ctx, err := s.execute( - pCtx, - call.Receiver, - call.Method, - call.Arg, - c, - ) - if err != nil { - s.sendErr(c, call, val, err) - } else { - valData, err := c.Marshal(val) + go func() { + // Execute decoded call + val, ctx, err := s.execute( + pCtx, + call.Receiver, + call.Method, + call.Arg, + c, + ) if err != nil { s.sendErr(c, call, val, err) - continue - } - - // Create response - res := types.Response{ - ID: call.ID, - Return: valData, - } - - // If function has created a channel - if ctx.isChannel { - idData, err := c.Marshal(ctx.channelID) + } else { + valData, err := c.Marshal(val) if err != nil { s.sendErr(c, call, val, err) - continue + return } - // Set IsChannel to true - res.Type = types.ResponseTypeChannel - // Overwrite return value with channel ID - res.Return = idData + // Create response + res := types.Response{ + ID: call.ID, + Return: valData, + } - // Store context in map for future use - s.contextsMtx.Lock() - s.contexts[ctx.channelID] = ctx - s.contextsMtx.Unlock() - - go func() { - // For every value received from channel - for val := range ctx.channel { - codecMtx.Lock() - - valData, err := c.Marshal(val) - if err != nil { - continue - } - - // Encode response using codec - c.Encode(types.Response{ - ID: ctx.channelID, - Return: valData, - }) - - codecMtx.Unlock() + // If function has created a channel + if ctx.isChannel { + idData, err := c.Marshal(ctx.channelID) + if err != nil { + s.sendErr(c, call, val, err) + return } - // Cancel context - ctx.cancel() - // Delete context from map + // Set IsChannel to true + res.Type = types.ResponseTypeChannel + // Overwrite return value with channel ID + res.Return = idData + + // Store context in map for future use s.contextsMtx.Lock() - delete(s.contexts, ctx.channelID) + s.contexts[ctx.channelID] = ctx s.contextsMtx.Unlock() - codecMtx.Lock() - c.Encode(types.Response{ - Type: types.ResponseTypeChannelDone, - ID: ctx.channelID, - }) - codecMtx.Unlock() - }() + go func() { + // For every value received from channel + for val := range ctx.channel { + codecMtx.Lock() + + valData, err := c.Marshal(val) + if err != nil { + continue + } + + // Encode response using codec + c.Encode(types.Response{ + ID: ctx.channelID, + Return: valData, + }) + + codecMtx.Unlock() + } + + // Cancel context + ctx.cancel() + // Delete context from map + s.contextsMtx.Lock() + delete(s.contexts, ctx.channelID) + s.contextsMtx.Unlock() + + codecMtx.Lock() + c.Encode(types.Response{ + Type: types.ResponseTypeChannelDone, + ID: ctx.channelID, + }) + codecMtx.Unlock() + }() + } + + // Encode response using codec + codecMtx.Lock() + c.Encode(res) + codecMtx.Unlock() } - // Encode response using codec - codecMtx.Lock() - c.Encode(res) - codecMtx.Unlock() - } + }() } }