feat(pull): migration to pull streams. Upgrade tests to use mocha as

well
This commit is contained in:
Friedel Ziegelmayer 2016-08-18 16:18:17 +02:00 committed by David Dias
parent 2782765497
commit cc3130fa23
15 changed files with 434 additions and 415 deletions

12
.gitignore vendored
View File

@ -1,7 +1,13 @@
**/node_modules/
**/*.log
test/repo-tests*
# Logs
logs
*.log
coverage
# Runtime data
pids
*.pid
@ -19,9 +25,11 @@ coverage
# node-waf configuration
.lock-wscript
# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release
build
# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules
lib
dist

34
.npmignore Normal file
View File

@ -0,0 +1,34 @@
**/node_modules/
**/*.log
test/repo-tests*
# Logs
logs
*.log
coverage
# Runtime data
pids
*.pid
*.seed
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# node-waf configuration
.lock-wscript
build
# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules
test

10
.travis.yml Normal file
View File

@ -0,0 +1,10 @@
sudo: false
language: node_js
node_js:
- "stable"
before_install:
- npm install -g npm
script:
- npm run lint

View File

@ -2,7 +2,10 @@ interface-stream-muxer
=====================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Travis CI](https://travis-ci.org/ipfs/interface-stream-muxer.svg?branch=master)](https://travis-ci.org/ipfs/interface-stream-muxer)
[![Dependency Status](https://david-dm.org/ipfs/interface-stream-muxer.svg?style=flat-square)](https://david-dm.org/ipfs/interface-stream-muxer) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
> A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs"
@ -12,48 +15,46 @@ Publishing a test suite as a module lets multiple modules all ensure compatibili
The API is presented with both Node.js and Go primitives, however, there is not actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.
# Modules that implement the interface
## Modules that implement the interface
- [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy)
- [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex)
- [JavaScript libp2p-spdy](https://github.com/libp2p/js-libp2p-spdy)
- [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer)
Send a PR to add a new one if you happen to find or write one.
# Badge
## Badge
Include this badge in your readme if you make a new module that uses interface-stream-muxer API.
![](/img/badge.png)
# How to use the battery tests
## Usage
## Node.js
### Node.js
Install interface-stream-muxer as one of the dependencies of your project and as a test file, using `tap`, `tape` or a test runner with compatible API, do:
Install `interface-stream-muxer` as one of the dependencies of your project and as a test file. Then, using `mocha` (for Node.js) or a test runner with compatible API, do:
```
var tape = require('tape')
var tests = require('interface-stream-muxer/tests')
var yourStreamMuxer = require('../src')
```js
const test = require('interface-stream-muxer')
var common = {
setup: function (t, cb) {
cb(null, yourStreamMuxer)
const common = {
setup (cb) {
cb(null, yourMuxer)
},
teardown: function (t, cb) {
teardown (cb) {
cb()
}
}
tests(tape, common)
// use all of the test suits
test(common)
```
## Go
### Go
> WIP - being written
> WIP
# API
## API
A valid (read: that follows this abstraction) stream muxer, must implement the following API.
@ -86,7 +87,7 @@ In the Node.js case, if no callback is passed, stream will emit an 'ready' event
### Listen(wait/accept) a new incoming stream
- `Node.js` muxedConn.on('stream', function (stream) {})
- `Node.js` muxedConn.on('stream', function (stream) {})
- `Go` stream := muxedConn.Accept()
Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side.

View File

@ -2,16 +2,21 @@
"name": "interface-stream-muxer",
"version": "0.3.1",
"description": "A test suite and interface you can use to implement a stream muxer.",
"main": "tests/index.js",
"directories": {
"test": "tests"
},
"main": "src/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "exit(0)",
"build": "aegir-build --env node",
"lint": "aegir-lint",
"release": "aegir-release --env node",
"release-minor": "aegir-release --env node --type minor",
"release-major": "aegir-release --env node --type major",
"coverage": "exit(0)",
"coverage-publish": "exit(0)"
},
"repository": {
"type": "git",
"url": "https://github.com/diasdavid/interface-stream-muxer.git"
"url": "https://github.com/libp2p/interface-stream-muxer.git"
},
"keywords": [
"Streams",
@ -22,11 +27,18 @@
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/interface-stream-muxer/issues"
"url": "https://github.com/libp2p/interface-stream-muxer/issues"
},
"homepage": "https://github.com/diasdavid/interface-stream-muxer",
"homepage": "https://github.com/libp2p/interface-stream-muxer",
"dependencies": {
"stream-pair": "^1.0.3",
"timed-tape": "^0.1.1"
"async": "^2.0.1",
"chai": "^3.5.0",
"chai-checkmark": "^1.0.1",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.4.3"
},
"devDependencies": {
"aegir": "^6.0.1"
}
}

146
src/base-test.js Normal file
View File

@ -0,0 +1,146 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
chai.use(require('chai-checkmark'))
const expect = chai.expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
function closeAndWait (stream) {
pull(
pull.empty(),
stream,
pull.onEnd((err) => {
expect(err).to.not.exist.mark()
})
)
}
module.exports = (common) => {
describe('base', () => {
let muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
})
it('Open a stream from the dialer', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])
expect(4).checks(done)
listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const conn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})
closeAndWait(conn)
})
it('Open a stream from the listener', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])
expect(4).check(done)
dialer.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const conn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})
closeAndWait(conn)
})
it('Open a stream on both sides', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])
expect(8).check(done)
dialer.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const listenerConn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})
listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const dialerConn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})
closeAndWait(dialerConn)
closeAndWait(listenerConn)
})
it('Open a stream on one side, write, open a stream in the other side', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])
expect(6).check(done)
const dialerConn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})
pull(
pull.values(['hey']),
dialerConn
)
listener.on('stream', (stream) => {
pull(
stream,
pull.collect((err, chunks) => {
expect(err).to.not.exist.mark()
expect(chunks).to.be.eql([Buffer('hey')]).mark()
})
)
const listenerConn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})
pull(
pull.values(['hello']),
listenerConn
)
dialer.on('stream', onDialerStream)
function onDialerStream (stream) {
pull(
stream,
pull.collect((err, chunks) => {
expect(err).to.not.exist.mark()
expect(chunks).to.be.eql([Buffer('hello')]).mark()
})
)
}
})
})
})
}

14
src/index.js Normal file
View File

@ -0,0 +1,14 @@
/* eslint-env mocha */
'use strict'
const baseTest = require('./base-test')
const stressTest = require('./stress-test')
const megaStressTest = require('./mega-stress-test')
module.exports = (common) => {
describe('interface-stream-muxer', () => {
baseTest(common)
stressTest(common)
megaStressTest(common)
})
}

