Linear Channels for Asynchronous IPC

In this section, I will present an implementation of linear channels to support asynchronous communication between threads. This is also a very fitting occasion for me to advocate what I often refer to as programmer-centric program verification.

A communication channel between threads is essentially a queue wrapped in some kind of protection mechanism needed for guarding against race conditions. Assume that a queue is of a fixed capacity, that is, the capacity of the queue is fixed after its creation. If the queue is full, then inserting an element into it results in a failure. If the queue is empty, then removing an element from it results in a failure. In order to prevent inserting into a full queue or removing from an empty queue, I could first introduce a linear abstract type for queues as follows:

absvtype
queue_vtype(a:vt@ype+, int(*m*), int(*n*))
vtypedef
queue(a:vt@ype,m:int,n:int) = queue_vtype(a,m,n)

where the type queue(VT,M,N) is for a queue of capacity M that currently contains N elements of type VT. Then the functions for inserting into and removing from a queue are expected to be given the following interface:

//
fun{a:vt0p}
queue_insert
  {m,n:nat | m > n}
  (!queue(a, m, n) >> queue(a, m, n+1), a): void
//
fun{a:vt0p}
queue_remove
  {m,n:nat | n > 0}
  (!queue(a, m, n) >> queue(a, m, n-1)): (a)
//

The presented abstract type queue can indeed work very well for the task of implementing linear channels. However, I will not continue with this version of queue further for I intend to present a style of program verification that is less rigorous but far more flexible.

Following is another version of abstract type queue:

//
absvtype
queue_vtype(a:vt@ype+, int(*id*)) = ptr
//
vtypedef
queue(a:vt0p, id:int) = queue_vtype(a, id)
vtypedef
queue(a:vt0p) = [id:int] queue(a, id)
//

Given a viewtype VT and an integer ID, queue(VT,ID) is for a queue containing elements of the type VT that can be uniquely identified with the integer ID. So one may think of ID as some form of stamp. The following declared function queue_isnil is for testing whether a given queue is empty:

//
absprop ISNIL(id:int, b:bool)
//
fun
{a:vt0p}
queue_isnil
  {id:int}(!queue(a, id)): [b:bool] (ISNIL(id, b) | bool(b))
//

Given an integer ID, a proof of the prop ISNIL(ID,true) means that the queue uniquely identified by ID is empty, and a proof of ISNIL(ID,false) means that it is not empty. Similarly, the following declared function queue_isful is for testing whether a given queue is full:

//
absprop ISFUL(id:int, b:bool)
//
fun
{a:vt0p}
queue_isful
  {id:int}(!queue(a, id)): [b:bool] (ISFUL(id, b) | bool(b))
//

Given an integer ID, a proof of the prop ISFUL(ID,true) means that the queue uniquely identified by ID is full, and a proof of ISFUL(ID,false) means that it is not full.

The functions queue_insert and queue_remove for inserting into and removing from a given queue can now be given the following interface:

//
extern
fun
{a:vt0p}
queue_insert
  {id:int}
(
  ISFUL(id, false)
| xs: !queue(a, id) >> queue(a, id2), x: a
) : #[id2:int] void
//
extern
fun
{a:vt0p}
queue_remove
  {id:int}
(
  ISNIL(id, false)
| xs: !queue(a, id) >> queue(a, id2)
) : #[id2:int] a // end-of-fun
//

Note that either inserting an element into a queue or removing an element from a queue assigns a new stamp to the queue. This is essential for interpreting ISNIL and ISFUL in the manner presented above.

In order to call queue_insert on a given queue, one needs to have a proof attesting to the queue being not full. Such a proof is obtained if calling queue_isful on the queue returns false. Similarly, in order to call queue_remove on a given queue, one can first call queue_isnil on the queue to obtain a proof attesting to the queue being not empty.
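For readers who think in C, the check-then-act discipline described above can be approximated at run time. The following is only a rough sketch under my own assumptions: the names bqueue, BQ_CAP and bqueue_try_insert are hypothetical and do not appear in the ATS code, and the assert calls merely stand in for the proofs of ISFUL(id,false) and ISNIL(id,false) that ATS demands at compile time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define BQ_CAP 4 /* hypothetical fixed capacity */

/* Hypothetical bounded FIFO queue of ints, stored as a ring buffer. */
typedef struct {
    int    items[BQ_CAP];
    size_t head;  /* index of the oldest element */
    size_t count; /* number of stored elements   */
} bqueue;

void bqueue_init(bqueue *q) { q->head = 0; q->count = 0; }

/* Run-time counterparts of queue_isnil and queue_isful. */
bool bqueue_isnil(const bqueue *q) { return q->count == 0; }
bool bqueue_isful(const bqueue *q) { return q->count == BQ_CAP; }

/* Precondition: the queue is not full. In ATS this obligation is a
   proof argument; here it is only checked when the program runs. */
void bqueue_insert(bqueue *q, int x) {
    assert(!bqueue_isful(q));
    q->items[(q->head + q->count) % BQ_CAP] = x;
    q->count++;
}

/* Precondition: the queue is not empty. */
int bqueue_remove(bqueue *q) {
    assert(!bqueue_isnil(q));
    int x = q->items[q->head];
    q->head = (q->head + 1) % BQ_CAP;
    q->count--;
    return x;
}

/* Check first, then act: insert only when the test says "not full".
   Returns false, leaving the queue unchanged, when the queue is full. */
bool bqueue_try_insert(bqueue *q, int x) {
    if (bqueue_isful(q)) return false;
    bqueue_insert(q, x);
    return true;
}
```

The difference from the ATS interface is where the mistake is caught: forgetting the test before bqueue_insert compiles fine in C and fails only when the assertion fires, whereas the corresponding ATS call is rejected by the typechecker for lack of a proof.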

What is really of concern here is not to actually verify that queue_isnil and queue_isful have the interface assigned to them. Instead, the focus is on ensuring that queue_insert is never applied to a full queue and queue_remove is never applied to an empty queue under the assumption that queue_isnil and queue_isful have the assigned interface. I refer to this form of program verification as being programmer-centric because its correctness is not formally established in an objective manner. I myself find that programmer-centric program verification is very flexible and effective in practice. If we believe that constructing informal mathematical proofs can help one check whether the proven statements are valid, then it is only natural to believe that programmer-centric program verification can also help one check whether verified programs are correct.

Let us now start to implement linear channels for asynchronous communication between threads. First, let us declare a linear abstract type channel as follows:

absvtype
channel_vtype(a:vt@ype+) = ptr
vtypedef
channel(a:vt0p) = channel_vtype(a)

The function for inserting an element into a channel is given the following interface:

fun{a:vt0p}
channel_insert (!channel(a), a): void

The caller of channel_insert is blocked if the channel is full. Similarly, the function for removing an element from a channel is given the following interface:

fun{a:vt0p}
channel_remove (chan: !channel(a)): (a)

The caller of channel_remove is blocked if the channel is empty.

Let a channel be represented as follows:

//
datavtype
channel_ =
{
l0,l1,l2,l3:agz
} CHANNEL of
@{
  cap=intGt(0)
, spin=spin_vt(l0)
, rfcnt=intGt(0)
, mutex=mutex_vt(l1)
, CVisnil=condvar_vt(l2)
, CVisful=condvar_vt(l3)
, queue=ptr // deqarray
} (* end of [channel] *)
//
assume
channel_vtype(a:vt0p) = channel_
//

There are 7 fields in the record representing a channel. The cap field stores an integer indicating the (fixed) capacity of the channel. The spin field stores a spin-lock for protecting the reference count stored in the rfcnt field. The mutex field stores a mutex for protecting the queue stored in the queue field. The CVisnil field stores a condition variable on which a caller (holding the mutex) waits for the queue to become not empty. The CVisful field stores a condition variable on which a caller (holding the mutex) waits for the queue to become not full.
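To make the layout of the record concrete, here is a rough C counterpart built on POSIX threads. It is a sketch under stated assumptions, not the code in channel_vt.dats: the type channel_c and the functions channel_c_create, channel_c_ref and channel_c_unref are hypothetical names of mine, a plain mutex stands in for the spin-lock for the sake of portability, and the queue storage is assumed to hold int elements.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical C counterpart of the CHANNEL record. */
typedef struct {
    int             cap;     /* fixed capacity, always > 0              */
    pthread_mutex_t spin;    /* stand-in for the spin-lock; guards rfcnt */
    int             rfcnt;   /* reference count, > 0 while alive        */
    pthread_mutex_t mutex;   /* guards the queue                        */
    pthread_cond_t  CVisnil; /* waited on while the queue is empty      */
    pthread_cond_t  CVisful; /* waited on while the queue is full       */
    void           *queue;   /* underlying storage (assumed int array)  */
} channel_c;

/* Creates a channel with one reference; returns NULL on bad input
   or allocation failure. */
channel_c *channel_c_create(int cap) {
    if (cap <= 0) return NULL;
    channel_c *ch = malloc(sizeof *ch);
    if (ch == NULL) return NULL;
    ch->queue = calloc((size_t)cap, sizeof(int));
    if (ch->queue == NULL) { free(ch); return NULL; }
    ch->cap = cap;
    ch->rfcnt = 1;
    pthread_mutex_init(&ch->spin, NULL);
    pthread_mutex_init(&ch->mutex, NULL);
    pthread_cond_init(&ch->CVisnil, NULL);
    pthread_cond_init(&ch->CVisful, NULL);
    return ch;
}

/* Takes an extra reference; only rfcnt is touched, so only spin is held. */
channel_c *channel_c_ref(channel_c *ch) {
    pthread_mutex_lock(&ch->spin);
    assert(ch->rfcnt > 0);
    ch->rfcnt++;
    pthread_mutex_unlock(&ch->spin);
    return ch;
}

/* Drops one reference. Returns 1 if this was the last reference and the
   channel has been destroyed, and 0 otherwise. */
int channel_c_unref(channel_c *ch) {
    pthread_mutex_lock(&ch->spin);
    int remaining = --ch->rfcnt;
    pthread_mutex_unlock(&ch->spin);
    if (remaining > 0) return 0;
    pthread_cond_destroy(&ch->CVisful);
    pthread_cond_destroy(&ch->CVisnil);
    pthread_mutex_destroy(&ch->mutex);
    pthread_mutex_destroy(&ch->spin);
    free(ch->queue);
    free(ch);
    return 1;
}
```

Keeping two separate locks mirrors the record above: adjusting the reference count never has to contend with threads that are inserting into or removing from the queue.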

The function channel_insert is given the following implementation:

implement
{a}(*tmp*)
channel_insert
  (chan, x0) = let
//
val+CHANNEL
  {l0,l1,l2,l3}(ch) = chan
val mutex = unsafe_mutex_vt2t(ch.mutex)
val (pfmut | ()) = mutex_lock (mutex)
val xs =
  $UN.castvwtp0{queue(a)}((pfmut | ch.queue))
val ((*void*)) = channel_insert2<a> (chan, xs, x0)
prval pfmut = $UN.castview0{locked_v(l1)}(xs)
val ((*void*)) = mutex_unlock (pfmut | mutex)
//
in
  // nothing
end // end of [channel_insert]

where the auxiliary function channel_insert2 is given the following interface:

fun{a:vt0p}
channel_insert2
  (!channel(a), !queue(a) >> _, a): void

Please note that channel_insert2 is called when the caller is holding the mutex inside the channel. Following is an implementation for channel_insert2:

implement
{a}(*tmp*)
channel_insert2
  (chan, xs, x0) = let
//
val+CHANNEL
  {l0,l1,l2,l3}(ch) = chan
//
val (pf | isful) = queue_isful (xs)
//
in
//
if isful
then let
  prval
  (pfmut, fpf) =
  __assert () where
  {
    extern
    praxi __assert (): vtakeout0(locked_v(l1))
  }
  val mutex = unsafe_mutex_vt2t(ch.mutex)
  val CVisful = unsafe_condvar_vt2t(ch.CVisful)
  val ((*void*)) = condvar_wait (pfmut | CVisful, mutex)
  prval ((*void*)) = fpf (pfmut)
in
  channel_insert2 (chan, xs, x0)
end // end of [then]
else let
  val isnil = queue_isnil (xs)
  val ((*void*)) = queue_insert (pf | xs, x0)
  val ((*void*)) =
  if isnil.1
    then condvar_broadcast(unsafe_condvar_vt2t(ch.CVisnil))
  // end of [if]
in
  // nothing
end // end of [else]
//
end // end of [channel_insert2]

The logic behind channel_insert2 can be explained as follows. If the queue in the given channel is full, the caller calls condvar_wait to release the mutex it holds and then wait on the condition variable stored in the field CVisful of the channel; after the caller is awoken by a signal sent to the condition variable and regains the mutex, it calls channel_insert2 recursively. If the queue in the given channel is not full, then the caller inserts the given element into the queue stored in the field queue; if the queue was empty before the insertion, the caller also calls condvar_broadcast on the condition variable stored in the field CVisnil to wake up any callers waiting for the queue to become not empty; the caller then returns. Note that channel_insert2 is a tail-recursive function that essentially corresponds to a standard while-loop often appearing in C code for handling the wait on a condition variable.
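The while-loop alluded to above might look roughly like the following in C with POSIX threads. This is a sketch under my own assumptions, not a translation of channel_vt.dats: the type chan_c, the constant CHAN_CAP and the functions chan_c_init and chan_c_insert are hypothetical, the elements are assumed to be ints, and the ring buffer is embedded directly in the channel to keep the example short.

```c
#include <assert.h>
#include <pthread.h>

#define CHAN_CAP 2 /* hypothetical fixed capacity */

/* Hypothetical channel of ints: a ring buffer guarded by a mutex. */
typedef struct {
    pthread_mutex_t mutex;   /* guards items, head and count     */
    pthread_cond_t  CVisnil; /* waited on while the queue is empty */
    pthread_cond_t  CVisful; /* waited on while the queue is full  */
    int             items[CHAN_CAP];
    int             head;    /* index of the oldest element */
    int             count;   /* number of stored elements   */
} chan_c;

void chan_c_init(chan_c *ch) {
    pthread_mutex_init(&ch->mutex, NULL);
    pthread_cond_init(&ch->CVisnil, NULL);
    pthread_cond_init(&ch->CVisful, NULL);
    ch->head = 0;
    ch->count = 0;
}

/* Blocks while the channel is full, then inserts x0. */
void chan_c_insert(chan_c *ch, int x0) {
    pthread_mutex_lock(&ch->mutex);

    /* The loop plays the role of the tail-recursive call: each wake-up
       re-tests fullness, since another thread may have refilled the
       queue in the meantime. pthread_cond_wait releases the mutex while
       waiting and reacquires it before returning. */
    while (ch->count == CHAN_CAP)
        pthread_cond_wait(&ch->CVisful, &ch->mutex);

    /* Run-time analogue of holding a proof of ISFUL(id, false). */
    assert(ch->count < CHAN_CAP);

    int was_nil = (ch->count == 0);
    ch->items[(ch->head + ch->count) % CHAN_CAP] = x0;
    ch->count++;

    /* Wake up waiting removers only on the empty-to-nonempty change. */
    if (was_nil)
        pthread_cond_broadcast(&ch->CVisnil);

    pthread_mutex_unlock(&ch->mutex);
}
```

Re-testing the condition in a loop, rather than with a single if, is what makes the wait robust against spurious wake-ups and against other inserters that get to the mutex first.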

By following the above implementation for channel_insert (and channel_insert2), it should be pretty straightforward for one to figure out an implementation for channel_remove. I leave it as an exercise.

Please find on-line the file channel_vt.dats containing the entirety of the code presented in this section plus some code for testing.