Streams are easy to read, tricky to test, and even trickier to trust without good tests.
Streams are all over my feeds right now. Java 8 streams sit in most new code reviews, RxJava keeps popping up in talks, and Node 4 just landed with a comfy ES6 baseline that makes stream code a bit nicer to write. Teams are turning loops into map, filter, and reduce chains, and it looks clean. The punchline is still the same: no test, no confidence. The good news is that unit testing streams is not a mysterious art. You treat the stream like a pure function when you can, you fake the edges when you cannot, and you keep the test data tiny and obvious.
Start by thinking about deterministic transforms. If your stream is pure, test it as a transform from input values to output values. Keep inputs small and named. Use assertions that read like a story. I try to keep one idea per test, and I avoid hiding intent behind helpers unless the helper makes the test easier to read. In Java that means a short list or Stream.of with a couple of values, then collect and compare. No mock frenzy, no setup maze. Here is one that filters, maps, and sorts, with an edge case for empty input.

    /* Java 8 + JUnit 4 */
    import static java.util.stream.Collectors.toList;
    import static org.junit.Assert.assertEquals;

    import org.junit.Test;
    import java.util.*;
    import java.util.stream.*;

    public class PriceStreamTest {

        private static class Price {
            final String sku;
            final double amount;
            Price(String sku, double amount) { this.sku = sku; this.amount = amount; }
        }

        // Stream under test: keep prices below 10.00, format as "sku:amount", sort.
        private static Stream<String> formatAffordable(Stream<Price> prices) {
            return prices
                .filter(p -> p.amount < 10.0)
                .map(p -> p.sku + ":" + String.format(Locale.US, "%.2f", p.amount))
                .sorted();
        }

        @Test
        public void formatsAffordableSorted() {
            List<Price> input = Arrays.asList(
                new Price("A", 12.0),
                new Price("B", 5.5),
                new Price("C", 1.0),
                new Price("D", 10.0) // exactly 10.00 is not affordable
            );
            List<String> out = formatAffordable(input.stream()).collect(toList());
            assertEquals(Arrays.asList("B:5.50", "C:1.00"), out);
        }

        @Test
        public void emptyInputGivesEmptyOutput() {
            List<String> out = formatAffordable(Stream.<Price>empty()).collect(toList());
            assertEquals(Collections.emptyList(), out);
        }
    }

When you cannot keep it pure because you deal with files, sockets, or time, you can still unit test by plugging in a fake source or sink. In Node that might be a tiny Readable that emits a known set of chunks and a transform built with through2. The test builds the stream graph, collects output into an array, and asserts order and content. Notice that the test does not hit the disk or the network. No sleeps, no flakiness. Just data in and data out.

    // Node 4 + mocha + through2
    'use strict'; // Node 4 only accepts let in strict mode

    const assert = require('assert');
    const stream = require('stream');
    const through2 = require('through2');

    // Transform under test: parse each line as JSON and drop lines that fail to parse.
    function onlyJsonLines() {
      return through2.obj(function (line, enc, cb) {
        let obj;
        try {
          obj = JSON.parse(line);
        } catch (e) {
          return cb(); // skip bad lines
        }
        // Push outside the try so a downstream failure is not mistaken for bad JSON.
        this.push(obj);
        cb();
      });
    }

    // Fake source: an object-mode Readable that emits the array items, then ends.
    function fromArray(arr) {
      const src = new stream.Readable({ objectMode: true });
      let i = 0;
      src._read = function () {
        if (i < arr.length) this.push(arr[i++]);
        else this.push(null);
      };
      return src;
    }

    describe('onlyJsonLines', function () {
      it('parses valid JSON lines and skips the rest', function (done) {
        const lines = [
          '{"a":1}',
          '{bad json}',
          '{"b":2}'
        ];
        const out = [];
        fromArray(lines)
          .pipe(onlyJsonLines())
          .on('data', function (obj) { out.push(obj); })
          .on('end', function () {
            assert.deepEqual(out, [{ a: 1 }, { b: 2 }]);
            done();
          })
          .on('error', done);
      });
    });

For evented flows you can go with RxJava and lean on TestSubscriber. It is a tiny lifesaver. You send a few values, subscribe with a test probe, then assert the exact sequence, completion, and errors. If time is involved, you can bring in TestScheduler and advance it by hand, but for everyday unit tests I prefer simple value streams that do not depend on wall time. The point is the same. Keep it tight and readable.

    /* RxJava 1.x + JUnit 4 */
    import org.junit.Test;
    import rx.Observable;
    import rx.observers.TestSubscriber;

    public class RxTransformTest {

        // Transform under test: keep purely alphabetic strings and upper-case them.
        private Observable<String> toUpperAlpha(Observable<String> in) {
            return in
                .filter(s -> s != null && s.matches("[A-Za-z]+"))
                .map(String::toUpperCase);
        }

        @Test
        public void transformsAndCompletes() {
            TestSubscriber<String> ts = new TestSubscriber<>();
            toUpperAlpha(Observable.from(new String[] { "foo", "123", "Bar", null }))
                .subscribe(ts);
            ts.assertNoErrors();
            ts.assertCompleted();
            ts.assertValues("FOO", "BAR");
        }

        @Test
        public void propagatesError() {
            TestSubscriber<String> ts = new TestSubscriber<>();
            // Source that emits one value and then fails.
            Observable<String> boom = Observable.create(sub -> {
                sub.onNext("ok");
                sub.onError(new RuntimeException("boom"));
            });
            toUpperAlpha(boom).subscribe(ts);
            ts.assertNotCompleted();
            ts.assertError(RuntimeException.class);
            ts.assertValues("OK");
        }
    }

Now a few habits that keep tests sharp. Name your streams in tests with tiny helpers only when it reads better than the raw chain. Test happy paths and ugly edges: empty input, single item, sorted input, unsorted input, duplicate values, bad lines, and errors. Assert that resources close when the stream ends. Verify order when order matters, and do not depend on order when it does not. Resist the urge to over-mock. A small in-memory source beats a complex mock with a script of expectations. When test setup starts to feel like a second product, pause and move logic out of the stream and into a tiny function you can test in isolation. Streams shine as glue. Glue is easy to test when the parts are simple and pure.
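Two of those habits are easy to say and easy to skip, so here is a minimal sketch of both: checking that a source gets closed, and comparing output without caring about order. It is plain Java 8 with no test framework so that it stands alone. The names `StreamHabits`, `trackedSource`, `drain`, and `sameElements` are made up for illustration, and the `AtomicBoolean` flag is only a stand-in for a real file or socket handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamHabits {

    // Hypothetical fake source: fixed values, plus a flag that flips when the stream is closed.
    static Stream<String> trackedSource(AtomicBoolean closed, String... values) {
        return Arrays.stream(values).onClose(() -> closed.set(true));
    }

    // Code under test: trims each value and closes the source when done.
    static List<String> drain(Stream<String> source) {
        try (Stream<String> s = source) {
            return s.map(String::trim).collect(Collectors.toList());
        }
    }

    // Order-insensitive comparison that still respects duplicates.
    static boolean sameElements(List<String> actual, String... expected) {
        List<String> a = new ArrayList<>(actual);
        List<String> e = new ArrayList<>(Arrays.asList(expected));
        Collections.sort(a);
        Collections.sort(e);
        return a.equals(e);
    }
}
```

In a test you would create the flag, call `drain(trackedSource(closed, " a ", "b"))`, and then assert both the collected values and `closed.get()`. With JUnit those checks would simply become `assertEquals` and `assertTrue` calls.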
Readability wins. A stream chain that fits on one screen and uses clear verbs is a gift to future you. Tests should mirror that clarity. Arrange, Act, Assert still works. Arrange a few named values. Act by running the stream. Assert with clean expectations. If you feel the need to add print lines to debug a stream chain, either the test is missing a case or the stream is doing too much. Split it. Give names to steps. Then your tests become small and your stream code gets easier to reason about. That is the cycle that keeps a codebase fast to change as your input and output grow.
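To make "split it and give names to steps" concrete, here is one way it could look, sketched in plain Java 8. The class `NamedSteps` and its methods are hypothetical. The filter and map logic mirror the alphabetic upper-casing idea from the RxJava example, only written against `java.util.stream` so the block has no dependencies.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class NamedSteps {

    // Step 1: a named predicate, testable with plain values and no stream at all.
    static boolean isWord(String s) {
        return s != null && s.matches("[A-Za-z]+");
    }

    // Step 2: a named mapping, also testable on its own.
    static String shout(String s) {
        return s.toUpperCase(Locale.ROOT);
    }

    // The stream only stitches the named steps together.
    static List<String> shoutWords(Stream<String> in) {
        return in
            .filter(NamedSteps::isWord)
            .map(NamedSteps::shout)
            .collect(Collectors.toList());
    }
}
```

Each step gets its own tiny test on raw values, and the chain needs only one or two tests that confirm the wiring. The chain also reads as verbs, which makes the print lines unnecessary.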
One last note for today. I keep hearing that streams cannot be unit tested and need end-to-end checks. That has not been our experience. You can build a tiny slice with a fake source, push five values, and assert the exact shape of the output. End-to-end testing is still useful, but it is a safety net, not the first line of defense. If your stream shuffles a lot of state or touches many systems, that is a flag to move the heavy logic into pure helpers and leave the stream to stitch them together. Then your unit tests stay stable and fast, even as the outside world keeps moving.
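Here is roughly what such a slice can look like in Java 8. This is a sketch under assumptions, not a recipe: the "sku,quantity" line format, the class `FakeSourceSlice`, and its methods are all invented for the example. The fake source is a `StringReader` wrapped in a `BufferedReader`, so nothing touches the disk.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class FakeSourceSlice {

    // Pure helper holding the heavy logic: turn "sku,quantity" into a label, or reject the line.
    static Optional<String> toLabel(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            int qty = Integer.parseInt(parts[1].trim());
            return Optional.of(parts[0].trim() + " x" + qty);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // The stream reads lines and stitches the helper in. Any reader can be the source.
    static List<String> labels(BufferedReader source) {
        return source.lines()
            .map(FakeSourceSlice::toLabel)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    // In-memory stand-in for a file or socket: five lines, two of them bad.
    static BufferedReader fiveLines() {
        return new BufferedReader(new StringReader("A,1\nB,two\nC,3\nno comma\nD, 4\n"));
    }
}
```

Calling `labels(fiveLines())` gives the three good lines as "A x1", "C x3", and "D x4", in input order. The parsing rules live in `toLabel`, which you can test with single strings, so the stream test only has to prove that bad lines are dropped and order is kept.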
Unit testing streams is just testing functions. The only twist is that data arrives as a flow. Set up a tiny flow, assert results with care, and keep the code honest.
Ship small streams.
Test the edges.