Sunday, March 22, 2009

Ch8, Ex13

Only method Trans inside TMClass is changed to following:
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y} S1 S2 in
{proc {$}
{@tm getlock(T C S1)}
end}
if S1==halt then raise Halt end end
{@tm savestate(T C S2)} {Wait S2}
{Exchange C.state X Y}
end
proc {AccT C ?X} {ExcT C X X} end
proc {AssT C X} {ExcT C _ X} end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
SubThread %Change: New Addition
in
%Change here
thread {NewThread
proc {$}
try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT
subthread:SubThread)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end SubThread} end
end


If one needs to start another thread inside the transaction then it can use {T.subthread <0-argument-proc>} to do so.

Ch8, Ex12

Please don't pay attention to this solution as its far from complete and maybe wrong. Its here just for my record.
each cell now has two queues, readqueue and writequeue. And yet another queue of readowners.

Issue: What happens when a T that first requests a read lock on a cell and then a write lock some time later, if it puts all read lock threads on probation then it'll put itself on probation too i.e. it'll just keep on restarting all the time no matter what happens. And if it waits then its a deadlock as its waiting for itself to release the read lock. Solution is that it does put everyone else on probation except itself.

Some Use-Cases:

I. T1, T2 and T3 have read lock on cell C and T4 requests a write lock. If priority(T4)>Max(priotiry(T1),priority(T2),priority(T3)) then T4 waits in the write wait queue of C. Else T1, T2 and T3 are put on probation.

II. T1, T2 have a read lock on cell C and T1 requests a write lock on C. First T1 releases the read lock on C then case I is followed.

