I have an Observable that emits file lines (many GBs read from GCS).
    return StringObservable.byLine(
        Observable.using(
            () -> storage.get(blobId).reader(),
            reader -> Observable.create(new OnSubscribeReadChannel(reader, 64 * 1024)),
            ReadChannel::close
        )
    );
Each line results in several calls (in some cases many) to various databases, all wrapped in Hystrix commands. The lines arrive faster than those commands can keep up, so they eventually get overwhelmed, circuits start opening, and everyone has a bad day.
This is roughly what I'm doing:
    readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId())
        .map(this::deserializeLine)
        .flatMap(this::addDataToObjectFromSomeDb)
        .flatMap(this::writeObj)
        .map(Set::size)
        .reduce(0, (a, b) -> a + b)
        .toBlocking()
        .single();
Is there a way to apply back-pressure, or otherwise limit how many lines are being processed at a time?
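To make the goal concrete, here is a minimal sketch of the behaviour I'm after, written against the plain JDK rather than RxJava. Everything in it is hypothetical and for illustration only: `BoundedLineProcessor`, `processAll`, `maxInFlight` and the field-counting stand-in for the DB calls are not part of my real code. The reader is blocked whenever `maxInFlight` lines are already being processed, and the per-line results are summed the same way as the `map(Set::size).reduce(...)` step above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

// Hypothetical illustration only: none of these names come from the real pipeline.
class BoundedLineProcessor {

    // Highest number of lines observed in flight at once during the last run.
    public static final AtomicInteger peakInFlight = new AtomicInteger();

    // Processes every line while never letting more than maxInFlight run at once,
    // and returns the sum of the per-line results.
    public static int processAll(List<String> lines, int maxInFlight,
                                 ToIntFunction<String> work) throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger total = new AtomicInteger();
        peakInFlight.set(0);
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String line : lines) {
                // The producer blocks here once maxInFlight lines are in progress.
                permits.acquire();
                pool.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peakInFlight.accumulateAndGet(now, Math::max);
                        total.addAndGet(work.applyAsInt(line));
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                });
            }
            // Wait for the remaining lines by taking back every permit.
            permits.acquire(maxInFlight);
        } finally {
            pool.shutdown();
        }
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> lines = Arrays.asList("a,b", "c", "d,e,f", "g");
        // Stand-in for the DB calls: count the comma-separated fields of each line.
        int total = processAll(lines, 2, line -> line.split(",").length);
        System.out.println(total + " fields, peak in flight: " + peakInFlight.get());
    }
}
```

I know RxJava 1.x has a `flatMap` overload that takes a `maxConcurrent` argument, which looks like it might be the operator-level equivalent of the semaphore above, but I'm not sure whether that is the right tool here or how it interacts with the Hystrix commands.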