23
src/mega-stress-test.js Normal file
View File

@ -0,0 +1,23 @@
/* eslint-env mocha */
'use strict'
const spawn = require('./spawner')
module.exports = (common) => {
describe.skip('mega stress test', function () {
this.timeout(100 * 200 * 1000)
let muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
})
it('10000 messages of 10000 streams', (done) => {
spawn(muxer, 10000, 10000, done, 5000)
})
})
}

88
src/spawner.js Normal file
View File

@ -0,0 +1,88 @@
'use strict'
const expect = require('chai').expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const generate = require('pull-generate')
const each = require('async/each')
const eachLimit = require('async/eachLimit')
module.exports = (muxer, nStreams, nMsg, done, limit) => {
const p = pair()
const dialerSocket = p[0]
const listenerSocket = p[1]
const check = marker((6 * nStreams) + (nStreams * nMsg), done)
const msg = 'simple msg'
const listener = muxer.listen(listenerSocket)
const dialer = muxer.dial(dialerSocket)
listener.on('stream', (stream) => {
expect(stream).to.exist
check()
pull(
stream,
pull.through((chunk) => {
expect(chunk).to.exist
check()
}),
pull.onEnd((err) => {
expect(err).to.not.exist
check()
pull(pull.empty(), stream)
})
)
})
const numbers = []
for (let i = 0; i < nStreams; i++) {
numbers.push(i)
}
const spawnStream = (n, cb) => {
const stream = dialer.newStream((err) => {
expect(err).to.not.exist
check()
expect(stream).to.exist
check()
pull(
generate(0, (s, cb) => {
cb(s === nMsg ? true : null, msg, s + 1)
}),
stream,
pull.collect((err, res) => {
expect(err).to.not.exist
check()
expect(res).to.be.eql([])
check()
cb()
})
)
})
}
if (limit) {
eachLimit(numbers, limit, spawnStream, () => {})
} else {
each(numbers, spawnStream, () => {})
}
}
function marker (n, done) {
let i = 0
return (err) => {
i++
if (err) {
console.error('Failed after %s iterations', i)
return done(err)
}
if (i === n) {
done()
}
}
}

