Chapter 18. Linear Lazy Stream-Processing

As the name suggests, linear lazy streams are the linear version of lazy streams. In ATS, a linear value is one that cannot be shared; it must be consumed properly, or else a type error is reported during typechecking. Before introducing programming with linear lazy streams, I would like to explain a serious drawback in programming with (non-linear) lazy streams.

Given an integer n and a closure-function f, the following function ftally returns the sum of the values of f(i) for i ranging from 1 to n:

//
fun
ftally
(
  n : int
, f : cfun(int, double)
) : double =
  if n = 0 then 0.0 else ftally(n-1, f) + f(n)
//

The next function ftally2 is a stream-based implementation of ftally:

//
fun
ftally2
(
  n : int
, f : cfun(int, double)
) : double = let
//
val xs1 = int_stream_from(1)
val xs2 = stream_takeLte<int>(xs1, n)
val xs3 = stream_map<int><double>(xs2, lam(i) => f(i))
//
in
  stream_foldleft<double><double>(xs3, 0.0, lam(res, x) => res+x)
end // end of [ftally2]
//

In the body of ftally2, xs1 refers to the infinite stream of integers starting from 1 (enumerated in ascending order); xs2 refers to the prefix of xs1 ending with the integer n; xs3 refers to the stream consisting of f(i) for i ranging from 1 to n; and finally stream_foldleft is called to compute the sum of all the numbers in xs3. For the code implementing stream_takeLte and stream_foldleft, please see the on-line code mentioned at the end of the chapter.
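Since the chapter defers the code for int_stream_from, stream_takeLte and stream_foldleft to the on-line code, the following is a minimal sketch of how they might be written for non-linear streams, in the same style as the linear versions given later in the chapter. It is a reconstruction under assumptions, not necessarily the code behind the link: it assumes the ATS2 prelude (loaded via share/atspre_staload.hats) and that cfun is in scope just as it is in the chapter's own code.

```ats
#include "share/atspre_staload.hats"

(* Sketch (reconstruction): the infinite stream n, n+1, n+2, ... *)
fun
int_stream_from(n: int): stream(int) =
  $delay(stream_cons(n, int_stream_from(n+1)))

(* Sketch (reconstruction): the first n elements of xs, or all of xs if shorter *)
fun{a:t@ype}
stream_takeLte(xs: stream(a), n: int): stream(a) =
  $delay
  (
    if n > 0 then
    (
      case+ !xs of
      | stream_nil() => stream_nil()
      | stream_cons(x, xs) => stream_cons(x, stream_takeLte<a>(xs, n-1))
    ) else stream_nil()
  )

(* Sketch (reconstruction): fopr(...fopr(fopr(r0, x1), x2)..., xn) *)
fun{res:t@ype}{a:t@ype}
stream_foldleft
  (xs: stream(a), r0: res, fopr: cfun(res, a, res)): res =
(
  case+ !xs of
  | stream_nil() => r0
  | stream_cons(x, xs) => stream_foldleft<res><a>(xs, fopr(r0, x), fopr)
)

implement
main0() = let
  val xs = stream_takeLte<int>(int_stream_from(1), 10)
  val total = stream_foldleft<int><int>(xs, 0, lam(res, x) => res + x)
in
  println!("total = ", total) // total = 55
end
```

Every node of these non-linear streams is cached once it has been forced, which is precisely the cost examined below.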

The implementation of ftally2 is clearly more complex than that of ftally. Is there any practical reason for preferring ftally2 over ftally? Absolutely.

The implementation of ftally2 is combinator-based, and such an implementation generally makes it easier for the code to evolve. Let us imagine a scenario where the passed function f needs to perform computation at a remote site (because it needs to access a database, say) and the bottleneck in calling f lies in communicating its input and output over some network. A common approach to addressing this bottleneck is to provide a vectorial variant of f that can handle multiple inputs and outputs in one call. Assume that such a variant of f is available. We can then implement a corresponding variant of stream_map based on it, and the implementation of ftally2 benefits immediately. In fact, it is realistic to expect that such a variant of stream_map is already available as a library function. On the other hand, we would probably need to overhaul the implementation of ftally completely in order for it to benefit from the vectorial variant of f.

An often touted feature of (non-linear) streams is their support for caching: once the elements of a stream are generated, they are automatically saved for potential subsequent use. As with any programming feature, a recurring question is whether one has to pay for it even when one makes no use of it. In the case of the automatic caching associated with (non-linear) streams, one clearly pays dearly for the time and memory spent on caching when the elements of a stream need to be processed only once. In stream-based programming, most streams created to support intermediate computation are processed only once. For instance, each of the three streams in the body of ftally2 is processed once and only once.

While one can argue that an optimizing compiler may be able to generate code (e.g., based on some deforestation heuristics) that completely eliminates caching in the case of ftally2, the problem with such an argument is that the compiler may just as well fail to do so. This is a bit like trying to control a puppet with a string one hundred meters long: the outcome is difficult, if not impossible, to predict.

Linear streams come to the rescue. A linear stream is very much like a non-linear stream except that it does not support caching (of its generated elements). While a linear stream presents the illusion of a (possibly infinite) sequence of elements, only its first element (if it exists) is available for use. In order to gain access to the next element, the node storing the first element needs to be freed. To clarify the concept of linearity, I would like to point out that the length of a (finite) linear stream can be readily obtained by evaluating a loop that traverses the entire stream, but the linear stream itself is entirely consumed by the time its length is computed!
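To make the point about length concrete, here is a minimal sketch that counts the elements of a finite linear stream. The name lstream_length is hypothetical (it is not part of the chapter's code), and the sketch assumes the ATS2 prelude. The ~ in front of each pattern frees the matched node, so nothing of the stream remains once its length has been returned.

```ats
#include "share/atspre_staload.hats"

(* Hypothetical helper: counts the elements of a finite linear stream,
   freeing each node (via ~) as it goes; the stream is consumed. *)
fun{a:t@ype}
lstream_length(xs: stream_vt(a)): int = let
  fun loop(xs: stream_vt(a), n: int): int =
  (
    case+ !xs of
    | ~stream_vt_nil() => n
    | ~stream_vt_cons(_, xs) => loop(xs, n+1)
  )
in
  loop(xs, 0)
end

implement
main0() = let
  val xs: stream_vt(int) =
    $ldelay(stream_vt_cons(1, $ldelay(stream_vt_cons(2, $ldelay(stream_vt_nil())))))
  val n = lstream_length<int>(xs) // xs is consumed by this call
  // A second call lstream_length<int>(xs) would be rejected by the typechecker.
in
  println!("length = ", n) // length = 2
end
```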

Given a type T, the type stream_vt(T) is for linear streams of elements of type T. Evaluating a linear stream-value yields a linear stream-con value of type stream_vt_con(T), where stream_vt_con is the linear datatype declared as follows:

datavtype
stream_vt_con(a:t@ype) =
  | stream_vt_nil of ()
  | stream_vt_cons of (a, stream_vt(a))

The constructor stream_vt_nil, the linear counterpart of stream_nil, is for creating a linear empty stream-con value, and the constructor stream_vt_cons, the linear counterpart of stream_cons, is for creating a linear non-empty stream-con value.

The following code implements a function for building a linear stream of integers starting from a given one:

//
extern
fun
int_stream_vt_from(n: int): stream_vt(int)
//
implement
int_stream_vt_from(n) =
  $ldelay(stream_vt_cons(n, int_stream_vt_from(n+1)))
//

Note that the keyword $ldelay, the linear counterpart of $delay, is for initiating the construction of a linear stream.
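Because $ldelay suspends the recursive call, creating the infinite stream int_stream_vt_from(1) is cheap: a node is produced only when the stream is evaluated. The sketch below uses a hypothetical helper, lstream_head_or, that evaluates only the first node, keeps its element, and explicitly frees the unevaluated rest. A linear stream cannot simply be dropped, so omitting the call to lazy_vt_free would be a type error. The sketch assumes the ATS2 prelude.

```ats
#include "share/atspre_staload.hats"

fun
int_stream_vt_from(n: int): stream_vt(int) =
  $ldelay(stream_vt_cons(n, int_stream_vt_from(n+1)))

(* Hypothetical helper: returns the first element of xs (or dflt if xs is
   empty); the first node is freed by ~ and the tail by lazy_vt_free. *)
fun
lstream_head_or(xs: stream_vt(int), dflt: int): int =
(
  case+ !xs of
  | ~stream_vt_nil() => dflt
  | ~stream_vt_cons(x, xs1) => (lazy_vt_free(xs1); x)
)

implement
main0() =
  println!("head = ", lstream_head_or(int_stream_vt_from(1), 0)) // head = 1
```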

Given a linear stream xs and an integer n, the following function stream_vt_takeLte returns another stream consisting of the first n elements of xs if xs contains at least n elements or all of xs if not:

//
extern
fun
{a:t@ype}
stream_vt_takeLte
  (xs: stream_vt(a), n: int): stream_vt(a)
//
implement
{a}(*tmp*)
stream_vt_takeLte
  (xs, n) = $ldelay
(
  if n > 0 then
  (
    case+ !xs of
    | ~stream_vt_nil() => stream_vt_nil()
    | ~stream_vt_cons(x, xs) =>
        stream_vt_cons(x, stream_vt_takeLte<a>(xs, n-1))
  ) else (~xs; stream_vt_nil((*void*)))
, lazy_vt_free(xs) // called when the stream is freed
) (* end of [stream_vt_takeLte] *)
//

Given a linear stream xs, we can call lazy_vt_free on xs to free it; we can also write ~(xs) as a shorthand for lazy_vt_free(xs). Recall that the keyword $ldelay initiates the construction of a linear stream. What follows $ldelay should be a pair of expressions: the first is the body of the thunk that is called when the constructed linear stream is evaluated (to a linear stream-con value); the second is the body of the thunk that is called when the constructed linear stream is freed by lazy_vt_free. If the second expression is missing, calling lazy_vt_free on the constructed linear stream performs no additional action. In other words, a linear stream is like an object with two methods: one evaluates the stream to a stream-con value and the other frees the stream. In the actual implementation, these two methods are combined into one, which is passed a boolean value that determines which of the two is to be performed. In stream_vt_takeLte, for instance, the second expression lazy_vt_free(xs) ensures that the underlying stream xs is also released if the stream returned by stream_vt_takeLte is freed without ever being evaluated. In the body of stream_vt_takeLte, the symbol ~ in front of the pattern stream_vt_nil() or stream_vt_cons(x, xs) means that the stream-con value matching the pattern is freed (and is thus no longer available for any subsequent use).
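The division of labor between the two thunks of $ldelay can be observed directly. In the following sketch, the helpers tracked_single and count_frees are hypothetical, and the counter is a reference created with ref<int> (assumed from the ATS2 prelude). The free-thunk increments the counter, so one can check that it runs when an unevaluated stream is freed and does not run when the stream is evaluated instead.

```ats
#include "share/atspre_staload.hats"

(* Hypothetical helper: a one-element linear stream whose free-thunk
   (the second component of $ldelay) increments counter. *)
fun
tracked_single(x: int, counter: ref(int)): stream_vt(int) =
  $ldelay
  (
    stream_vt_cons(x, $ldelay(stream_vt_nil()))
  , (!counter := !counter + 1) // runs only if freed before evaluation
  )

(* Hypothetical driver: returns how many free-thunks actually ran *)
fun
count_frees(): int = let
  val counter = ref<int>(0)
  // Freed without being evaluated: the free-thunk runs.
  val xs = tracked_single(1, counter)
  val () = lazy_vt_free(xs)
  // Evaluated: the free-thunk does not run; the node and tail are freed instead.
  val ys = tracked_single(2, counter)
  val () =
  (
    case+ !ys of
    | ~stream_vt_nil() => ()
    | ~stream_vt_cons(_, ys1) => lazy_vt_free(ys1)
  )
in
  !counter
end

implement
main0() = println!("free-thunks run: ", count_frees()) // free-thunks run: 1
```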

The following function stream_vt_map is just the counterpart of stream_map for linear streams:

//
extern
fun
{a:t@ype}
{b:t@ype}
stream_vt_map
  (xs: stream_vt(a), fopr: cfun(a, b)): stream_vt(b)
//
implement
{a}{b}
stream_vt_map
  (xs, fopr) = $ldelay
(
  case+ !xs of
  | ~stream_vt_nil() => stream_vt_nil()
  | ~stream_vt_cons(x, xs) =>
      stream_vt_cons(fopr(x), stream_vt_map<a><b>(xs, fopr))
, ~(xs) // called when the stream is freed
)
//

The following function stream_vt_foldleft is just the counterpart of stream_foldleft for linear streams:

//
extern
fun
{res:t@ype}
{a:t@ype}
stream_vt_foldleft
(
  xs: stream_vt(a), r0: res, fopr: cfun(res, a, res)
) : res // end-of-function
//
implement
{res}{a}
stream_vt_foldleft(xs, r0, fopr) =
(
//
case+ !xs of
| ~stream_vt_nil() => r0
| ~stream_vt_cons(x, xs) =>
    stream_vt_foldleft<res><a>(xs, fopr(r0, x), fopr)
//
) (* end of [stream_vt_foldleft] *)
//

The following function ftally2_vt is simply a variant of ftally2 in which the combinators on (non-linear) streams are replaced with their counterparts on linear streams:

//
fun
ftally2_vt
(
  n : int
, f : cfun(int, double)
) : double = let
//
val xs1 = int_stream_vt_from(1)
val xs2 = stream_vt_takeLte<int>(xs1, n)
val xs3 = stream_vt_map<int><double>(xs2, lam(i) => f(i))
//
in
  stream_vt_foldleft<double><double>(xs3, 0.0, lam(res, x) => res+x)
end // end of [ftally2_vt]
//

Given a natural number n, ftally2 creates three streams on the heap and leaves them there when it returns, each of them containing n cached elements. Given the same natural number, ftally2_vt creates three linear streams and frees all of them before it returns. Compared to ftally2, ftally2_vt is clearly more efficient, both time-wise and memory-wise.

As far as I can tell, the support for linear streams in ATS is unique: it is not yet available elsewhere. For stream-based programming, I see this support as a secret weapon that gives ATS a clear edge over languages that can only support non-linear streams. The entirety of the code used in this chapter is available on-line.