1
0
mirror of https://github.com/fluencelabs/js-libp2p synced 2025-03-31 23:01:04 +00:00
js-libp2p/doc/STREAMING_ITERABLES.md

165 lines
5.2 KiB
Markdown
Raw Normal View History

# Iterable Streams
> This document is a guide on how to use Iterable Streams in Libp2p. As a part of the [refactor away from callbacks](https://github.com/ipfs/js-ipfs/issues/1670), we have also moved to using Iterable Streams instead of [pull-streams](https://pull-stream.github.io/). If there are missing usage guides you feel should be added, please submit a PR!
## Table of Contents
- [Iterable Streams](#iterable-streams)
- [Table of Contents](#table-of-contents)
- [Usage Guide](#usage-guide)
- [Transforming Bidirectional Data](#transforming-bidirectional-data)
- [Iterable Stream Types](#iterable-stream-types)
- [Source](#source)
- [Sink](#sink)
- [Transform](#transform)
- [Duplex](#duplex)
- [Iterable Modules](#iterable-modules)
## Usage Guide
### Transforming Bidirectional Data
Sometimes you may need to wrap an existing duplex stream in order to perform incoming and outgoing [transforms](#transform) on data. This type of wrapping is commonly used in stream encryption/decryption. Using [it-pair][it-pair] and [it-pipe][it-pipe], we can do this rather easily, given an existing [duplex iterable](#duplex).
```js
import { duplexPair } from 'it-pair/duplex'
feat: convert to typescript (#1172) Converts this module to typescript. - Ecosystem modules renamed from (e.g.) `libp2p-tcp` to `@libp2p/tcp` - Ecosystem module now have named exports - Configuration has been updated, now pass instances of modules instead of classes: - Some configuration keys have been renamed to make them more descriptive. `transport` -> `transports`, `connEncryption` -> `connectionEncryption`. In general where we pass multiple things, the key is now plural, e.g. `streamMuxer` -> `streamMuxers`, `contentRouting` -> `contentRouters`, etc. Where we are configuring a singleton the config key is singular, e.g. `connProtector` -> `connectionProtector` etc. - Properties of the `modules` config key have been moved to the root - Properties of the `config` config key have been moved to the root ```js // before import Libp2p from 'libp2p' import TCP from 'libp2p-tcp' await Libp2p.create({ modules: { transport: [ TCP ], } config: { transport: { [TCP.tag]: { foo: 'bar' } }, relay: { enabled: true, hop: { enabled: true, active: true } } } }) ``` ```js // after import { createLibp2p } from 'libp2p' import { TCP } from '@libp2p/tcp' await createLibp2p({ transports: [ new TCP({ foo: 'bar' }) ], relay: { enabled: true, hop: { enabled: true, active: true } } }) ``` - Use of `enabled` flag has been reduced - previously you could pass a module but disable it with config. Now if you don't want a feature, just don't pass an implementation. Eg: ```js // before await Libp2p.create({ modules: { transport: [ TCP ], pubsub: Gossipsub }, config: { pubsub: { enabled: false } } }) ``` ```js // after await createLibp2p({ transports: [ new TCP() ] }) ``` - `.multiaddrs` renamed to `.getMultiaddrs()` because it's not a property accessor, work is done by that method to calculate announce addresses, observed addresses, etc - `/p2p/${peerId}` is now appended to all addresses returned by `.getMultiaddrs()` so they can be used opaquely (every consumer has to append the peer ID to the address to actually use it otherwise). If you need low-level unadulterated addresses, call methods on the address manager. BREAKING CHANGE: types are no longer hand crafted, this module is now ESM only
2022-03-28 14:30:27 +01:00
import { pipe } from 'it-pipe'
// Wrapper is what we will write and read from
// This gives us two duplex iterables that are internally connected
const [internal, external] = duplexPair()
// Now we can pipe our wrapper to the existing duplex iterable
pipe(
external, // The external half of the pair interacts with the existing duplex
outgoingTransform, // A transform iterable to send data through (ie: encrypting)
existingDuplex, // The original duplex iterable we are wrapping
incomingTransform, // A transform iterable to read data through (ie: decrypting)
external
)
// We can now read and write from the other half of our pair
pipe(
['some data'],
internal, // The internal half of the pair is what we will interact with to read/write data
async (source) => {
for await (const chunk of source) {
console.log('Data: %s', chunk.toString())
// > Data: some data
}
}
)
```
## Iterable Stream Types
These types are pulled from [@alanshaw's gist](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9) on streaming iterables.
### Source
A "source" is something that can be consumed. It is an iterable object.
```js
const ints = {
[Symbol.asyncIterator] () {
let i = 0
return {
async next () {
return { done: false, value: i++ }
}
}
}
}
// or, more succinctly using a generator and for/await:
const ints = (async function * () {
let i = 0
while (true) yield i++
})()
```
### Sink
A "sink" is something that consumes (or drains) a source. It is a function that takes a source and iterates over it. It optionally returns a value.
```js
const logger = async source => {
const it = source[Symbol.asyncIterator]()
while (true) {
const { done, value } = await it.next()
if (done) break
console.log(value) // prints 0, 1, 2, 3...
}
}
// or, more succinctly using a generator and for/await:
const logger = async source => {
for await (const chunk of source) {
console.log(chunk) // prints 0, 1, 2, 3...
}
}
```
### Transform
A "transform" is both a sink _and_ a source where the values it consumes and the values that can be consumed from it are connected in some way. It is a function that takes a source and returns a source.
```js
const doubler = source => {
return {
[Symbol.asyncIterator] () {
const it = source[Symbol.asyncIterator]()
return {
async next () {
const { done, value } = await it.next()
if (done) return { done }
return { done, value: value * 2 }
}
return () {
return it.return && it.return()
}
}
}
}
}
// or, more succinctly using a generator and for/await:
const doubler = source => (async function * () {
for await (const chunk of source) {
yield chunk * 2
}
})()
```
### Duplex
A "duplex" is similar to a transform but the values it consumes are not necessarily connected to the values that can be consumed from it. It is an object with two properties, `sink` and `source`.
```js
const duplex = {
sink: async source => {/* ... */},
source: { [Symbol.asyncIterator] () {/* ... */} }
}
```
## Iterable Modules
- [it-handshake][it-handshake] Handshakes for binary protocols with iterable streams.
- [it-length-prefixed][it-length-prefixed] Streaming length prefixed buffers with async iterables.
- [it-pair][it-pair] Paired streams that are internally connected.
- [it-pipe][it-pipe] Create a pipeline of iterables. Works with duplex streams.
- [it-pushable][it-pushable] An iterable that you can push values into.
- [it-reader][it-reader] Read an exact number of bytes from a binary, async iterable.
- [streaming-iterables][streaming-iterables] A Swiss army knife for async iterables.
[it-handshake]: https://github.com/jacobheun/it-handshake
[it-length-prefixed]: https://github.com/alanshaw/it-length-prefixed
[it-pair]: https://github.com/alanshaw/it-pair
[it-pipe]: https://github.com/alanshaw/it-pipe
[it-pushable]: https://github.com/alanshaw/it-pushable
[it-reader]: https://github.com/alanshaw/it-reader
[streaming-iterables]: https://github.com/reconbot/streaming-iterables