III. T1 has a write lock on C, T2 requests a read lock. if priority(T2)>priority(T1) then T1 is put on probation and T2 waits in the read wait queue.
declare
class TMClass
attr timestamp tm
meth init(TM) timestamp:=0 tm:=TM end
meth Unlockall(T RestoreFlag)
for X in {Dictionary.items T.save} do
case X of save(cell:C state:S) then
(C.owner):=unit
if RestoreFlag then (C.state):=S end
if {Not {C.queue.isEmpty}} then
Sync2#T2 = {C.queue.dequeue} in
case @(T2.state) of waiting_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
[] waiting_read_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
for Sync3#T3 in {C.queue.allItems} do
case @(T3.state) of waiting_on(C) then
skip
[] waiting_read_on(C) then
Sync3#T3={(C.queue).delete T3.stamp}
(T3.state):=running
Sync3=ok
end
end
end
end
[] save(cell:C) then
T = {(C.readOwners).delete T.stamp}
if {C.readOwners.isEmpty} then
if {Not {C.queue.isEmpty}} then
Sync2#T2 = {C.queue.dequeue} in
case @(T2.state) of waiting_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
[] waiting_read_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
for Sync3#T3 in {C.queue.allItems} do
case @(T3.state) of waiting_on(C) then
skip
[] waiting_read_on(C) then
Sync3#T3={(C.queue).delete T3.stamp}
(T3.state):=running
Sync3=ok
end
end
end
end
end
end
end
end
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y} S1 S2 in
if {Not {Dictionary.member T.save C.name}} then
{@tm getWriteLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savewritestate(T C S2)} {Wait S2}
else if (T.save).(C.name) == save(cell:C) then
{Dictionary.remove T.save C.name}
T={C.readOwners.delete T.stamp}
{@tm getWriteLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savewritestate(T C S2)} {Wait S2}
end
end
{Exchange C.state X Y}
end
proc {AccT C ?X}
if {Not {Dictionary.member T.save C.name}} then S1 S2 in
{@tm getReadLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savereadstate(T C S2)} {Wait S2}
end
{Exchange C.state X X}
end
proc {AssT C X} {ExcT C _ X} end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
in
thread try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end
end
meth getReadLock(T C ?Sync)
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif @(C.owner)==unit then
{C.readOwners.enqueue T T.stamp}
Sync = ok
else T2 = @(C.owner) in
if T2.stamp==T.stamp then
%what if T itself has the write lock
%just let it proceed
Sync = ok
else
%Put T on read wait queue of C
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_read_on(C)
if T.stamp < T2.stamp then
%Correct the cases that follow
case @(T2.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] waiting_read_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] running then
(T2.state):=probation
[] probation then skip end
end
end
end
end
%here we need to take care of the issue if a read and then a
%a write is requested by the same thread.
meth getWriteLock(T C ?Sync)
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif @(C.owner)==unit andthen
{(C.readOwners).isEmpty}==true then
(C.owner):=T Sync=ok
elseif @(C.owner)\=unit andthen
T.stamp==@(C.owner).stamp then
Sync=ok
elseif @(C.owner)\=unit then T2=@(C.owner) in
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_on(C)
if T.stamp<T2.stamp then
case @(T2.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] waiting_read_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] running then
(T2.state):=probation
[] probation then skip end
end
else /* someone has readlock on C */ T2={(C.readOwners).dequeue} in
{C.readOwners.enqueue T2 T2.stamp}
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_on(C)
if T.stamp<T2.stamp then
for Ti in {(C.readOwners).allItems} do
if Ti.stamp == T.stamp then
{Dictionary.remove T.save C.name}
{C.readOwners.delete T.stamp}
else
case @(Ti.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete Ti.stamp} in
{self Unlockall(Ti true)}
{self Trans(Ti.body Ti.result Ti.stamp)}
Sync2=halt
[] waiting_read_on(C2) then
Sync2#_={C2.queue.delete Ti.stamp} in
{self Unlockall(Ti true)}
{self Trans(Ti.body Ti.result Ti.stamp)}
Sync2=halt
[] running then
(Ti.state):=probation
[] probation then skip end
end
end
end
end
end
meth newtrans(P ?R)
timestamp:=@timestamp+1 {self Trans(P R @timestamp)}
end
meth savereadstate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):= save(cell:C)
end Sync=ok
end
meth savewritestate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):=save(cell:C state:@(C.state))
end Sync=ok
end
meth commit(T) {self Unlockall(T false)} end
meth abort(T) {self Unlockall(T true)} end
end
fun {NewActive Class Init}
Obj={New Class Init}
P
in
thread S in
{NewPort S P}
for M in S do {Obj M} end
end
proc {$ M} {Send P M} end
end
fun {NewPrioQueue}
Q={NewCell nil}
proc {Enqueue X Prio}
fun {InsertLoop L}
case L of pair(Y P)|L2 then
if Prio<P then pair(X Prio)|L
else pair(Y P)|{InsertLoop L2} end
[] nil then [pair(X Prio)] end
end
in Q:={InsertLoop @Q} end
fun {Dequeue}
pair(Y _)|L2=@Q
in
Q:=L2 Y
end
fun {Delete Prio}
fun {DeleteLoop L}
case L of pair(Y P)|L2 then
if P==Prio then X=Y L2
else pair(Y P)|{DeleteLoop L2} end
[] nil then nil end
end X
in
Q:={DeleteLoop @Q}
X
end
%CHANGED - NEW ADDITION
fun {AllElems}
proc {Iter L ?A}
case L of pair(Y P)|L2 then X in
A=Y|X
{Iter L2 X}
[] nil then
A = nil
end
end
in
{Iter @Q $}
end
fun {IsEmpty} @Q==nil end
in
queue(enqueue:Enqueue dequeue:Dequeue allItems:AllElems
delete:Delete isEmpty:IsEmpty)
end
proc {NewTrans ?Trans ?NewCellT}
TM={NewActive TMClass init(TM)} in
fun {Trans P ?B} R in
{TM newtrans(P R)}
case R of abort then B=abort unit
[] abort(Exc) then B=abort raise Exc end
[] commit(Res) then B=commit Res end
end
fun {NewCellT X}
%CHANGED HERE
cell(name:{NewName} owner:{NewCell unit}
queue:{NewPrioQueue} state:{NewCell X}
readOwners:{NewPrioQueue})
end
end

==============NEW VERSION=====================
Transaction can now be in following states:
running
probation
waiting_read_on#C
waiting_write_on#C

The dictionary at T.save now stores records like save(cell:C state:S locktype:LockT) where LockT is either read or write.