66
src/stress-test.js Normal file
View File

@ -0,0 +1,66 @@
/* eslint-env mocha */
'use strict'
const spawn = require('./spawner')
module.exports = (common) => {
describe('stress test', () => {
let muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
})
it('1 stream with 1 msg', (done) => {
spawn(muxer, 1, 1, done)
})
it('1 stream with 10 msg', (done) => {
spawn(muxer, 1, 10, done)
})
it('1 stream with 100 msg', (done) => {
spawn(muxer, 1, 100, done)
})
it('10 streams with 1 msg', (done) => {
spawn(muxer, 10, 1, done)
})
it('10 streams with 10 msg', (done) => {
spawn(muxer, 10, 10, done)
})
it('10 streams with 100 msg', (done) => {
spawn(muxer, 10, 100, done)
})
it('100 streams with 1 msg', (done) => {
spawn(muxer, 100, 1, done)
})
it('100 streams with 10 msg', (done) => {
spawn(muxer, 100, 10, done)
})
it('100 streams with 100 msg', (done) => {
spawn(muxer, 100, 100, done)
})
it('1000 streams with 1 msg', (done) => {
spawn(muxer, 1000, 1, done)
})
it('1000 streams with 10 msg', (done) => {
spawn(muxer, 1000, 10, done)
})
it('1000 streams with 100 msg', (done) => {
spawn(muxer, 1000, 100, done)
})
})
}

BIN
tests/.DS_Store vendored

Binary file not shown.

View File

@ -1,167 +0,0 @@
var streamPair = require('stream-pair')
module.exports.all = function (test, common) {
test('Open a stream from the dialer', function (t) {
common.setup(test, function (err, muxer) {
t.plan(4)
t.ifError(err, 'Should not throw')
var pair = streamPair.create()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)
listener.on('stream', (stream) => {
t.pass('got stream')
})
dialer.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
})
})
})
test('Open a stream from the listener', function (t) {
common.setup(test, function (err, muxer) {
t.plan(4)
t.ifError(err, 'Should not throw')
var pair = streamPair.create()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)
dialer.on('stream', (stream) => {
t.pass('got stream')
})
listener.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
})
})
})
test('Open a stream on both sides', function (t) {
common.setup(test, function (err, muxer) {
t.plan(7)
t.ifError(err, 'Should not throw')
var pair = streamPair.create()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)
dialer.on('stream', (stream) => {
t.pass('got stream')
})
listener.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
})
listener.on('stream', (stream) => {
t.pass('got stream')
})
dialer.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
})
})
})
test('Open a stream on one side, write, open a stream in the other side', function (t) {
common.setup(test, function (err, muxer) {
t.plan(9)
t.ifError(err, 'Should not throw')
var pair = streamPair.create()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)
dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('dialed stream from dialer')
stream.write('hey')
})
listener.on('stream', function (stream) {
t.pass('listener got stream')
stream.on('data', function (chunk) {
t.equal(chunk.toString(), 'hey')
})
listener.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('dialed stream from listener')
stream.write('hello')
})
})
dialer.on('stream', function (stream) {
t.pass('dialer got stream')
stream.on('data', function (chunk) {
t.equal(chunk.toString(), 'hello')
})
})
})
})
test('Open a stream using the net.connect pattern', function (t) {
common.setup(test, function (err, muxer) {
t.plan(2)
t.ifError(err, 'Should not throw')
var pair = streamPair.create()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)
var stream = dialer.newStream()
stream.on('ready', function () {
t.pass('dialed stream')
})
stream.on('error', function (err) {
t.ifError(err, 'Should not throw')
})
listener.on('stream', function (stream) {
t.pass('got stream')
})
})
})
test('Buffer writes Open a stream using the net.connect pattern', function (t) {
common.setup(test, function (err, muxer) {
t.plan(3)
t.ifError(err, 'Should not throw')
var pair = streamPair.create()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)
var stream = dialer.newStream()
stream.write('buffer this')
stream.on('ready', function () {
t.pass('dialed stream')
})
stream.on('error', function (err) {
t.ifError(err, 'Should not throw')
})
listener.on('stream', function (stream) {
t.pass('got stream')
stream.on('data', function (chunk) {
t.equal(chunk.toString(), 'buffer this')
})
})
})
})
}

