Discussion: Improve consistency of stream interface #32

Open · timwis opened this issue Sep 17, 2019 · 7 comments

timwis (Contributor) commented Sep 17, 2019

At the moment (rough call shapes are sketched just after this list):

  • export is an async method that takes one string param and returns a promise that resolves to a readable stream. Callers can then encode it to csv by piping the returned stream to a writable csv encoder stream.
  • import is an async method that takes several params, including a readable stream, and returns a promise (no resolved value applicable). CSV decoding happens within the method.
  • compare is an async method that takes two string params and returns a promise that resolves to an array of objects
  • merge is an async method that takes a few string params and returns a promise (no resolved value applicable)
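
Roughly, the current call shapes look something like this (a sketch; csvEncoder and the parameter names other than data, parentRef and saveToBranch are illustrative):

const rowStream = await gitSheets.export(ref)          // resolves to a readable stream
rowStream.pipe(csvEncoder()).pipe(res)                 // CSV encoding happens at the caller

await gitSheets.import({                               // CSV decoding happens inside the method
  data: req.body.pipe(csvParser()),
  parentRef,
  saveToBranch
})

const diffs = await gitSheets.compare(srcRef, dstRef)  // resolves to an array of diff objects
await gitSheets.merge(srcRef, dstRef)                  // no meaningful resolved value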

The main issue I have is with where csv encoding/decoding happens (it's different in import vs export). I'm thinking it should always happen outside (at the server/cli level) and the methods should just deal with object streams.

But moving CSV decoding to the server is a bit tricky because of the way we pass the readable stream to the import method as an argument. The method then calls a couple of async functions and only then attaches the .on('data') and .on('error') handlers.

The trouble is that the flow is activated back in the server the moment we pass { data: req.body.pipe(csvParser()) }, but the error handler isn't attached until after those async functions, so early errors are missed. Normally a stream interface would look something like

const importStream = await gitSheets.import(opts)
req.body.pipe(csvParser()).pipe(importStream)

So that means the import method should probably take the opts and return a writable stream.

And while we're at it, compare may as well return a readable stream of the diffs since that's what export does. The combination of promises and streams can be a bit confusing, but the alternative of just dealing with promises and not streams makes us less equipped to deal with large CSV files (though maybe it's premature optimisation).
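
For concreteness, here's a minimal sketch of what that consistent shape could look like at the server level, assuming import resolves to a writable object stream and compare to a readable one (csvEncoder and ndjsonEncoder are stand-ins for whatever encoders we'd actually use):

// import: decode CSV at the server, write plain objects in
const importStream = await gitSheets.import({ parentRef: ref, saveToBranch: branch })
req.body.pipe(csvParser()).pipe(importStream)

// export: read plain objects out, encode CSV at the server
const exportStream = await gitSheets.export(ref)
exportStream.pipe(csvEncoder()).pipe(res)

// compare: same shape as export, just a stream of diff objects
const diffStream = await gitSheets.compare(srcRef, dstRef)
diffStream.pipe(ndjsonEncoder()).pipe(res)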

timwis changed the title from "Improve consistency of stream interface" to "Discussion: Improve consistency of stream interface" on Sep 17, 2019
themightychris (Member) commented Sep 17, 2019

Would decorating a promise object with a stream under an added property be an option? I do something like that for git-client. So you'd do something like

const importPromise = await gitSheets.import(opts)
req.body.pipe(csvParser()).pipe(importPromise.stream)
const commitHash = await importPromise

Or instead

const { promise, stream } = await gitSheets.import(opts)
req.body.pipe(csvParser()).pipe(stream)
const commitHash = await promise

timwis (Contributor, Author) commented Sep 17, 2019

Hm, I'm not sure that would work the way you're describing. An async function always returns a promise that resolves to whatever the function returns (if it returns a promise, it returns the resolved value of that, recursively/flatly). And var x = await fn is the equivalent of fn.then((val) => x = val).
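
A quick illustration of that flattening (the names are made up), which is why a .stream property stuck on the inner promise wouldn't be visible to the caller:

async function demo() {
  const inner = Promise.resolve('commit-hash')
  inner.stream = 'decorated!'
  return inner                 // async functions always wrap/flatten this into a new promise
}

const outer = demo()
console.log(outer.stream)      // undefined: outer is a different promise object than inner
console.log(await outer)       // 'commit-hash'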

themightychris (Member) commented Sep 17, 2019

I believe either would work fine. An async function can return normal values; the engine automatically wraps them in a resolved promise.

In the former case you'd just instantiate the promise object yourself:

async function import() {
  const stream = await prepareStream()
  const commitPromise = new Promise((resolve, reject) => {
    stream.on('close', async () => {
      await stuff()
      buildCommit().then(resolve).catch(reject)
    })
  })

  commitPromise.stream = stream

  return commitPromise
}

const commitPromise = import() // stream is ready synchronously
commitPromise.stream.pipe(stuff)
const commitHash = await commitPromise

In the latter case you'd double-wrap the promise (the first one, returned directly, would be a promise for the import stream being ready, and the one within the compound return value would be a promise for when the import was done and a commit was ready):

async function import() {
  const stream = await prepareStream()
  const commitPromise = new Promise((resolve, reject) => {
    stream.on('close', async () => {
      await stuff()
      buildCommit().then(resolve).catch(reject)
    })
  })

  commitPromise.stream = stream

  return { promise: commitPromise, stream }
}

const { promise: commitPromise, stream } = await import() // waits for stream to be ready
stream.pipe(stuff)
const commitHash = await commitPromise

I think it's important that the import method be able to return a resulting tree hash or commit hash, rather than just be a one-way pipe you throw data into and then have to go fetch the result of separately, with no strict guarantee that the state you get back is the direct result of the import.

The latter is probably a better way to do it: it allows the "opening" of the import to be async, avoids decorating unexpected properties onto a promise object, lets you await the final result directly, and gives direct access to the stream.

Another approach might be to wrap a subset of the stream API so you can't possibly do things out of order (like awaiting the result before piping a stream in), but you lose the flexibility of having direct access to the stream API to, for example, write one chunk at a time:

async function import() {
  const stream = await prepareStream()
  const commitPromise = new Promise((resolve, reject) => {
    stream.on('close', async () => {
      await stuff()
      buildCommit().then(resolve).catch(reject)
    })
  })

  return {
    fromStream: async (inputStream) => {
      stream.pipe(inputStream)
      return commitPromise
    },
    withStream: async (fn) => {
      fn(stream)
      return commitPromise
    }
  }
}

const myImport = await import() // waits for stream to be ready

const commitHash = await myImport.fromStream(stuff)
// or
const commitHash = await myImport.withStream(stream => {
  stream.pipe(stuff)
})

This offers a lot of flexibility but leaves you with a stateful object in an ambiguous state, whereas in the middle example you're just getting a compound value containing a stream and a promise, which both have well-defined states.

timwis (Contributor, Author) commented May 11, 2020

I've been knee-deep in stream land for the past few days and finally remember enough to be able to contribute to this conversation again.

What you've proposed may work, but it feels rather unconventional: I'm trying to think of another example of a promise that resolves to an object containing another promise that resolves later, let alone splitting the asynchrony into a stream and a promise that relates to that stream. It just feels like we shouldn't need to be pioneers here... surely someone's encountered and solved this? 🤔 Unless you've seen this pattern elsewhere and I've just misunderstood?

I've been considering a few alternatives.

A. Use Node's pipeline utility instead of .pipe.

In this case, server.js and cli.js would do something like this:

const treeHash = await pipeline(
  payload,
  csvParser({ strict: true }),
  gitSheets.import({
    parentRef: ref,
    saveToBranch: branch
  })
)

This would require gitSheets.import() to synchronously create/return a Writable stream. The tricky part with that is we need to do some async things before we can start writing (get the config and create a tree). We may be able to do that in some sort of init hook of the Writable stream, and have it not enter a 'ready' state until that's done.
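
For what it's worth, here's a rough sketch of that init-hook idea using the construct hook that newer Node versions add to Writable (loadConfig, createTree and writeRow are placeholders for the real async setup and per-row write):

const { Writable } = require('stream')

function createImportStream (opts) {
  let treeObject

  return new Writable({
    objectMode: true,
    // construct runs before any write; callers can pipe in immediately and
    // writes are buffered until callback() signals that setup is done
    construct (callback) {
      loadConfig(opts)
        .then((config) => createTree(config))
        .then((tree) => { treeObject = tree; callback() })
        .catch(callback)
    },
    write (row, _encoding, callback) {
      writeRow(treeObject, row)
        .then(() => callback())
        .catch(callback)
    }
  })
}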

You'll note that pipeline returns a promise which resolves to the resolved value of the promise returned by the sink/destination.

This looks pretty nice, and cleanly solves the problem of keeping csv parsing/formatting in the same place for both import and export. But what about the consistency of the compare and merge methods? Should they return streams too, just to be consistent?

B. Async iterables

These are a relatively new feature of JavaScript, though the seeds have been around since 2015 with generators. Any object can implement the 'iterable' interface by adding a property whose key is Symbol.iterator and whose value is a method returning an iterator, i.e. an object whose next() method returns { value: any, done: boolean }. More recently the 'async iterable' interface was added under the key Symbol.asyncIterator, along with the 'for-await-of' syntax for easy iteration. Streams in Node v10+ are automatically async iterables; strings and arrays are (synchronous) iterables. I feel like I've got my head 85% wrapped around this, but it seems to address a lot of the issues with mixing promises and streams. Interop is still annoying to deal with, but that's probably just because (a) I'm still not 100% around it, and (b) there aren't many great toolkits out there for it yet.
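
For reference, a tiny illustration of the protocol (purely illustrative): any object with a Symbol.asyncIterator method works with for-await-of, and Node streams expose that method, so they can be iterated directly.

const countdown = {
  async * [Symbol.asyncIterator] () {
    yield 3
    yield 2
    yield 1
  }
}

for await (const n of countdown) {
  console.log(n) // 3, 2, 1
}

With that in mind, the write loop inside import (_writeDataToTree) can consume the incoming data directly: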

  async _writeDataToTree ({ data, treeObject, pathTemplate }) {
    const pendingWrites = [];

    for await (const row of data) {
      let path
      let contents

      try {
        path = this._renderTemplate(pathTemplate, row);
        contents = this._serialize(row);
      } catch (err) {
        throw new SerializationError(err.message);
      }

      pendingWrites.push(treeObject.writeChild(`${path}.toml`, contents));
    }

    await Promise.all(pendingWrites)
    return treeObject.write()
  }

Using a lib like streaming-iterables (below), this could be simplified further.

  async _writeDataToTree ({ data, treeObject, pathTemplate }) {
    // arrow function so `this` stays bound to the class instance
    const writeRow = (row) => {
      const path = this._renderTemplate(pathTemplate, row);
      const contents = this._serialize(row);
      return treeObject.writeChild(`${path}.toml`, contents);
    }

    // pipeline, map and consume come from streaming-iterables; consume is
    // needed because the mapped iterable is lazy and wouldn't run otherwise
    await pipeline(
      () => data,
      map(writeRow), // could also use parallelMap
      consume
    )
    return treeObject.write()
  }

C. Helper libraries

I've come across a few stream toolkits that make this all look much nicer and even improve the semantics of mixing streams with promises, though they all tend to carry some of their own complexity around interop. I may just not have cracked it yet.

themightychris (Member) commented
Re:

This would require gitSheets.import() to synchronously create/return a Writable stream.

Why would it be any of pipeline's concern whether we hand it the stream synchronously or asynchronously? If it doesn't support receiving a promise, we can just prefix the arg with await, methinks:

const treeHash = await pipeline(
  payload,
  csvParser({ strict: true }),
  await gitSheets.import({
    parentRef: ref,
    saveToBranch: branch
  })
)

Then gitSheets.import just needs to return either a stream or a promise that resolves to a stream

timwis (Contributor, Author) commented May 11, 2020

Hm, I forget... will JavaScript await the argument before it runs the surrounding pipeline function in that example? If not, it would be passing pipeline an argument of type Promise. Anyway, if that were the case, we could just do const writeStream = await gitSheets.import() before calling pipeline. You're right.

What would be even better is if pipeline simply worked with async/promisy arguments, and it just may, particularly if it's an async generator / async iterable.
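
For what it's worth, newer versions of Node's pipeline do accept async generator functions as intermediate stages and a plain async function as the destination, which fits the async-iterable direction nicely. A sketch (the empty-row filter and writeRowsToTree are purely illustrative):

const { pipeline } = require('stream/promises') // or util.promisify(stream.pipeline)

const treeHash = await pipeline(
  payload,
  csvParser({ strict: true }),
  // async generator stage: the previous stage arrives as an async iterable
  async function * (rows) {
    for await (const row of rows) {
      if (Object.keys(row).length > 0) yield row // illustrative: drop empty rows
    }
  },
  // when the final stage is a function, pipeline's promise resolves with its
  // return value, which is where a tree hash could come back from
  async (rows) => writeRowsToTree(rows)
)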

themightychris (Member) commented

(await anything()) will evaluate to the resolved value; whatever sort of expression is outside it will have no idea that anything async happened. It'll be as if the whole statement was deferred inside a callback.
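
In other words, as far as pipeline can tell these two are equivalent:

const importStream = await gitSheets.import(opts)
await pipeline(payload, csvParser({ strict: true }), importStream)

// ...is the same, from pipeline's point of view, as:
await pipeline(payload, csvParser({ strict: true }), await gitSheets.import(opts))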
