Converts this module to typescript. - Ecosystem modules renamed from (e.g.) `libp2p-tcp` to `@libp2p/tcp` - Ecosystem module now have named exports - Configuration has been updated, now pass instances of modules instead of classes: - Some configuration keys have been renamed to make them more descriptive. `transport` -> `transports`, `connEncryption` -> `connectionEncryption`. In general where we pass multiple things, the key is now plural, e.g. `streamMuxer` -> `streamMuxers`, `contentRouting` -> `contentRouters`, etc. Where we are configuring a singleton the config key is singular, e.g. `connProtector` -> `connectionProtector` etc. - Properties of the `modules` config key have been moved to the root - Properties of the `config` config key have been moved to the root ```js // before import Libp2p from 'libp2p' import TCP from 'libp2p-tcp' await Libp2p.create({ modules: { transport: [ TCP ], } config: { transport: { [TCP.tag]: { foo: 'bar' } }, relay: { enabled: true, hop: { enabled: true, active: true } } } }) ``` ```js // after import { createLibp2p } from 'libp2p' import { TCP } from '@libp2p/tcp' await createLibp2p({ transports: [ new TCP({ foo: 'bar' }) ], relay: { enabled: true, hop: { enabled: true, active: true } } }) ``` - Use of `enabled` flag has been reduced - previously you could pass a module but disable it with config. Now if you don't want a feature, just don't pass an implementation. Eg: ```js // before await Libp2p.create({ modules: { transport: [ TCP ], pubsub: Gossipsub }, config: { pubsub: { enabled: false } } }) ``` ```js // after await createLibp2p({ transports: [ new TCP() ] }) ``` - `.multiaddrs` renamed to `.getMultiaddrs()` because it's not a property accessor, work is done by that method to calculate announce addresses, observed addresses, etc - `/p2p/${peerId}` is now appended to all addresses returned by `.getMultiaddrs()` so they can be used opaquely (every consumer has to append the peer ID to the address to actually use it otherwise). If you need low-level unadulterated addresses, call methods on the address manager. BREAKING CHANGE: types are no longer hand crafted, this module is now ESM only
5.2 KiB
Iterable Streams
This document is a guide on how to use Iterable Streams in Libp2p. As a part of the refactor away from callbacks, we have also moved to using Iterable Streams instead of pull-streams. If there are missing usage guides you feel should be added, please submit a PR!
Table of Contents
Usage Guide
Transforming Bidirectional Data
Sometimes you may need to wrap an existing duplex stream in order to perform incoming and outgoing transforms on data. This type of wrapping is commonly used in stream encryption/decryption. Using it-pair and it-pipe, we can do this rather easily, given an existing duplex iterable.
const duplexPair from 'it-pair/duplex')
import { pipe } from 'it-pipe'
// Wrapper is what we will write and read from
// This gives us two duplex iterables that are internally connected
const [internal, external] = duplexPair()
// Now we can pipe our wrapper to the existing duplex iterable
pipe(
external, // The external half of the pair interacts with the existing duplex
outgoingTransform, // A transform iterable to send data through (ie: encrypting)
existingDuplex, // The original duplex iterable we are wrapping
incomingTransform, // A transform iterable to read data through (ie: decrypting)
external
)
// We can now read and write from the other half of our pair
pipe(
['some data'],
internal, // The internal half of the pair is what we will interact with to read/write data
async (source) => {
for await (const chunk of source) {
console.log('Data: %s', chunk.toString())
// > Data: some data
}
}
)
Iterable Stream Types
These types are pulled from @alanshaw's gist on streaming iterables.
Source
A "source" is something that can be consumed. It is an iterable object.
const ints = {
[Symbol.asyncIterator] () {
let i = 0
return {
async next () {
return { done: false, value: i++ }
}
}
}
}
// or, more succinctly using a generator and for/await:
const ints = (async function * () {
let i = 0
while (true) yield i++
})()
Sink
A "sink" is something that consumes (or drains) a source. It is a function that takes a source and iterates over it. It optionally returns a value.
const logger = async source => {
const it = source[Symbol.asyncIterator]()
while (true) {
const { done, value } = await it.next()
if (done) break
console.log(value) // prints 0, 1, 2, 3...
}
}
// or, more succinctly using a generator and for/await:
const logger = async source => {
for await (const chunk of source) {
console.log(chunk) // prints 0, 1, 2, 3...
}
}
Transform
A "transform" is both a sink and a source where the values it consumes and the values that can be consumed from it are connected in some way. It is a function that takes a source and returns a source.
const doubler = source => {
return {
[Symbol.asyncIterator] () {
const it = source[Symbol.asyncIterator]()
return {
async next () {
const { done, value } = await it.next()
if (done) return { done }
return { done, value: value * 2 }
}
return () {
return it.return && it.return()
}
}
}
}
}
// or, more succinctly using a generator and for/await:
const doubler = source => (async function * () {
for await (const chunk of source) {
yield chunk * 2
}
})()
Duplex
A "duplex" is similar to a transform but the values it consumes are not necessarily connected to the values that can be consumed from it. It is an object with two properties, sink
and source
.
const duplex = {
sink: async source => {/* ... */},
source: { [Symbol.asyncIterator] () {/* ... */} }
}
Iterable Modules
- it-handshake Handshakes for binary protocols with iterable streams.
- it-length-prefixed Streaming length prefixed buffers with async iterables.
- it-pair Paired streams that are internally connected.
- it-pipe Create a pipeline of iterables. Works with duplex streams.
- it-pushable An iterable that you can push values into.
- it-reader Read an exact number of bytes from a binary, async iterable.
- streaming-iterables A Swiss army knife for async iterables.