View File

@ -1,8 +0,0 @@
var timed = require('timed-tape')
module.exports = function (test, common, mega) {
test = timed(test)
require('./base-test.js').all(test, common)
require('./stress-test.js').all(test, common)
// require('./mega-stress-test.js').all(test, common)
}

View File

@ -1,55 +0,0 @@
var streamPair = require('stream-pair')
module.exports.all = function (test, common) {
test('10000 messages of 10000 streams', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 10000, 10000)
})
})
}
function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
t.plan(1 + (5 * nStreams) + (nStreams * nMsg))
var msg = !size ? 'simple msg' : 'make the msg bigger'
var listener = muxer(listenerSocket, true)
var dialer = muxer(dialerSocket, false)
listener.on('stream', function (stream) {
t.pass('Incoming stream')
stream.on('data', function (chunk) {
t.pass('Received message')
})
stream.on('end', function () {
t.pass('Stream ended on Listener')
stream.end()
})
})
for (var i = 0; i < nStreams; i++) {
dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('Dialed stream')
for (var j = 0; j < nMsg; j++) {
stream.write(msg)
}
stream.on('data', function (chunk) {
t.fail('Should not happen')
})
stream.on('end', function () {
t.pass('Stream ended on Dialer')
})
stream.end()
})
}
}

View File

@ -1,153 +0,0 @@
var streamPair = require('stream-pair')
module.exports.all = function (test, common) {
test('1 stream with 1 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 1, 1)
})
})
test('1 stream with 10 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 1, 10)
})
})
test('1 stream with 100 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 1, 100)
})
})
test('10 stream with 1 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 10, 1)
})
})
test('10 stream with 10 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 10, 10)
})
})
test('10 stream with 100 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 10, 10)
})
})
test('100 stream with 1 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 100, 1)
})
})
test('100 stream with 10 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 100, 10)
})
})
test('100 stream with 100 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 100, 10)
})
})
test('1000 stream with 1 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 1000, 1)
})
})
test('1000 stream with 10 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 1000, 10)
})
})
test('1000 stream with 100 msg', function (t) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()
spawnGeneration(t, muxer, pair, pair.other, 1000, 100)
})
})
}
function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
t.plan(1 + (5 * nStreams) + (nStreams * nMsg))
var msg = !size ? 'simple msg' : 'make the msg bigger'
var listener = muxer(listenerSocket, true)
var dialer = muxer(dialerSocket, false)
listener.on('stream', function (stream) {
t.pass('Incoming stream')
stream.on('data', function (chunk) {
t.pass('Received message')
})
stream.on('end', function () {
t.pass('Stream ended on Listener')
stream.end()
})
})
for (var i = 0; i < nStreams; i++) {
dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('Dialed stream')
for (var j = 0; j < nMsg; j++) {
stream.write(msg)
}
stream.on('data', function (chunk) {
t.fail('Should not happen')
})
stream.on('end', function () {
t.pass('Stream ended on Dialer')
})
stream.end()
})
}
}