Managing Resources with Reactive Extensions

Reactive Extensions

While Reactive Extensions will clean up after itself, it is still your responsibility to manage limited resources, and to dispose of  any unmanaged resources.

You can, however, use a variant on the using statement with Observables.

To do so, call the static parameterized Using method on Observable.  This returns an IObservable<char> and takes two parameters. The first parameter is a Func that returns a streamReader and the second is a Func that takes the StreamReader produced by the first and returns an IObservable of char.

The template parameters are the type of Observable to be produced (in this case char) and the resource to be disposed of when the observable sequence is disposed of (in this case a stream).

var ObservableStrings =
    Observable.Using<char, StreamReader>

)

The first of the two parameters will create the streamReader and open a FileStream on a text file in the same directory as the program,

() => new StreamReader(
    new FileStream(
      "randomstrings.txt", FileMode.Open)),

 

The second parameter uses the streamReader just created, and reads through the stream to the end (creating a string array). That array is then turned into an observable by calling ToObservable on it,

streamReader =>
(
    from str in streamReader.ReadToEnd()
    select str
)
.ToObservable()

We can then subscribe to the newly created ObsevableStrings and display the strings found in the text document. Here is the complete program,

class Program
{
    static void Main( string[ ] args )
    {
        var ObservableStrings =
             Observable.Using<char, StreamReader>
        (
            () => new StreamReader(
               new FileStream(
                     "randomstrings.txt",
                     FileMode.Open ) ),
            streamReader =>
            (
                from str in streamReader.ReadToEnd()
                select str
            )
            .ToObservable()
        );

        ObservableStrings.Subscribe( Console.Write );

    }
}

If you place this in a console application in Visual Studio, but sure to use NuGet to obtain the Reactive Extension libraries.  In addition, you’ll need a text file in the …\bin\debug directory named randomstrings.txt with text in it.

About Jesse Liberty

Jesse Liberty has three decades of experience writing and delivering software projects and is the author of 2 dozen books and a couple dozen online courses. His latest book, Building APIs with .NET will be released early in 2025. Liberty is a Senior SW Engineer for CNH and he was a Senior Technical Evangelist for Microsoft, a Distinguished Software Engineer for AT&T, a VP for Information Services for Citibank and a Software Architect for PBS. He is a Microsoft MVP.
This entry was posted in Data, Linq, Mini-Tutorial, Reactive and tagged . Bookmark the permalink.

6 Responses to Managing Resources with Reactive Extensions

  1. Mike says:

    Hi Jesse

    Do you have to convert the FileStream to a StreamReader?

    Is there a way to use the FileStream directly?

    Thanks

  2. Brett says:

    Does anyone else think this syntax is starting to look kind of, well, pointlessly cryptic?

    • do other programs with lambda expressions (e.g., LINQ etc.) seem overly cryptic or is there something special about Rx that tips the balance?

    • Also, does it help to use interim variables?

      var fs = new FileStream(
      “randomstrings.txt”,
      FileMode.Open );

      var ObservableStrings =
      Observable.Using
      (
      () => new StreamReader(fs),
      streamReader =>
      (
      from str in streamReader.ReadToEnd()
      select str
      )
      .ToObservable()
      );

  3. Priya says:

    Hi Jesse,
    What if my main method needs to subscribe to an Observable that in turn is creating say Observable.Interval? If the main method needs to dispose its subscription, do I need to explicitly dispose the “inner” subscription (kind of tricky ‘coz the timer is “ticking” for the Interval based subscription)?

    Thanks,
    Priya

    • Priya,

      You don’t have to dispose the subscription at all. The only reason we’re using dispose here is for the StreamReader which requires the dispose.

Comments are closed.