-
Notifications
You must be signed in to change notification settings - Fork 37.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Eclipse Jetty core HTTP handler adapter #32097
base: main
Are you sure you want to change the base?
Conversation
@gregw Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@pivotal-cla Working on getting a CCLA signed. Stand by.... |
.../main/java/org/springframework/web/reactive/function/server/DefaultServerRequestBuilder.java
Outdated
Show resolved
Hide resolved
@lachlan-roberts can you sign the individual CLA. |
This PR is failing 2 tests that undertow also fails: See #25310. |
@gregw Thank you for signing the Contributor License Agreement! |
...-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java
Outdated
Show resolved
Hide resolved
// this.dataBuffer = dataBufferFactory.wrap(BufferUtil.copy(chunk.getByteBuffer())); // TODO this copy avoids multipart bugs | ||
this.dataBuffer = dataBufferFactory.wrap(chunk.getByteBuffer()); // TODO avoid double slice? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the work around for the failing multipart tests (see #25310)
...g-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java
Outdated
Show resolved
Hide resolved
...g-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java
Outdated
Show resolved
Hide resolved
...g-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java
Outdated
Show resolved
Hide resolved
…pResponse Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
…HttpResponse Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
Signed-off-by: Lachlan Roberts <lachlan.p.roberts@gmail.com>
Signed-off-by: Lachlan Roberts <lachlan.p.roberts@gmail.com>
upgrade jetty
Signed-off-by: Lachlan Roberts <lachlan.p.roberts@gmail.com>
I have committed several other changes, see https://github.com/poutsma/spring-framework/commits/gh-32097/. The only test that fails is the
@gregw Does that ring any bell with you, or are we doing something wrong in the Spring code? I have also asked @rstoyanchev and @simonbasle to review (my branch of) the PR, so comments from them might be forthcoming. The next Spring Framework milestone (6.2.0-M4) is on June 13th. I am not sure if we will have resolved the test above by that point, so this PR might not make that milestone. The milestone after that is scheduled for July 11th. |
|
||
private final Flux<WebSocketMessage> flux; | ||
private final Sinks.One<CloseStatus> closeStatusSink = Sinks.one(); | ||
private final Lock lock = new ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Lock
got my attention because pessimistic locking is usually not the first go-to method in Reactive Streams. Instead it's usually some sort of optimistic locking with a CAS loop.
That said, the lock protects 2 critical sections that represent quick state checks and updates. There is precedent for that sort of things, e.g. in Reactor
, so in that case I think it is fine (although care will need to be exercised if the code is modified or new usage of the lock is introduced).
boolean demand = false; | ||
this.lock.lock(); | ||
try { | ||
this.requested += n; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is more problematic, as n
can be an "unbounded request" (Integer.MAX_VALUE
) and this.requested
can overflow to negative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.requested += n; | |
this.requested += n; | |
if (this.requested < 0L) { | |
this.requested = Long.MAX_VALUE; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using Math.addExact(long,long)
is probably best.
try { | ||
this.requested += n; | ||
if (!this.awaitingMessage && this.requested > 0) { | ||
this.requested--; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not entirely sure how it translates in terms of getDelegate().demand()
calls, but if the requested amount is unbounded, this decrement shouldn't happen.
with the above suggested change, unbounded request amount (n == Integer.MAXVALUE
at any point) leads to this.requested == Long.MAX_VALUE
so if that's the case the this.requested--
decrement should be skipped.
} | ||
this.awaitingMessage = false; | ||
if (this.requested > 0) { | ||
this.requested--; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above, this should take the "unbounded request" scenario (in the Reactive Streams sense) into consideration
My apologies for being slow now... I've seen your reviews and will respond in the next few days. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a number of additional commits in the branch referenced from #32097 (comment), so do have a look there for the latest.
I've added a few comments and questions below. The only other thing to mention is we'll need a non-reactive variant of JettyWebSocketClient
in the spring-websocket
module for those who prefer not to use or have spring-webflux
on the classpath.
...-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body) { | ||
return Flux.from(body).flatMap(this::writeWithInternal, 1).then(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is supposed to flush after each nested Publisher
, e.g. to ensure events in SSE stream are sent at the right time. Is there anything to ensure that? Nothing obvious that I see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jetty writes at this level are non-buffering. Any aggregation that might require flushing is done at a higher level. Now, we might actually buffer (e.g. if sitting in a flow control congested HTTP/2 stream), but if so, then flushing is not going to help.
The only time we really need a flush semantic is if there is no content at all, then we will write an empty buffer, so that at least the HTTP response headers are generated etc. So perhaps this method needs to detect if the body was actually empty, in which case an empty buffer should be written
|
||
private final long length; | ||
|
||
private long totalRead = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All fields in this private IteratingCallback
are Jetty types. I'm wondering if it makes sense for this to be in Jetty for wider availability?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. It might be in a slightly different form... probably something like:
Content.copy(Content.Source.from(channel, position, count), this.response, callback);
I'll try to get that in this months release.
this.jettyServer.setHandler(createHandlerAdapter()); | ||
|
||
// TODO: We don't actually want the upgrade handler but this will create the WebSocketContainer. | ||
// This requires a change in Jetty. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be useful to add an issue link to track the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* | ||
* @author Violeta Georgieva | ||
* @author Rossen Stoyanchev | ||
* @since 5.0 | ||
*/ | ||
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> { | ||
@SuppressWarnings("NullAway") | ||
public class JettyWebSocketSession extends AbstractWebSocketSession<Session> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering about the choice to extend AbstractWebSocketSession
rather than AbstractListenerWebSocketSession
which implements the receiving logic. This relates to @simonbasle's question as well with the resulting implementation using a Lock
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lachlan-roberts
can you look at the websocket part of these review
@poutsma Our branches have diverged as I made the changes to use |
…ettyCoreHttpHandlerAdapter # Conflicts: # spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java # spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java # spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java # spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java # spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java # spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java # spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java
…ettyCoreHttpHandlerAdapter # Conflicts: # spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java # spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java # spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java # spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java # spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java # spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java # spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java
Actually, I'm not exactly sure why our branches were seen as divergent. I've merged yours back to mine and made sure that there are no differences, and also merged to lastest origin/main. |
I think it is likely a problem with the jetty ContentSourcePublisher, for which we already have a PR in review: jetty/jetty.project#11849
Agreed we are not making the 13th. We have a jetty release at the end of the month, so I will get #11849 merged for that and also look at change @rstoyanchev made. @lachlan-roberts can you look at the websocket reviews before end of month, so that any changes needed in next jetty release can be included. E.g. anything needed to get the Container without the Handler |
This provides an implementation of a HTTP Handler Adaptor that is coded directly to the Eclipse Jetty core API, bypassing any servlet implementation.
Fixes #32035