While Reactive Extensions will clean up after itself, it is still your responsibility to manage limited resources, and to dispose of any unmanaged resources.
You can, however, use a variant on the using statement with Observables.
To do so, call the static parameterized Using method on Observable. This returns an IObservable<char> and takes two parameters. The first parameter is a Func that returns a streamReader and the second is a Func that takes the StreamReader produced by the first and returns an IObservable of char.
The template parameters are the type of Observable to be produced (in this case char) and the resource to be disposed of when the observable sequence is disposed of (in this case a stream).
var ObservableStrings = Observable.Using<char, StreamReader> )
The first of the two parameters will create the streamReader and open a FileStream on a text file in the same directory as the program,
() => new StreamReader( new FileStream( "randomstrings.txt", FileMode.Open)),
The second parameter uses the streamReader just created, and reads through the stream to the end (creating a string array). That array is then turned into an observable by calling ToObservable on it,
streamReader => ( from str in streamReader.ReadToEnd() select str ) .ToObservable()
We can then subscribe to the newly created ObsevableStrings and display the strings found in the text document. Here is the complete program,
class Program { static void Main( string[ ] args ) { var ObservableStrings = Observable.Using<char, StreamReader> ( () => new StreamReader( new FileStream( "randomstrings.txt", FileMode.Open ) ), streamReader => ( from str in streamReader.ReadToEnd() select str ) .ToObservable() ); ObservableStrings.Subscribe( Console.Write ); } }
If you place this in a console application in Visual Studio, but sure to use NuGet to obtain the Reactive Extension libraries. In addition, you’ll need a text file in the …\bin\debug directory named randomstrings.txt with text in it.
Hi Jesse
Do you have to convert the FileStream to a StreamReader?
Is there a way to use the FileStream directly?
Thanks
Does anyone else think this syntax is starting to look kind of, well, pointlessly cryptic?
do other programs with lambda expressions (e.g., LINQ etc.) seem overly cryptic or is there something special about Rx that tips the balance?
Also, does it help to use interim variables?
var fs = new FileStream(
“randomstrings.txt”,
FileMode.Open );
var ObservableStrings =
Observable.Using
(
() => new StreamReader(fs),
streamReader =>
(
from str in streamReader.ReadToEnd()
select str
)
.ToObservable()
);
Hi Jesse,
What if my main method needs to subscribe to an Observable that in turn is creating say Observable.Interval? If the main method needs to dispose its subscription, do I need to explicitly dispose the “inner” subscription (kind of tricky ‘coz the timer is “ticking” for the Interval based subscription)?
Thanks,
Priya
Priya,
You don’t have to dispose the subscription at all. The only reason we’re using dispose here is for the StreamReader which requires the dispose.