Codec Error: Broken Pipe #153

Open
apowers313 opened this issue Dec 28, 2021 · 12 comments

Comments

@apowers313

When sending multiple packets in a relatively short period of time, it appears that sometimes the underlying communication pipe breaks. The error is Codec Error: Broken pipe (os error 32). Here's a log with two examples: one, two. Here's a demo video so you can get a sense of timing.

Not sure if this is somehow related to #151 or not.

@Alexei-Kornienko
Collaborator

Not sure that I have enough information to help..

  1. If I understand correctly we have a Dealer socket used as a server here - https://github.com/denoland/deno/pull/13122/files#diff-7d5254b223064169161752a0a7fa2a8d4df60c99444ecb71637f497c8292714cR57
  2. A Rep/Router socket connects to it from somewhere. Is it also using this library, or is it some kind of third-party client?
  3. The underlying socket connection is closed for whatever reason.

According to the ZMQ spec, the client (the side that makes the connect call) is responsible for reconnecting. The server side:

SHALL create a double queue when a peer connects to it. If this peer disconnects, the DEALER socket SHALL destroy its double queue and SHALL discard any messages it contains.

So in terms of application logic it seems reasonable to catch this error and resend the same message to other clients if there are any, or to wait for a short period so that the same client can reconnect and then resend the message to it. In such a case you should implement a small loop with back-off that is responsible for resending messages when these errors occur.
Please note that there are 2 possible errors in such a scenario:

  1. There are no peers connected to the socket. In this case the socket returns ownership of the message to you, so you can resend it without cloning.
  2. Broken pipe error. In this case the message may have been partially sent down the wire, so its state is unknown. You would have to create a new message and send it again. Zmq doesn't clone it internally because that would add overhead to the normal flow.
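
A minimal sketch of such a resend loop, using only the standard library. The `SendError` enum, the `send_with_backoff` function and both closures are hypothetical names made up for illustration; they are not the zmq.rs API, and the two variants only mirror the two error cases listed above.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type mirroring the two cases above (not the zmq.rs error type).
#[derive(Debug)]
enum SendError<M> {
    /// No peers connected: the message is handed back untouched.
    NoPeers(M),
    /// The pipe broke mid-send: the message state is unknown.
    BrokenPipe,
}

/// Try to send a message up to `max_attempts` times, doubling the delay
/// between attempts. `make_msg` builds a fresh message; `send` is whatever
/// actually writes to the socket. Returns the attempt number that succeeded.
fn send_with_backoff<M, B, S>(
    mut make_msg: B,
    mut send: S,
    max_attempts: u32,
    initial_delay: Duration,
) -> Result<u32, SendError<M>>
where
    B: FnMut() -> M,
    S: FnMut(M) -> Result<(), SendError<M>>,
{
    let mut delay = initial_delay;
    let mut msg = make_msg();
    let mut attempt: u32 = 1;
    loop {
        match send(msg) {
            Ok(()) => return Ok(attempt),
            // Out of attempts: pass the error on to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Ownership came back, so the same message can be reused.
            Err(SendError::NoPeers(returned)) => msg = returned,
            // State unknown, so build a new message from scratch.
            Err(SendError::BrokenPipe) => msg = make_msg(),
        }
        sleep(delay);
        delay *= 2;
        attempt += 1;
    }
}
```

In a real kernel the `send` closure would wrap the socket's send call and map its errors onto these two cases, and once the attempts are exhausted the error is returned so a higher level can decide what to do.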

@apowers313
Author

Thanks for the quick reply. :)

  1. Yes, specifically shell_comm here and sending messages is here
  2. Router socket is in jupyter_client, part of the main Jupyter distribution, which appears to be using pyzmq from the main ZMQ distribution.
  3. This is part of what's confusing to me -- there's no reason that the underlying socket should be closed on either side. Sending the same sequence of packets from other kernels (IPython, IJavascript, IDeno Typescript) doesn't cause the connection to close.

Seems like the initial connection and first few packets work okay, it's an intermittent error. After re-connecting and re-sending it seems to work for another handful of packets before the pipe breaks again.

@Alexei-Kornienko
Collaborator

  1. Are you sure that the second link is pointing to the right line? That one is a PubSocket as far as I can see:
pub struct PubComm {
  conn_str: String,
  session_id: String,
  hmac_key: hmac::Key,
  socket: zeromq::PubSocket,
}

The Pub socket has different behaviour and handles this error internally - https://github.com/zeromq/zmq.rs/blob/master/src/pub.rs#L179
This must be the Dealer socket.

  2. OK, in that case we may assume that the client side is working correctly (however, I do know that there are some issues in pyzmq as well).
  3. This is the most interesting part. I guess it needs further investigation. Is there a chance that you can record the network traffic between the sockets and upload a dump? I guess we would have to compare the traffic of some other kernel with that of rust zmq to check for possible issues in terms of socket options or anything else.

@apowers313
Author

apowers313 commented Dec 29, 2021

Sorry, you're right. Second link should have been here.

Wireshark capture attached with one good sequence (execute_request + execute_reply) followed by one failed sequence. It is not obvious to me that there is any difference between the packets, although it's easy to see the TCP RST from Jupyter after the failed sequence.

@apowers313
Author

If I drop a debug message in the notebook application where it receives ZMQ messages I can see that half the messages never make it up the stack -- and it seems to be a pattern of every-other packet is dropped (sometimes the pipe closes when a message is dropped, sometimes it doesn't). After about 8 messages, the dropped messages stop and it seems to work fine from there on out.

A partial stack trace from the app (most recent call in the stack at the bottom) is:

[...]
File "/usr/local/Cellar/python@3.9/3.9.9/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/usr/local/Cellar/jupyterlab/3.2.4/libexec/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 189, in _handle_events
handler_func(fileobj, events)
File "/usr/local/Cellar/jupyterlab/3.2.4/libexec/lib/python3.9/site-packages/zmq/eventloop/zmqstream.py", line 452, in _handle_events
self._handle_recv()
File "/usr/local/Cellar/jupyterlab/3.2.4/libexec/lib/python3.9/site-packages/zmq/eventloop/zmqstream.py", line 481, in _handle_recv
self._run_callback(callback, msg)
File "/usr/local/Cellar/jupyterlab/3.2.4/libexec/lib/python3.9/site-packages/zmq/eventloop/zmqstream.py", line 431, in _run_callback
callback(*args, **kwargs)
File "/usr/local/Cellar/jupyterlab/3.2.4/libexec/lib/python3.9/site-packages/zmq/eventloop/zmqstream.py", line 189, in
self.on_recv(lambda msg: callback(self, msg), copy=copy)
File "/usr/local/Cellar/jupyterlab/3.2.4/libexec/lib/python3.9/site-packages/notebook/services/kernels/handlers.py", line 453, in _on_zmq_reply
traceback.print_stack()

It seems most likely that the packets are getting dropped somewhere in zmq/eventloop/zmqstream.py but I can't figure out where...

@Alexei-Kornienko
Collaborator

I'll be able to take a look later this week. Currently busy with other stuff..

@Alexei-Kornienko
Collaborator

Alexei-Kornienko commented Jan 2, 2022

OK, so I've done some debugging and checked the network dump you provided. I ran both packets through a decoder and both of them contain valid messages:
1.

ZmqMessage {
                frames: [
                    b"<IDS|MSG>",
                    b"298af1ece920f5767ea29487ba61dd46a54cc412be020391a9e81ce5c24809e0",
                    b"{\"msg_id\":\"65c69b01-cf4b-45c2-aaea-22c0ce2e11e3\",\"session\":\"02db5f1d-8e5c-4dda-80d4-03768544eaa8\",\"username\":\"<TODO>\",\"date\":\"2021-12-29T04:59:39.593537+00:00\",\"msg_type\":\"execute_reply\",\"version\":\"5.3\"}",
                    b"{\"msg_id\":\"9e03610672b94399a4421e97303302ab\",\"session\":\"b121993df25a42d897a5e1c83b0c7afe\",\"username\":\"username\",\"date\":\"1970-01-01T00:00:00+00:00\",\"msg_type\":\"execute_request\",\"version\":\"5.2\"}",
                    b"{}",
                    b"{\"status\":\"ok\",\"execution_count\":1,\"payload\":[],\"user_expressions\":{}}",
                ],
            },

ZmqMessage {
                frames: [
                    b"<IDS|MSG>",
                    b"a94d56e04485350811eab0cf70b4d5cf793e41e88ac0fc9dbc4eb2b8aa6d650f",
                    b"{\"msg_id\":\"c84fb937-31bf-4c5b-8d09-8e5c20c44366\",\"session\":\"02db5f1d-8e5c-4dda-80d4-03768544eaa8\",\"username\":\"<TODO>\",\"date\":\"2021-12-29T04:59:43.074813+00:00\",\"msg_type\":\"execute_reply\",\"version\":\"5.3\"}",
                    b"{\"msg_id\":\"28df51c4093143d399114fd349d509f4\",\"session\":\"b121993df25a42d897a5e1c83b0c7afe\",\"username\":\"username\",\"date\":\"1970-01-01T00:00:00+00:00\",\"msg_type\":\"execute_request\",\"version\":\"5.2\"}",
                    b"{}",
                    b"{\"status\":\"ok\",\"execution_count\":2,\"payload\":[],\"user_expressions\":{}}",
                ],
            },

So this issue is not caused by incorrectly formed or damaged messages.
One thing I notice is that these 2 packets are sent to different clients (judging by the port numbers):
3 0.013162 127.0.0.1 127.0.0.1 TCP 608 55164 → 55185 [PSH, ACK] Seq=1 Ack=380 Win=6367 Len=552 TSval=636178270 TSecr=636178258
7 3.494483 127.0.0.1 127.0.0.1 TCP 608 55164 → 55188 [PSH, ACK] Seq=1 Ack=1 Win=6363 Len=552 TSval=636181724 TSecr=636073590
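
For longer captures the same port comparison can be automated. The following is a rough sketch that assumes the capture has been exported as one-line summaries in the format shown above; `tcp_ports` and `distinct_peers` are hypothetical helper names, not part of any library.

```rust
/// Extract (source_port, destination_port) from a summary line such as
/// "... TCP 608 55164 → 55185 [PSH, ACK] ...".
/// Returns None when the line has no "→" token surrounded by two port numbers.
fn tcp_ports(line: &str) -> Option<(u16, u16)> {
    let tokens: Vec<&str> = line.split_whitespace().collect();
    let arrow = tokens.iter().position(|t| *t == "→")?;
    let src: u16 = tokens.get(arrow.checked_sub(1)?)?.parse().ok()?;
    let dst: u16 = tokens.get(arrow + 1)?.parse().ok()?;
    Some((src, dst))
}

/// Return the distinct destination ports, in order of first appearance.
/// More than one entry means the packets went to different peer connections.
fn distinct_peers(lines: &[&str]) -> Vec<u16> {
    let mut peers = Vec::new();
    for (_, dst) in lines.iter().filter_map(|l| tcp_ports(l)) {
        if !peers.contains(&dst) {
            peers.push(dst);
        }
    }
    peers
}
```

Fed the two lines above, `distinct_peers` reports two destination ports, which is the observation that the replies were sent to different client connections.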

My thoughts on this situation are the following:
IMHO we should split this issue into 3 different problems and handle them separately:

  1. If we used the dealer as a client socket, I would agree that this issue needs to be handled by the socket itself (by doing a reconnect, see #143). Or this behaviour should be configurable, so that you could choose between implicit reconnect attempts (with your client code blocked on the socket) and an explicit error.

  2. When the socket is used as a server, returning this error is completely valid because this behaviour is fixed in the spec:

SHALL block on sending, or return a suitable error, when it has no available peers.
SHALL NOT discard messages that it cannot queue.

So according to the spec you should have some logic in your application code that handles this error appropriately (by waiting and resending the message, or by passing the error on to a higher level).

  3. Debug the reason why the client closes the connection. This part is mostly related to the jupyter client and possibly the pyzmq library (I can help here with some advice but I won't be able to spend a lot of time on this).

@apowers313
Author

Good catch on the port numbers, I think that's symptomatic of the problem. I would think that's because the TCP connection dropped and the client opened up a new connection with an ephemeral port. It's odd that we don't see a new TCP SYN / ACK sequence and that the new port issues a TCP RST indicating that it wasn't expecting packets on that port. I'm not sure why our ZMQ socket would suddenly switch ports without a new TCP connection?

According to the Jupyter spec their client is using a Dealer socket.

@Alexei-Kornienko
Collaborator

I took a quick look at the specs and it seems that you might be missing some parts in your implementation:

For example the doc says:

Note
The stdin socket of the client is required to have the same zmq IDENTITY as the client’s shell socket. Because of this, the input_request must be sent with the same IDENTITY routing prefix as the execute_reply in order for the frontend to receive the message.

As far as I can see from your code, you just create sockets with default parameters and they get random UUIDs as identities. You can check this example to see how to assign a specific identity to your sockets - https://github.com/zeromq/zmq.rs/blob/master/examples/socket_client_with_options.rs#L10

I guess it might be reasonable to check a correct implementation in Python and see how it's built.
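
To illustrate what "the same IDENTITY routing prefix" means at the frame level, here is a small sketch using plain byte vectors instead of real socket types. It assumes the usual Jupyter wire layout, where any routing identity frames come before the `<IDS|MSG>` delimiter and the signature and JSON parts follow it. `split_identities` and `with_routing_prefix` are hypothetical helper names.

```rust
/// Delimiter frame separating routing identities from the message body.
const DELIMITER: &[u8] = b"<IDS|MSG>";

/// Split a received multipart message into (identity frames, body frames).
/// Returns None if the delimiter frame is missing.
fn split_identities(frames: &[Vec<u8>]) -> Option<(&[Vec<u8>], &[Vec<u8>])> {
    let pos = frames.iter().position(|f| f.as_slice() == DELIMITER)?;
    Some((&frames[..pos], &frames[pos + 1..]))
}

/// Build an outgoing message that reuses the routing prefix of the request,
/// so that it is routed to the same frontend.
fn with_routing_prefix(identities: &[Vec<u8>], body: &[Vec<u8>]) -> Vec<Vec<u8>> {
    let mut out = identities.to_vec();
    out.push(DELIMITER.to_vec());
    out.extend_from_slice(body);
    out
}
```

The idea is to keep the identity frames from the incoming execute_request and put them in front of both the execute_reply and any input_request that belongs to it.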

@bartlomieju

@Alexei-Kornienko thanks for your help in debugging and for pointing out the problem with IDENTITY. The latter is now fixed.

I'm still getting the BrokenPipe error fairly often (usually 2-3 times when running a notebook with about 25 cells); however, as pointed out by @apowers313, the errors stop after a few messages are exchanged. I think we will have to wait for #143 so that these errors can be handled gracefully and responses resent when the client reconnects.

@Alexei-Kornienko
Collaborator

@bartlomieju you use a dealer socket to bind a port, so it acts as a server. The client (pyzmq) is responsible for reconnecting, so you will still get these errors (in theory) and you just need to handle them somehow. The Dealer socket (when acting as a server) will not handle this for you because it has no guarantee that the client will ever reconnect.

@bartlomieju

@bartlomieju you use dealer socket to bind a port so it acts as a server. Client (pyzmq) is responsible for reconnect so you will still have this errors (in theory) so you just need to handle them somehow. Dealer socket (when acting as a server) will not handle this for you cause it doesn't have any guarantees that client will ever reconnect.

Thanks, you are obviously right. I will keep digging into how that situation is handled in other kernels.

3 participants