Javascript (ES6) iterable stream - Stack Overflow

Is there a pattern for making a stream iterable using ES6 generators?

See 'MakeStreamIterable' below.

import {createReadStream} from 'fs'

let fileName = 'largeFile.txt'
let readStream = createReadStream(fileName, {
  encoding: 'utf8',
  highWaterMark: 1024
})
let myIterableAsyncStream = MakeStreamIterable(readStream)

for (let data of myIterableAsyncStream) {
  let str = data.toString('utf8')
  console.log(str)
}

I'm not interested in co or bluebird's coroutine or blocking with deasync.

The goal is for MakeStreamIterable to be a valid function.

asked Oct 2, 2015 at 7:09 by Jacob
  • How would you even define that (what is data...)? Byte-by-byte (that's not what it looks like)? – Amit, Oct 2, 2015 at 7:23
  • data is from readStream.on('data', data => ...) – Jacob, Oct 2, 2015 at 8:14
  • I cannot figure out a solution. It does not seem possible, as I see it. – Jacob, Oct 2, 2015 at 8:15
  • I don't think you can without co and the like. Why are you against that? – Amit, Oct 2, 2015 at 8:33
  • for...of is synchronous. You cannot iterate synchronously over an asynchronous data source. There was a proposal for async iterators but it was abandoned for github.com/zenparsing/es-observable. – Felix Kling, Oct 2, 2015 at 13:26

3 Answers

Is there a pattern for making a stream iterable using ES6 generators?

No, this cannot be achieved because generators are synchronous. They must know what they are yielding and when. Iteration of an asynchronous data source can only currently be achieved by using some kind of callback-based implementation. So, there is no way to make MakeStreamIterable 'a valid function' if what you mean by this is 'a valid function whose result can be given to a for-of loop'.

Streams are Asynchronous

A stream represents a potentially infinite amount of data received asynchronously over a potentially infinite amount of time. If we take a look at the definition of an iterator on MDN we can define in more detail what it is about a stream that makes it 'uniterable':

An object is an iterator when it knows how to access items from a collection one at a time, while keeping track of its current position within that sequence. In JavaScript an iterator is an object that provides a next() method which returns the next item in the sequence. This method returns an object with two properties: done and value.

(Emphasis is my own.)

Let's pick out the properties of an iterator from this definition. The object must...

  1. know how to access items from a collection one at a time;
  2. be able to keep track of its current position within the sequence of data;
  3. and provide a method, next, that retrieves an object with a property that holds the next value in the sequence or notifies that iteration is done.
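
To make these three properties concrete, here is a minimal sketch of a hand-written iterator over an in-memory array. The names makeArrayIterator and chunks are illustrative and not part of the question. The thing to notice is that next() has to return its {done, value} result synchronously, which is what a stream cannot promise.

```javascript
// Hypothetical helper: builds an iterable iterator over an array.
function makeArrayIterator (items) {
  let position = 0 // current position within the sequence

  return {
    // next() answers immediately: either the next value or done
    next () {
      if (position < items.length) {
        return { done: false, value: items[position++] }
      }
      return { done: true, value: undefined }
    },
    // Returning itself from Symbol.iterator makes it usable in for-of
    [Symbol.iterator] () {
      return this
    }
  }
}

const chunks = []
for (const chunk of makeArrayIterator(['a', 'b', 'c'])) {
  chunks.push(chunk)
}
console.log(chunks.join(','))
```

Because the whole array is already in memory, the iterator always knows whether another item exists at the moment next() is called.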

A stream doesn't meet any of the above criteria because...

  1. it is not in control of when it receives data and cannot 'look into the future' to find the next value;
  2. it cannot know when or if it has received all data, only when the stream has closed;
  3. and it does not implement the iterable protocol and so does not expose a next method which a for-of can utilise.

______

Faking It(eration)

We can't actually iterate the data received from a stream (definitely not using a for-of); however, we can build an interface that pretends to, by using Promises (yay!) and abstracting away the stream's event handlers inside a closure.

// MakeStreamIterable.js
export default function MakeStreamIterable (stream) {
  const collection = []
  let index = 0
  let callback = null
  let resolve, reject

  // Create the promise first so resolve/reject exist before any stream event fires
  const promise = new Promise((res, rej) => {
    resolve = res
    reject = rej
  })

  stream
    .on('error', err => reject(err))
    .on('end', () => resolve(collection))
    .on('data', data => {
      collection.push(data)

      try {
        callback && callback(data, index++)
      } catch (err) {
        // Stop reading and reject if the callback throws
        stream.destroy()
        reject(err)
      }
    })

  function each (cb) {
    // Only the first callback is registered; later calls are ignored
    if (callback) {
      return promise
    }

    callback = (typeof cb === 'function') ? cb : null

    if (callback) {
      // Replay the chunks that arrived before each() was called
      collection.forEach(callback)
      index = collection.length
    }

    return promise
  }

  promise.each = each

  return promise
}

And we can use it like this:

import MakeStreamIterable from './MakeStreamIterable'

let myIterableAsyncStream = MakeStreamIterable(readStream)

myIterableAsyncStream
  .each((data, i) => {
    let str = data.toString('utf8')
    console.log(i, str)
  })
  .then(() => console.log('completed'))
  .catch(err => console.log(err))

Things to note about this implementation:

  • It is not necessary to call each immediately on the 'iterable stream'.
  • When each is called, all values received prior to its call are passed to the callback one-by-one forEach-style. Afterwards all subsequent data are passed immediately to the callback.
  • The function returns a Promise which resolves the complete collection of data when the stream ends, meaning we actually don't have to call each at all if the method of iteration provided by each isn't satisfactory.
  • I have fostered the false semantics of calling this an iterator and am therefore an awful human being. Please report me to the relevant authority.

Soon you will be able to use async iterators and generators. In Node 9.8 you can use them by running with the --harmony command-line option.

async function* streamAsyncIterator(stream) {
  // Get a lock on the stream
  const reader = stream.getReader();

  try {
    while (true) {
      // Read from the stream
      const {done, value} = await reader.read();
      // Exit if we're done
      if (done) return;
      // Else yield the chunk
      yield value;
    }
  }
  finally {
    reader.releaseLock();
  }
}

async function example() {
  const response = await fetch(url);

  for await (const chunk of streamAsyncIterator(response.body)) {
    // …
  }
}

Thanks to Jake Archibald for the examples above.
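
For readers wondering what for await does with such an iterator, it can be approximated by calling next() by hand and awaiting each result. The sketch below uses a made-up async generator, countTo, in place of a real stream, and a hypothetical collect helper. It illustrates the protocol and is not the exact desugaring from the specification.

```javascript
// Hypothetical async data source standing in for a stream
async function* countTo (limit) {
  for (let i = 1; i <= limit; i++) {
    // Simulate waiting for the next chunk to arrive
    await new Promise(resolve => setTimeout(resolve, 10))
    yield i
  }
}

// Roughly what `for await (const value of source)` does
async function collect (source) {
  const iterator = source[Symbol.asyncIterator]()
  const values = []

  while (true) {
    // next() returns a Promise of {done, value}, unlike a sync iterator
    const { done, value } = await iterator.next()
    if (done) break
    values.push(value)
  }

  return values
}

collect(countTo(3)).then(values => console.log(values))
```

The only difference from a synchronous iterator is that next() hands back a Promise, which is what lets the consumer wait for data that has not arrived yet.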

2020 Update:

It looks like streams will be "natively" iterable in the future - just waiting on browsers to implement it:

  • https://streams.spec.whatwg.org/#rs-asynciterator
  • https://github.com/whatwg/streams/issues/778
  • https://bugs.chromium.org/p/chromium/issues/detail?id=929585
  • https://bugzilla.mozilla.org/show_bug.cgi?id=1525852
  • https://bugs.webkit.org/show_bug.cgi?id=194379

for await (const chunk of stream) {
  ...
}
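
For Node.js streams like the readStream in the question, something similar already works: Readable streams implement Symbol.asyncIterator in current Node.js releases, so they can be consumed with for await directly. This is a minimal sketch, assuming a recent Node.js version. Readable.from and the readAll helper are used only to keep the example self-contained; the question's code would pass createReadStream(fileName) instead.

```javascript
// CommonJS require is used so the snippet runs as a plain script
const { Readable } = require('stream')

// Hypothetical helper: gathers every chunk of a readable stream
async function readAll (stream) {
  const chunks = []
  // Each iteration waits for the next chunk; the loop ends when the stream ends
  for await (const chunk of stream) {
    chunks.push(chunk.toString('utf8'))
  }
  return chunks
}

// Readable.from builds a stream from an in-memory iterable
readAll(Readable.from(['first ', 'second']))
  .then(chunks => console.log(chunks.join('')))
  .catch(err => console.error(err))
```

Note that for await is only valid inside an async function (or at the top level of an ES module), so this still does not make the stream usable in a plain for-of loop.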