java 8 - Observable emitter generates too much pressure for HystrixCommands


Keywords:java  8 


Question: 

I have an Observable that emits file lines (many GBs read from GCS).

return StringObservable.byLine(
    Observable.using(
        () -> storage.get(blobId).reader(),
            reader -> Observable.create(
                    new OnSubscribeReadChannel(reader, 64 * 1024)
                ),
            ReadChannel::close
    )
)

Each line results in multiple (many in some cases) calls to various DBs, all wrapped in Hystrix commands. Obviously the lines eventually overwhelm the Hystrix commands, circuits start opening and everyone has a bad day.

This is roughly what I'm doing:

readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
            .map(this::deserializeLine)
            .flatMap(this::addDataToObjectFromSomeDb)
            .flatMap(this::writeObj)
            .map(Set::size)
            .reduce(0, (a, b) -> a + b)
            .toBlocking().single()

Is there a way I can apply some back-pressure, or limit the number of lines being processed at a time or something?


1 Answer: 

You need to use Emitter.BackpressureMode.BUFFER

BUFFER
Buffers (unbounded) all onNext calls until the downstream can consume them.