Instead of a single owner, a Cell can now have either one owner with the write lock(represented by write#T) or several owners with the read lock(represented by read#T1,read#T2 etc) on the cell. These owners are stored in a priority queue at C.owners.

declare
class TMClass
attr timestamp tm
meth init(TM) timestamp:=0 tm:=TM end
meth Unlockall(T RestoreFlag)
{Show 'entered unlockall'}
for save(cell:C state:S locktype:LockT) in {Dictionary.items T.save} do
LockT#T = {C.owners.delete T.stamp}
if RestoreFlag then (C.state) := S end
if {Not {C.queue.isEmpty}} andthen {C.owners.isEmpty} then
Sync#T0 = {C.queue.peek} in
if @(T0.state)==waiting_write_on#C then
{Show 'unlockall01'}
Sync#T0 = {C.queue.delete T0.stamp}
(T0.state):=running
{Show 'unlockall02'}
in
{Show 'unlockall2'}
{@tm getWriteLock(T0 C Sync)}
{Show 'unlockall3'}
else
for Synci#Ti in {C.queue.allItems} do
if @(Ti.state) == waiting_read_on#C then
Synci#Ti = {C.queue.delete Ti.stamp}
(Ti.state) := running
{C.owners.enqueue read#Ti Ti.stamp}
Synci = ok
end
end
end
end
end
{Show 'exited unlockall'}
end
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y} S1 S2 in
{Show 'entered ExcT'}
if {Dictionary.member T.save C.name} andthen
((T.save).(C.name)).locktype==write then
skip
else
{Show 'ExcT1'}
{@tm getWriteLock(T C S1)}
{Show 'Ext01'}
{Wait S1}
{Show 'Ext001'}
if S1==halt then {Show 'Ext02'} raise Halt end end
{Show 'Ext03'}
{@tm savewritestate(T C S2)} {Wait S2}
{Show 'ExcT2'}
end
{Show 'ExcT3'}
{Exchange C.state X Y}
{Show 'exited ExcT'}
end
proc {AccT C ?X}
{Show 'entered AccT'}
if {Not {Dictionary.member T.save C.name}} then S1 S2 in
{@tm getReadLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savereadstate(T C S2)} {Wait S2}
end
{Show 'AccT2'}
{Exchange C.state X X}
{Show 'exited AccT'}
end
proc {AssT C X} {ExcT C _ X} end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
in
thread try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end
end
meth getReadLock(T C ?Sync)
{Show 'entered getReadLock'}
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif {C.owners.isEmpty} then
{C.owners.enqueue read#T T.stamp} Sync=ok
else LockT#T2={C.owners.peek} in
if LockT==read then
{C.owners.enqueue read#T T.stamp} Sync=ok
elseif LockT==write then
if T.stamp==T2.stamp then Sync=ok
else
{C.queue.enqueue Sync#T T.stamp}
(T.state) := waiting_read_on#C
if T.stamp<T2.stamp then
case @(T2.state) of
running then
(T2.state) := probation
[] probation then skip
[] WaitT#C2 then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
end
end
end
end
end
{Show 'exited getReadLock'}
end
meth getWriteLock(T C ?Sync)
{Show 'entered getWriteLock'}
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif {C.owners.isEmpty} then
{C.owners.enqueue write#T T.stamp} Sync=ok
else LockT#T2 = {C.owners.peek} in
if T.stamp==T2.stamp andthen LockT==write then
Sync = ok
elseif T.stamp==T2.stamp andthen LockT==read then
LockT#T2 = {C.owners.delete T2.stamp}
{Dictionary.remove T.save C.name}
{@tm getWriteLock(T C Sync)}
else
{C.queue.enqueue Sync#T T.stamp}
(T.state) := waiting_write_on#C
if T.stamp<T2.stamp then
%This is WRONG as we're not really emptying the
%owner queue and putting such condition.
{While
fun {$} {Not {C.owners.isEmpty}} end
proc {$}
_#Ti = {C.owners.dequeue}
in
{Show 'entered case'}
{Show state#Ti}
case @(Ti.state) of
running then
(Ti.state) := probation
[] probation then skip
[] WaitT#C2 then
Sync2#_={C2.queue.delete Ti.stamp} in
{self Unlockall(Ti true)}
{self Trans(Ti.body Ti.result Ti.stamp)}
Sync2=halt
end
{Show 'exited case'}
end}
end
end
end
{Show 'exited getWriteLock'}
end
meth newtrans(P ?R)
timestamp:=@timestamp+1 {self Trans(P R @timestamp)}
end
meth savereadstate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):= save(cell:C state:@(C.state) locktype:read)
end Sync=ok
end
meth savewritestate(T C ?Sync)
(T.save).(C.name):=save(cell:C state:@(C.state) locktype:write)
Sync = ok
end
meth commit(T) {self Unlockall(T false)} end
meth abort(T) {self Unlockall(T true)} end
end
fun {NewActive Class Init}
Obj={New Class Init}
P
in
thread S in
{NewPort S P}
for M in S do {Obj M} end
end
proc {$ M} {Send P M} end
end
fun {NewPrioQueue}
Q={NewCell nil}
proc {Enqueue X Prio}
fun {InsertLoop L}
case L of pair(Y P)|L2 then
if Prio<P then pair(X Prio)|L
else pair(Y P)|{InsertLoop L2} end
[] nil then [pair(X Prio)] end
end
in Q:={InsertLoop @Q} end
fun {Dequeue}
pair(Y _)|L2=@Q
in
Q:=L2 Y
end
fun {Delete Prio}
fun {DeleteLoop L}
case L of pair(Y P)|L2 then
if P==Prio then X=Y L2
else pair(Y P)|{DeleteLoop L2} end
[] nil then nil end
end X
in
{Show 'deleteentered'}
Q:={DeleteLoop @Q}
{Show 'deleteexited'#X}
X
end
%CHANGED - NEW ADDITION
fun {AllElems}
proc {Iter L ?A}
case L of pair(Y P)|L2 then X in
A=Y|X
{Iter L2 X}
[] nil then
A = nil
end
end
in
{Iter @Q $}
end
fun {Peek}
pair(Y _)|L2=@Q
in
Y
end
fun {IsEmpty} @Q==nil end
in
queue(enqueue:Enqueue dequeue:Dequeue allItems:AllElems
peek:Peek delete:Delete isEmpty:IsEmpty)
end
proc {NewTrans ?Trans ?NewCellT}
TM={NewActive TMClass init(TM)} in
fun {Trans P ?B} R in
{TM newtrans(P R)}
case R of abort then B=abort unit
[] abort(Exc) then B=abort raise Exc end
[] commit(Res) then B=commit Res end
end
fun {NewCellT X}
%CHANGED HERE
cell(name:{NewName} owners:{NewPrioQueue}
queue:{NewPrioQueue} state:{NewCell X})
end
end

=================code to run sum example==============
declare S D Rand Mix Sum Trans NewCellT
{NewTrans Trans NewCellT}
D={MakeTuple db 100}
for I in 1..100 do D.I={NewCellT I} end
fun {Rand} {OS.rand} mod 100 + 1 end
proc {Mix} {Trans
proc {$ T _}
I={Rand} J={Rand} K={Rand}
{Show I#J#K}
A={T.access D.I} B={T.access D.J} C={T.access D.K}
{Show mix1}
in
{Show mix2}
{T.assign D.I A+B-C}
{Show mix3}
{T.assign D.J A-B+C}
{Show mix4}
if I==J orelse I==K orelse J==K then {T.abort} end
{Show mix5}
{T.assign D.K ~A+B+C}
{Show mix6}
end _ _}
end
S={NewCellT 0}
fun {Sum}
{Trans
fun {$ T} {T.assign S 0}
for I in 1..100 do
{T.assign S {T.access S}+{T.access D.I}} end
{T.access S}
end _}
end

{Browse {Sum}} % Displays 5050
for I in 1..1000 do thread {Mix} end end
{Browse {Sum}} % Still displays 5050

Ch8, Ex11

Only definition of method Trans(inside class TMClass) needs to be modified. I've changed ExcT to check whether T already has a lock on the cell in question before doing getlock.
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y}
if {Not {Dictionary.member T.save C.name}}
then S1 S2 in
{@tm getlock(T C S1)}
if S1==halt then raise Halt end end
{@tm savestate(T C S2)} {Wait S2}
end
{Exchange C.state X Y}
end
proc {AccT C ?X}
{ExcT C X X}
end
proc {AssT C X}
{ExcT C _ X}
end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
in
thread try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end
end

Ch8, Ex10

The reason that sum is locking all the cells is that its calculating the whole sum in one big transaction and in the growing phase of this transaction it locks all the cells and keep them all locked unless whole sum is calculated. So, the strategy is to simply break the calculation of full sum into partial sums. Here is the code..
declare Sum S={NewCellT 0}
fun {Sum}
{Trans proc {$ T _} {T.assign S 0} end _ _}
for J in 0..80;20 do
{Trans
proc {$ T _}
for I in J+1..J+20 do
{T.assign S {T.access S}+{T.access D.I}} end
end _ _}
end
{Trans fun {$ T} {T.access S} end _}
end

transaction impl code from the book

This is needed to run various solutions.
declare
class TMClass
attr timestamp tm
meth init(TM) timestamp:=0 tm:=TM end
meth Unlockall(T RestoreFlag)
for save(cell:C state:S) in {Dictionary.items T.save} do
(C.owner):=unit
if RestoreFlag then (C.state):=S end
if {Not {C.queue.isEmpty}} then
Sync2#T2={C.queue.dequeue} in
(T2.state):=running
(C.owner):=T2 Sync2=ok
end
end
end
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y} S1 S2 in
{@tm getlock(T C S1)}
if S1==halt then raise Halt end end
{@tm savestate(T C S2)} {Wait S2}
{Exchange C.state X Y}
end
proc {AccT C ?X} {ExcT C X X} end
proc {AssT C X} {ExcT C _ X} end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
in
thread try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end
end
meth getlock(T C ?Sync)
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif @(C.owner)==unit then
(C.owner):=T Sync=ok
elseif T.stamp==@(C.owner).stamp then
Sync=ok
else /* T.stamp\=@(C.owner).stamp */ T2=@(C.owner) in
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_on(C)
if T.stamp<T2.stamp then
case @(T2.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] running then
(T2.state):=probation
[] probation then skip end
end
end
end
meth newtrans(P ?R)
timestamp:=@timestamp+1 {self Trans(P R @timestamp)}
end
meth savestate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):=save(cell:C state:@(C.state))
end Sync=ok
end
meth commit(T) {self Unlockall(T false)} end
meth abort(T) {self Unlockall(T true)} end
end
fun {NewActive Class Init}
Obj={New Class Init}
P
in
thread S in
{NewPort S P}
for M in S do {Obj M} end
end
proc {$ M} {Send P M} end
end
fun {NewPrioQueue}
Q={NewCell nil}
proc {Enqueue X Prio}
fun {InsertLoop L}
case L of pair(Y P)|L2 then
if Prio<P then pair(X Prio)|L
else pair(Y P)|{InsertLoop L2} end
[] nil then [pair(X Prio)] end
end
in Q:={InsertLoop @Q} end
fun {Dequeue}
pair(Y _)|L2=@Q
in
Q:=L2 Y
end
fun {Delete Prio}
fun {DeleteLoop L}
case L of pair(Y P)|L2 then
if P==Prio then X=Y L2
else pair(Y P)|{DeleteLoop L2} end
[] nil then nil end
end X
in
Q:={DeleteLoop @Q}
X
end
fun {IsEmpty} @Q==nil end
in
queue(enqueue:Enqueue dequeue:Dequeue
delete:Delete isEmpty:IsEmpty)
end
proc {NewTrans ?Trans ?NewCellT}
TM={NewActive TMClass init(TM)} in
fun {Trans P ?B} R in
{TM newtrans(P R)}
case R of abort then B=abort unit
[] abort(Exc) then B=abort raise Exc end
[] commit(Res) then B=commit Res end
end
fun {NewCellT X}
cell(name:{NewName} owner:{NewCell unit}
queue:{NewPrioQueue} state:{NewCell X})
end
end

Ch8, Ex9

The new monitor supports only two operations, lock and newcondition. lock is same and newcondition gives a new condition variable which supports two operations wait and notify.

I. Buffer reimplementation
class Buffer
attr m buf first last n i nonempty nonfull
meth init(N)
m:={NewMonitor}
buf:={NewArray 0 N-1 null}
n:=N i:=0 first:=0 last:=0
nonempty := @m.newcondition
nonfull := @m.newcondition
end
meth put(X)
{@m.'lock' proc {$}
if @i>=@n then
{@nonfull.wait}
{self put(X)}
else
@buf.@last:=X
last:=(@last+1) mod @n
i:=@i+1
{@nonempty.notify}
end
end}
end
meth get(X)
{@m.'lock' proc {$}
if @i==0 then
{@nonempty.wait}
{self get(X)}
else
X=@buf.@first
first:=(@first+1) mod @n
i:=@i-1
{@nonfull.notify}
end
end}
end
end

II. Monitor Reimplementation
fun {NewMonitor}
L={NewGRLock}
proc {LockM P}
{L.get} try {P} finally {L.release} end
end
fun {NewCondition}
Q={NewQueue}
proc {WaitM}
X in
{Q.insert X} {L.release} {Wait X} {L.get}
end
proc {NotifyM}
U={Q.deleteNonBlock} in
case U of [X] then X=unit else skip end
end
in
c(wait:WaitM notify:NotifyM)
end
in
monitor('lock':LockM newcondition:NewCondition)
end

Ch8, Ex8

declare NewThread
local
M = {NewMonitor}
C = {NewCell 0}
proc {ExitOnTermination}
if {Not (@C==0)} then
{M.'lock' proc {$}
{M.wait}
{M.notifyAll}
end}
end
end
in
proc {NewThread P ?SubThread}
Is Pt={NewPort Is}
in
proc {SubThread P}
X Y in
{Exchange C X Y}
Y = X+1
thread {M.'lock'
proc {$} X Y in {P}
C:=@C-1
{M.notifyAll} end}
end
end
{SubThread P}
{ExitOnTermination}
end
end

%Test
local SubThread in
{NewThread
proc {$} {Browse started1}
{SubThread proc {$} {Delay 5000} {Browse f2} end}
{Browse finished1}
end
SubThread}
{Browse done}
end

%Complete Monitor impl code from book
%copying here as its needed to run
%the above solution
declare
fun {NewQueue}
X C={NewCell q(0 X X)} L={NewLock}
proc {Insert X}
N S E1 N1 in
{Exchange C q(N S X|E1) q(N1 S E1)}
N1=N+1
end
fun {Delete}
N S1 E N1 X in
{Exchange C q(N X|S1 E) q(N1 S1 E)}
N1=N-1
X
end
fun {Size}
lock L then @C.1 end
end
fun {DeleteAll}
lock L then
X q(_ S E)=@C in
C:=q(0 X X)
E=nil S
end
end
fun {DeleteNonBlock}
lock L then
if {Size}>0 then [{Delete}] else nil end
end
end
in
queue(insert:Insert delete:Delete size:Size
deleteAll:DeleteAll deleteNonBlock:DeleteNonBlock)
end
fun {NewGRLock}
Token1={NewCell unit}
Token2={NewCell unit}
CurThr={NewCell unit}
proc {GetLock}
if {Thread.this}\=@CurThr then Old New in
{Exchange Token1 Old New}
{Wait Old}
Token2:=New
CurThr:={Thread.this}
end
end
proc {ReleaseLock}
CurThr:=unit
unit=@Token2
end
in
'lock'(get:GetLock release:ReleaseLock)
end
fun {NewMonitor}
Q={NewQueue}
L={NewGRLock}
proc {LockM P}
{L.get} try {P} finally {L.release} end
end
proc {WaitM}
X in
{Q.insert X} {L.release} {Wait X} {L.get}
end
proc {NotifyM}
U={Q.deleteNonBlock} in
case U of [X] then X=unit else skip end
end
proc {NotifyAllM}
L={Q.deleteAll} in
for X in L do X=unit end
end
in
monitor('lock':LockM wait:WaitM notify:NotifyM
notifyAll:NotifyAllM)
end

Ch8, Ex7 (todo)

tbd (haven't followed erlang section yet)

Ch8, Ex6

declare Channel
local
proc {MySend C M}
X in
{Send C.send send(M#X)}
{Wait X}
end
fun {Receive C}
M in
{Send C.receive receive(M)}
{Wait M}
M
end
proc {MReceive Cs}
%see later for MReceive definition
skip
end
fun {New}
P1 P2 S1 S2
{NewPort S1 P1} %for receive msgs
{NewPort S2 P2} %for send msgs
proc {Iter S1 S2}
case S1#S2 of (receive(R)|S1s)#(send(X#Y)|S2s)
then
R=X
Y = unit
{Iter S1s S2s}
end
end
in
thread {Iter S1 S2} end
channel(receive:P1 send:P2)
end
in
Channel = c(new:New send:MySend
receive:Receive
mreceive:MReceive)
end
From the book its not clear what mreceive is actually supposed to do, following are my various interpretations

mreceive version#1
In this version mreceive starts listeners for all the channels and exits immediately. All the Si will be called as and when messages are received on respective channels.
proc {MReceive Cs}
for Ci#Si in Cs do
thread
{Si {Channel.receive Ci}}
end
end
end

mreceive version#2
In this version mreceive starts listeners for all the channels and waits untill messages are received on all the channels and all corresponding Si are executed.
proc {MReceive Cs}
Exit Count = {NewCell 0} in
for Ci#Si in Cs do
thread
{Si {Channel.receive Ci}}
Count := 1+@Count
if @Count == {List.length Cs} then Exit=unit end
end
end
{Wait Exit}
end

mreceive version#3
In this version mreceive starts listeners for all the channels and waits. As soon as a message is received any of the channels, corresponding Si is executed and mreceive exits. In this case only one Si is executed on whose channel a message is received first.
proc {MReceive Cs}
Exit A = {NewCell 0} in
for Ci#Si in Cs do
thread
local M in
M = {Channel.receive Ci}
if @A==0 then
{Exchange A _ 1}
{Si M}
Exit = unit
end
end
end
end
{Wait Exit}
end

NOTE: I could not understand what exactly does the extended version of mreceive is supposed to do.

TESTS
%This test proves that receive operation blocks
%until there is a send
local C in
{Browse started}
C = {Channel.new}
thread {Browse rec1#{Channel.receive C}} end
thread {Browse rec2#{Channel.receive C}} end
thread {Browse rec3#{Channel.receive C}} end

{Delay 5000}
{Channel.send C msg1}
{Delay 5000}
{Channel.send C msg2}
{Delay 5000}
{Channel.send C msg3}

end

%This test proves that send operation blocks
%until there is a receive
local C in
{Browse started}
C = {Channel.new}
thread {Channel.send C msg1} {Browse send1done} end
thread {Channel.send C msg2} {Browse send2done} end
thread {Channel.send C msg3} {Browse send3done} end

{Delay 5000}
{Browse {Channel.receive C}}
{Delay 5000}
{Browse {Channel.receive C}}
{Delay 5000}
{Browse {Channel.receive C}}
end

Ch8, Ex5

declare
fun {NewMVar}
P1 P2 Si1 Si2
{NewPort Si1 P1} %put msgs received on P1
{NewPort Si2 P2} %get msgs received on P2
%Put puts the content in C if box is empty or waits
C = {NewCell _}
%content of D is either empty or unbound variable
%signifying empty or full box respectivaly
D = {NewCell empty}
proc {PutIter Xs}
case Xs of (Sync#X)|Xr then
{Wait @D}
@C = X
Sync = ok
D := _
{PutIter Xr}
end
end
proc {GetIter Xs}
case Xs of (Sync#X)|Xr then
{Wait @C}
X = @C
Sync = ok
C := _
@D = empty
{GetIter Xr}
end
end
proc {Put X}
Sync in
{Send P1 Sync#X}
{Wait Sync}
end
proc {Get ?X}
Sync in
{Send P2 Sync#X}
{Wait Sync}
end
in
thread {PutIter Si1} end
thread {GetIter Si2} end
mvar(put:Put get:Get)
end

Ch8, Ex4

A very naive version
declare
fun {SlowNet3 Obj Time}
D = {NewDictionary}
proc {RemoveTerminatedThreads L}
case L of X|Xr then
if {Thread.state X}==terminated
then {D.remove X} end
[] nil then skip
end
end
in
proc {$ M}
Old New T = {Thread.this} in
{RemoveTerminatedThreads {D.keys}}
if {Not {D.member T}} then
{D.put T {NewCell unit}}
end
{Exchange {D.get T} Old New}
thread
{Delay Time}
{Wait Old}
{Obj M}
New=unit
end
end
end
Note: We can't use the oz dictionary as it doesn't allow us to put result of {Thread.this} as key into it. Instead, the dictionary implementation done for Chapter-6, Ex-13 at http://ctamcp-himanshu.blogspot.com/2009/02/ch6-ex13.html is to be used.

Ch8, Ex3 (todo)

tbd