Skip to content

basic support bidirectional stream#9

Open
thesyncim wants to merge 4 commits intomasterfrom
bidiretional-stream
Open

basic support bidirectional stream#9
thesyncim wants to merge 4 commits intomasterfrom
bidiretional-stream

Conversation

@thesyncim
Copy link
Copy Markdown
Owner

No description provided.

Comment thread client.go
return 1
}

func (c *Client) CallStream(opname string, req, resp Message, handleStream func(client *StreamClient) error) error {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undeclared name: StreamClient

Comment thread server.go Outdated
}

func (s *Server) handleRequest(wi *serverWorkItem, pendingResponses chan<- *serverWorkItem, stopCh <-chan struct{}) {
func (s *Server) handleRequest(wi *serverWorkItem, pendingResponses chan<- *serverWorkItem, stream *StreamServer, stopCh <-chan struct{}) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undeclared name: StreamServer

Comment thread client.go Outdated
//
// This saves memory and CPU resources.

s := NewStreamClient(sid, xxhash.Sum64String(opname), c.opts.Codec, in, func(id uint32, m *request, resp chan error) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undeclared name: NewStreamClient

Comment thread server.go
continue
}

var stream *StreamServer
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undeclared name: StreamServer

Comment thread server.go Outdated
in := make(chan *request, 1)
streamID := binary.BigEndian.Uint32(wi.streamID[:])
s.inStreamMsg[streamID] = in
stream = NewServerStream(streamID, s.opts.Codec, in, func(id uint32, m *response, resp chan error) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undeclared name: NewServerStream

Comment thread stream_test.go
var request = strconv.Itoa(t.N / 2)
var response = ""

err := c.CallStream("stream", &request, &response, func(client *StreamClient) error {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undeclared name: StreamClient

Comment thread stream_test.go Outdated
t.ResetTimer()
t.ReportAllocs()

err := client.SendMsg(request)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid operation: client (variable of type *invalid type) has no field or method SendMsg

Comment thread stream_test.go
var r string

for i := 0; i < t.N/2; i++ {
err = client.RecvMsg(&r)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid operation: client (variable of type *invalid type) has no field or method RecvMsg

Comment thread handler.go
reply := opinfo.ReplyType()

if err = handler(nil, args, reply); err != nil {
if err = handler(&Context{stream}, args, reply); err != nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

github.com/thesyncim/exposed.Context composite literal uses unkeyed fields

Comment thread client.go
ReleaseResponse(rawResp)
releaseClientWorkItem(wi)
c.decPendingRequests()
c.getError(err)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of c.getError is not checked

Comment thread client.go
ReleaseResponse(rawResp)
releaseClientWorkItem(wi)
c.decPendingRequests()
c.getError(err)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of c.getError is not checked

Comment thread client.go
ReleaseResponse(rawResp)
releaseClientWorkItem(wi)
c.decPendingRequests()
c.getError(err)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of c.getError is not checked

Comment thread stream.go
codec encoding.Codec

inMessages chan *request
errOutCh chan error
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errOutCh is unused

Comment thread stream.go

type StreamClient struct {
ID uint32
isServer bool
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isServer is unused

Comment thread stream.go Outdated
codec encoding.Codec

inMessages <-chan *response
errOutCh chan error
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errOutCh is unused

Comment thread client.go
pendingResponsesLock sync.Mutex

incomingStreamMsg sync.Map
incomingStreamMsgLock sync.Mutex
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incomingStreamMsgLock is unused

Comment thread server.go Outdated
}

var (
typeUnarycall = []byte{byte(0)}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typeUnarycall is unused

Comment thread stream_test.go
if err != nil {
t.Fatal(err)
}
if i == 99 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty branch

make stream thread safe
improve and reorganize code
Comment thread server.go
close(stopCh)
inStreamMsg.Range(func(key, value interface{}) bool {
panic("das")
close(value.(chan *request))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unreachable code

Comment thread server.go
conn.Close()
inStreamMsg.Range(func(key, value interface{}) bool {
panic("das")
close(value.(chan *request))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unreachable code

Comment thread protocol.go
streamMessage = packetControl(1)
streamStart = packetControl(2)
///todo implement
streamClose = packetControl(3)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamClose is unused

Comment thread protocol.go
streamStart = packetControl(2)
///todo implement
streamClose = packetControl(3)
streamCloseRead = packetControl(4)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamCloseRead is unused

Comment thread protocol.go
///todo implement
streamClose = packetControl(3)
streamCloseRead = packetControl(4)
streamCloseWrite = packetControl(5)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamCloseWrite is unused

Comment thread stream.go
inMessages <-chan *response
serverOutMessages chan<- WorkItem

errOutCh chan error
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errOutCh is unused

Comment thread stream.go
codec encoding.Codec

inMessages <-chan *response
serverOutMessages chan<- WorkItem
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serverOutMessages is unused

Comment thread server.go
reqID [4]byte
type serverUnaryWorkItem struct {
ctx *exposedCtx
startStream bool
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startStream is unused

Comment thread stream_test.go
if err != nil {
t.Fatal(err)
}
if i == 99 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty branch

Comment thread client.go
})
sid := c.nextStreamID()
inStream, _ := c.incomingStreamMsg.LoadOrStore(sid, make(chan *response, 100))
var in chan *response
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should merge variable declaration with assignment on next line

@codecov
Copy link
Copy Markdown

codecov Bot commented Aug 7, 2018

Codecov Report

Merging #9 into master will decrease coverage by 13.76%.
The diff coverage is 24.73%.

Impacted file tree graph

@@             Coverage Diff             @@
##           master       #9       +/-   ##
===========================================
- Coverage   66.84%   53.08%   -13.77%     
===========================================
  Files           9       10        +1     
  Lines         926     1217      +291     
===========================================
+ Hits          619      646       +27     
- Misses        239      487      +248     
- Partials       68       84       +16
Impacted Files Coverage Δ
register.go 0% <ø> (ø) ⬆️
handler.go 37.5% <0%> (ø) ⬆️
stream.go 0% <0%> (ø)
client.go 51.41% <25.5%> (-15.96%) ⬇️
server.go 57% <35.96%> (-16.12%) ⬇️
request.go 71.79% <0%> (-5.13%) ⬇️
common.go 65.49% <0%> (-1.17%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 61b3f89...878b31e. Read the comment docs.

Repository owner deleted a comment from yuvicc Feb 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants