Please don't pay attention to this solution as its far from complete and maybe wrong. Its here just for my record.each cell now has two queues, readqueue and writequeue. And yet another queue of readowners.
Issue: What happens when a T that first requests a read lock on a cell and then a write lock some time later, if it puts all read lock threads on probation then it'll put itself on probation too i.e. it'll just keep on restarting all the time no matter what happens. And if it waits then its a deadlock as its waiting for itself to release the read lock. Solution is that it does put everyone else on probation except itself.
Some Use-Cases:
I. T1, T2 and T3 have read lock on cell C and T4 requests a write lock. If priority(T4)>Max(priotiry(T1),priority(T2),priority(T3)) then T4 waits in the write wait queue of C. Else T1, T2 and T3 are put on probation.
II. T1, T2 have a read lock on cell C and T1 requests a write lock on C. First T1 releases the read lock on C then case I is followed.
III. T1 has a write lock on C, T2 requests a read lock. if priority(T2)>priority(T1) then T1 is put on probation and T2 waits in the read wait queue.
declare
class TMClass
attr timestamp tm
meth init(TM) timestamp:=0 tm:=TM end
meth Unlockall(T RestoreFlag)
for X in {Dictionary.items T.save} do
case X of save(cell:C state:S) then
(C.owner):=unit
if RestoreFlag then (C.state):=S end
if {Not {C.queue.isEmpty}} then
Sync2#T2 = {C.queue.dequeue} in
case @(T2.state) of waiting_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
[] waiting_read_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
for Sync3#T3 in {C.queue.allItems} do
case @(T3.state) of waiting_on(C) then
skip
[] waiting_read_on(C) then
Sync3#T3={(C.queue).delete T3.stamp}
(T3.state):=running
Sync3=ok
end
end
end
end
[] save(cell:C) then
T = {(C.readOwners).delete T.stamp}
if {C.readOwners.isEmpty} then
if {Not {C.queue.isEmpty}} then
Sync2#T2 = {C.queue.dequeue} in
case @(T2.state) of waiting_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
[] waiting_read_on(C) then
(T2.state):=running
(C.owner):=T2 Sync2=ok
for Sync3#T3 in {C.queue.allItems} do
case @(T3.state) of waiting_on(C) then
skip
[] waiting_read_on(C) then
Sync3#T3={(C.queue).delete T3.stamp}
(T3.state):=running
Sync3=ok
end
end
end
end
end
end
end
end
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y} S1 S2 in
if {Not {Dictionary.member T.save C.name}} then
{@tm getWriteLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savewritestate(T C S2)} {Wait S2}
else if (T.save).(C.name) == save(cell:C) then
{Dictionary.remove T.save C.name}
T={C.readOwners.delete T.stamp}
{@tm getWriteLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savewritestate(T C S2)} {Wait S2}
end
end
{Exchange C.state X Y}
end
proc {AccT C ?X}
if {Not {Dictionary.member T.save C.name}} then S1 S2 in
{@tm getReadLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savereadstate(T C S2)} {Wait S2}
end
{Exchange C.state X X}
end
proc {AssT C X} {ExcT C _ X} end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
in
thread try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end
end
meth getReadLock(T C ?Sync)
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif @(C.owner)==unit then
{C.readOwners.enqueue T T.stamp}
Sync = ok
else T2 = @(C.owner) in
if T2.stamp==T.stamp then
%what if T itself has the write lock
%just let it proceed
Sync = ok
else
%Put T on read wait queue of C
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_read_on(C)
if T.stamp < T2.stamp then
%Correct the cases that follow
case @(T2.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] waiting_read_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] running then
(T2.state):=probation
[] probation then skip end
end
end
end
end
%here we need to take care of the issue if a read and then a
%a write is requested by the same thread.
meth getWriteLock(T C ?Sync)
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif @(C.owner)==unit andthen
{(C.readOwners).isEmpty}==true then
(C.owner):=T Sync=ok
elseif @(C.owner)\=unit andthen
T.stamp==@(C.owner).stamp then
Sync=ok
elseif @(C.owner)\=unit then T2=@(C.owner) in
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_on(C)
if T.stamp<T2.stamp then
case @(T2.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] waiting_read_on(C2) then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
[] running then
(T2.state):=probation
[] probation then skip end
end
else /* someone has readlock on C */ T2={(C.readOwners).dequeue} in
{C.readOwners.enqueue T2 T2.stamp}
{C.queue.enqueue Sync#T T.stamp}
(T.state):=waiting_on(C)
if T.stamp<T2.stamp then
for Ti in {(C.readOwners).allItems} do
if Ti.stamp == T.stamp then
{Dictionary.remove T.save C.name}
{C.readOwners.delete T.stamp}
else
case @(Ti.state) of waiting_on(C2) then
Sync2#_={C2.queue.delete Ti.stamp} in
{self Unlockall(Ti true)}
{self Trans(Ti.body Ti.result Ti.stamp)}
Sync2=halt
[] waiting_read_on(C2) then
Sync2#_={C2.queue.delete Ti.stamp} in
{self Unlockall(Ti true)}
{self Trans(Ti.body Ti.result Ti.stamp)}
Sync2=halt
[] running then
(Ti.state):=probation
[] probation then skip end
end
end
end
end
end
meth newtrans(P ?R)
timestamp:=@timestamp+1 {self Trans(P R @timestamp)}
end
meth savereadstate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):= save(cell:C)
end Sync=ok
end
meth savewritestate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):=save(cell:C state:@(C.state))
end Sync=ok
end
meth commit(T) {self Unlockall(T false)} end
meth abort(T) {self Unlockall(T true)} end
end
fun {NewActive Class Init}
Obj={New Class Init}
P
in
thread S in
{NewPort S P}
for M in S do {Obj M} end
end
proc {$ M} {Send P M} end
end
fun {NewPrioQueue}
Q={NewCell nil}
proc {Enqueue X Prio}
fun {InsertLoop L}
case L of pair(Y P)|L2 then
if Prio<P then pair(X Prio)|L
else pair(Y P)|{InsertLoop L2} end
[] nil then [pair(X Prio)] end
end
in Q:={InsertLoop @Q} end
fun {Dequeue}
pair(Y _)|L2=@Q
in
Q:=L2 Y
end
fun {Delete Prio}
fun {DeleteLoop L}
case L of pair(Y P)|L2 then
if P==Prio then X=Y L2
else pair(Y P)|{DeleteLoop L2} end
[] nil then nil end
end X
in
Q:={DeleteLoop @Q}
X
end
%CHANGED - NEW ADDITION
fun {AllElems}
proc {Iter L ?A}
case L of pair(Y P)|L2 then X in
A=Y|X
{Iter L2 X}
[] nil then
A = nil
end
end
in
{Iter @Q $}
end
fun {IsEmpty} @Q==nil end
in
queue(enqueue:Enqueue dequeue:Dequeue allItems:AllElems
delete:Delete isEmpty:IsEmpty)
end
proc {NewTrans ?Trans ?NewCellT}
TM={NewActive TMClass init(TM)} in
fun {Trans P ?B} R in
{TM newtrans(P R)}
case R of abort then B=abort unit
[] abort(Exc) then B=abort raise Exc end
[] commit(Res) then B=commit Res end
end
fun {NewCellT X}
%CHANGED HERE
cell(name:{NewName} owner:{NewCell unit}
queue:{NewPrioQueue} state:{NewCell X}
readOwners:{NewPrioQueue})
end
end
==============NEW VERSION=====================
Transaction can now be in following states:
running
probation
waiting_read_on#C
waiting_write_on#C
The dictionary at T.save now stores records like save(cell:C state:S locktype:LockT) where LockT is either read or write.
Instead of a single owner, a Cell can now have either one owner with the write lock(represented by write#T) or several owners with the read lock(represented by read#T1,read#T2 etc) on the cell. These owners are stored in a priority queue at C.owners.
declare
class TMClass
attr timestamp tm
meth init(TM) timestamp:=0 tm:=TM end
meth Unlockall(T RestoreFlag)
{Show 'entered unlockall'}
for save(cell:C state:S locktype:LockT) in {Dictionary.items T.save} do
LockT#T = {C.owners.delete T.stamp}
if RestoreFlag then (C.state) := S end
if {Not {C.queue.isEmpty}} andthen {C.owners.isEmpty} then
Sync#T0 = {C.queue.peek} in
if @(T0.state)==waiting_write_on#C then
{Show 'unlockall01'}
Sync#T0 = {C.queue.delete T0.stamp}
(T0.state):=running
{Show 'unlockall02'}
in
{Show 'unlockall2'}
{@tm getWriteLock(T0 C Sync)}
{Show 'unlockall3'}
else
for Synci#Ti in {C.queue.allItems} do
if @(Ti.state) == waiting_read_on#C then
Synci#Ti = {C.queue.delete Ti.stamp}
(Ti.state) := running
{C.owners.enqueue read#Ti Ti.stamp}
Synci = ok
end
end
end
end
end
{Show 'exited unlockall'}
end
meth Trans(P ?R TS)
Halt={NewName}
T=trans(stamp:TS save:{NewDictionary} body:P
state:{NewCell running} result:R)
proc {ExcT C X Y} S1 S2 in
{Show 'entered ExcT'}
if {Dictionary.member T.save C.name} andthen
((T.save).(C.name)).locktype==write then
skip
else
{Show 'ExcT1'}
{@tm getWriteLock(T C S1)}
{Show 'Ext01'}
{Wait S1}
{Show 'Ext001'}
if S1==halt then {Show 'Ext02'} raise Halt end end
{Show 'Ext03'}
{@tm savewritestate(T C S2)} {Wait S2}
{Show 'ExcT2'}
end
{Show 'ExcT3'}
{Exchange C.state X Y}
{Show 'exited ExcT'}
end
proc {AccT C ?X}
{Show 'entered AccT'}
if {Not {Dictionary.member T.save C.name}} then S1 S2 in
{@tm getReadLock(T C S1)}
if S1==halt then raise Halt end end
{@tm savereadstate(T C S2)} {Wait S2}
end
{Show 'AccT2'}
{Exchange C.state X X}
{Show 'exited AccT'}
end
proc {AssT C X} {ExcT C _ X} end
proc {AboT} {@tm abort(T)} R=abort raise Halt end end
in
thread try Res={T.body t(access:AccT assign:AssT
exchange:ExcT abort:AboT)}
in {@tm commit(T)} R=commit(Res)
catch E then
if E\=Halt then {@tm abort(T)} R=abort(E) end
end end
end
meth getReadLock(T C ?Sync)
{Show 'entered getReadLock'}
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif {C.owners.isEmpty} then
{C.owners.enqueue read#T T.stamp} Sync=ok
else LockT#T2={C.owners.peek} in
if LockT==read then
{C.owners.enqueue read#T T.stamp} Sync=ok
elseif LockT==write then
if T.stamp==T2.stamp then Sync=ok
else
{C.queue.enqueue Sync#T T.stamp}
(T.state) := waiting_read_on#C
if T.stamp<T2.stamp then
case @(T2.state) of
running then
(T2.state) := probation
[] probation then skip
[] WaitT#C2 then
Sync2#_={C2.queue.delete T2.stamp} in
{self Unlockall(T2 true)}
{self Trans(T2.body T2.result T2.stamp)}
Sync2=halt
end
end
end
end
end
{Show 'exited getReadLock'}
end
meth getWriteLock(T C ?Sync)
{Show 'entered getWriteLock'}
if @(T.state)==probation then
{self Unlockall(T true)}
{self Trans(T.body T.result T.stamp)} Sync=halt
elseif {C.owners.isEmpty} then
{C.owners.enqueue write#T T.stamp} Sync=ok
else LockT#T2 = {C.owners.peek} in
if T.stamp==T2.stamp andthen LockT==write then
Sync = ok
elseif T.stamp==T2.stamp andthen LockT==read then
LockT#T2 = {C.owners.delete T2.stamp}
{Dictionary.remove T.save C.name}
{@tm getWriteLock(T C Sync)}
else
{C.queue.enqueue Sync#T T.stamp}
(T.state) := waiting_write_on#C
if T.stamp<T2.stamp then
%This is WRONG as we're not really emptying the
%owner queue and putting such condition.
{While
fun {$} {Not {C.owners.isEmpty}} end
proc {$}
_#Ti = {C.owners.dequeue}
in
{Show 'entered case'}
{Show state#Ti}
case @(Ti.state) of
running then
(Ti.state) := probation
[] probation then skip
[] WaitT#C2 then
Sync2#_={C2.queue.delete Ti.stamp} in
{self Unlockall(Ti true)}
{self Trans(Ti.body Ti.result Ti.stamp)}
Sync2=halt
end
{Show 'exited case'}
end}
end
end
end
{Show 'exited getWriteLock'}
end
meth newtrans(P ?R)
timestamp:=@timestamp+1 {self Trans(P R @timestamp)}
end
meth savereadstate(T C ?Sync)
if {Not {Dictionary.member T.save C.name}} then
(T.save).(C.name):= save(cell:C state:@(C.state) locktype:read)
end Sync=ok
end
meth savewritestate(T C ?Sync)
(T.save).(C.name):=save(cell:C state:@(C.state) locktype:write)
Sync = ok
end
meth commit(T) {self Unlockall(T false)} end
meth abort(T) {self Unlockall(T true)} end
end
fun {NewActive Class Init}
Obj={New Class Init}
P
in
thread S in
{NewPort S P}
for M in S do {Obj M} end
end
proc {$ M} {Send P M} end
end
fun {NewPrioQueue}
Q={NewCell nil}
proc {Enqueue X Prio}
fun {InsertLoop L}
case L of pair(Y P)|L2 then
if Prio<P then pair(X Prio)|L
else pair(Y P)|{InsertLoop L2} end
[] nil then [pair(X Prio)] end
end
in Q:={InsertLoop @Q} end
fun {Dequeue}
pair(Y _)|L2=@Q
in
Q:=L2 Y
end
fun {Delete Prio}
fun {DeleteLoop L}
case L of pair(Y P)|L2 then
if P==Prio then X=Y L2
else pair(Y P)|{DeleteLoop L2} end
[] nil then nil end
end X
in
{Show 'deleteentered'}
Q:={DeleteLoop @Q}
{Show 'deleteexited'#X}
X
end
%CHANGED - NEW ADDITION
fun {AllElems}
proc {Iter L ?A}
case L of pair(Y P)|L2 then X in
A=Y|X
{Iter L2 X}
[] nil then
A = nil
end
end
in
{Iter @Q $}
end
fun {Peek}
pair(Y _)|L2=@Q
in
Y
end
fun {IsEmpty} @Q==nil end
in
queue(enqueue:Enqueue dequeue:Dequeue allItems:AllElems
peek:Peek delete:Delete isEmpty:IsEmpty)
end
proc {NewTrans ?Trans ?NewCellT}
TM={NewActive TMClass init(TM)} in
fun {Trans P ?B} R in
{TM newtrans(P R)}
case R of abort then B=abort unit
[] abort(Exc) then B=abort raise Exc end
[] commit(Res) then B=commit Res end
end
fun {NewCellT X}
%CHANGED HERE
cell(name:{NewName} owners:{NewPrioQueue}
queue:{NewPrioQueue} state:{NewCell X})
end
end
=================code to run sum example==============
declare S D Rand Mix Sum Trans NewCellT
{NewTrans Trans NewCellT}
D={MakeTuple db 100}
for I in 1..100 do D.I={NewCellT I} end
fun {Rand} {OS.rand} mod 100 + 1 end
proc {Mix} {Trans
proc {$ T _}
I={Rand} J={Rand} K={Rand}
{Show I#J#K}
A={T.access D.I} B={T.access D.J} C={T.access D.K}
{Show mix1}
in
{Show mix2}
{T.assign D.I A+B-C}
{Show mix3}
{T.assign D.J A-B+C}
{Show mix4}
if I==J orelse I==K orelse J==K then {T.abort} end
{Show mix5}
{T.assign D.K ~A+B+C}
{Show mix6}
end _ _}
end
S={NewCellT 0}
fun {Sum}
{Trans
fun {$ T} {T.assign S 0}
for I in 1..100 do
{T.assign S {T.access S}+{T.access D.I}} end
{T.access S}
end _}
end
{Browse {Sum}} % Displays 5050
for I in 1..1000 do thread {Mix} end end
{Browse {Sum}} % Still displays 5050