Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream and try_stream functions #74

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

SabrinaJewson
Copy link
Contributor

@SabrinaJewson SabrinaJewson commented May 25, 2022

Example usage:

let s = stream(|stream| async move {
    for i in 0..3 {
        stream.yield_item(i).await;
    }
});

Advantages of using the new API:

  1. Type annotations with stream::<u32, _, _> or try_stream::<u32, io::Error, _, _> or |stream: async_stream::Stream<u32>|
  2. Better IDE support
  3. Works better with rustfmt
  4. Reduces dependencies by avoiding depending on syn
  5. Better documentation

Disadvatanges of using the new API:

  1. It’s more verbose (stream.yield_item(i).await versus yield i;)
  2. It’s easy to misuse if the stream variable is moved outside the stream, used inside join! or select! or FuturesUnordered etc

Open questions:

  • What should the parameter be named? Right now I’m calling it stream and Stream which I was inspired by thread::scope for, but there may be a better name.

closes #71, closes #56, closes #63, closes #64 (latter two are via #56)
Might close, if new API is considered sufficient: #33, #68

@taiki-e
Copy link
Member

taiki-e commented Jul 7, 2022

Disadvatanges of using the new API:

I think the lack of the for await syntax would also be included in this list.

@taiki-e
Copy link
Member

taiki-e commented Jul 7, 2022

cc @carllerche @Darksonn @Kestrer: any thoughts on this?

@Darksonn
Copy link

Darksonn commented Jul 9, 2022

It’s easy to misuse if the stream variable is moved outside the stream, used inside join! or select! or FuturesUnordered etc

I would want a better understanding of how it fails if used in these.

@SabrinaJewson
Copy link
Contributor Author

The following code is incorrect:

join!(
    async { stream.yield_item(1).await; },
    async { stream.yield_item(2).await; },
);

because it will result in two items being yielded within the same poll, which results in a panic.

For the select!-like case, code is incorrect:

try_join!(
    async { stream.yield_item(1).await; },
    async { something_that_fails()?; Ok(()) },
)?;

Because an item will be yielded but then the try_join! will exit before the Pending gets propagated, so the outer task will exit and that will panic.

Of course, making these operations panic is a choice. Alternatives to the first case include:

  • Silently dropping the extra values
  • Storing a Vec of all the values each poll
  • Having yield_item take &mut self

For the second case, we could also just obediently yield the item.

@Darksonn
Copy link

Darksonn commented Jul 9, 2022

Having this fail seems pretty confusing. I would certainly prefer a version where this works. Perhaps yield_item could do a yield_now if it is unable to yield it immediately?

join!(
    async { stream.yield_item(1).await; },
    async { stream.yield_item(2).await; },
);

@SabrinaJewson
Copy link
Contributor Author

SabrinaJewson commented Jul 9, 2022

Oh wait, I was mistaken — yield_item already does take &mut self, meaning that the join! example woudn’t even compile. In that case there’s much less of a problem, the only issue can arise when cloning the Stream variable:

let stream_2 = stream.clone();
join!(
    async { stream.yield_item(1).await; },
    async { stream_2.yield_item(2).await; },
);

and this code would panic. So, we could remove the Clone implementation — I kept it in there just because I could, I don’t really know how useful it’d be.

Another option is, as you said, to have yield_item take &self and yield if it can’t give the item immediately. It boils down to whether we want to support code like that (yielding in multiple join branches). With the current macro it’s not possible, so maybe it’s fine to keep it that way.

@mohe2015
Copy link

This is so much more user friendly. I would love to see this get merged! I don't know what the next steps should be but maybe you could resolve the conflicts please?

@ghost
Copy link

ghost commented Oct 14, 2022

Would love to see this added.

@mohe2015
Copy link

I don't know if this would be possible but it seems like it's not Send:

error: future cannot be sent between threads safely
   --> backend-rust/src/bin/server/s_setup.rs:180:24
    |
180 |     Ok(StreamBody::new(stream).into_response())
    |                        ^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl FusedStream + futures_util::Stream<Item = Result<axum::body::Bytes, anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `NonNull<()>`
note: future is not `Send` as this value is used across an await
   --> /home/moritz/.cargo/git/checkouts/async-stream.rs-d36ba58420dc21e4/d5e01e5/async-stream/src/functions.rs:313:28
    |
300 |         let mut option_stream = CURRENT_CONTAINING_STREAM.with(Cell::get);
    |             ----------------- has type `std::option::Option<async_stream::functions::ContainingStream>` which is not `Send`
...
313 |         PendOnce::default().await;
    |                            ^^^^^^ await occurs here, with `mut option_stream` maybe used later
314 |     }
    |     - `mut option_stream` is later dropped here
note: required by a bound in `StreamBody::<S>::new`
   --> /home/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0/src/body/stream_body.rs:78:24
    |
78  |         S: TryStream + Send + 'static,
    |                        ^^^^ required by this bound in `StreamBody::<S>::new`


@SabrinaJewson
Copy link
Contributor Author

You’re right — I have now pushed a fix and tests for that.

@mohe2015
Copy link

Is there interest at all by the maintainers to integrate this? If so what would the steps be? Maybe check/reduce breakage of existing code and document an upgrade path? (Also merge conflicts again)

I looked over the code and it didn't look too bad. There are a few things I'm not familiar with though like manual future implementations.

@mohe2015
Copy link

Maybe the macro feature should be enabled by default to aid in compatiblity?

@jakajancar
Copy link

Another upside of this approach:

Unlike yield, allows defining of a custom Ext trait. E.g. yield Message::new("content").boxed() can be stream.message("content").

@Nugine
Copy link

Nugine commented Feb 2, 2023

I suggest a shorter name stream.yield_(expr), since we already know the expression is an item of the stream.

@nurmohammed840
Copy link

nurmohammed840 commented Jun 21, 2023

Note that this following code is incorrect:

Case I

tokio::task(async move {
    stream.yield_item(1).await;
});

Case II

let _ = || async move {
    stream.yield_item(1).await;
};

Case III

let mut _stream = None;
let s = stream(|stream| {
   _stream = Some(stream);
    async {}
});

And much more...

Solution

Instead of move ing s into async block, Just return it.

stream|mut s| async {
    s.yield_(42).await;
    return (s, "42");
});

I create an alternative to this crate called async-gen, which supports these feature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants