Organizational Research By

Surprising Reserch Topic

creating a node js stream from two piped streams


creating a node js stream from two piped streams  using -'node.js,node.js-stream'

I'd like to combine two Node.js streams into one by piping them, if possible. I'm using Transform streams.

In other words, I'd like my library to return myStream for people to use. For example they could write:

process.stdin.pipe(myStream).pipe(process.stdout);


And internally I'm using a third-party vendorStream that does some work, plugged into my own logic contained in myInternalStream. So what's above would translate to:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);


Can I do something like that? I've tried var myStream = vendorStream.pipe(myInternalStream) but that obviously doesn't work.

To make an analogy with bash, let's say I want to write a program that checks if the letter h is present in the last line of some stream (tail -n 1 | grep h), I can create a shell script:

# myscript.sh
tail -n 1 | grep h


And then if people do:

$ printf "abc\ndef\nghi" | . myscript.sh


It just works.

This is what I have so far:

// Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]


Is this at all possible? Let me know if what I'm trying to do isn't clear.

Thanks!
    
asked Oct 11, 2015 by amit_cmps
0 votes
2 views



Related Hot Questions



Government Jobs Opening


...