-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcoordinator.ex
More file actions
129 lines (98 loc) · 3.53 KB
/
coordinator.ex
File metadata and controls
129 lines (98 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#
# Note: this is not a robust coordinator implementation; it lacks durable state,
# high availability, crash recovery and participant polling. It is only
# intended as a bare-bones demonstration of how to use the state machine.
#
defmodule Coordinator do
  @moduledoc """
  Minimal two-phase-commit coordinator built on the `TwoPhaseCommit` state machine.

  One coordinator process drives a single transaction: its state starts as
  `:ready`, becomes a `TwoPhaseCommit` struct once a transaction begins, and the
  process stops with reason `:normal` right after it has told the client whether
  the transaction committed or aborted.

  Participants are addressed with `GenServer.cast/2` and receive
  `{:prepare, coordinator}`, `{:commit, coordinator}` and
  `{:roll_back, coordinator}` messages, where `coordinator` is this process's
  pid. They report back through `prepared/2`, `aborted/2`, `rolled_back/2` and
  `committed/2`.
  """

  use GenServer

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  @doc """
  Starts a coordinator linked to the calling process.

  `opts` are passed straight to `GenServer.start_link/3` (for example `:name`).
  Accepting one argument also lets the `child_spec/1` generated by
  `use GenServer` start this module; calling `start_link/0` keeps working.
  """
  @spec start_link(GenServer.options()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, nil, opts)
  end

  @doc """
  Runs `fun` inside a transaction and blocks until the outcome is known.

  The transaction is begun on `coordinator`, `fun` is called with the
  transaction id, and the prepare phase is requested as soon as `fun` returns.

  Returns:

    * `{:ok, result}` - the transaction committed; `result` is what `fun` returned
    * `{:error, :aborted}` - the transaction was rolled back
    * `{:error, {:coordinator_down, reason}}` - the coordinator process exited
      before reporting an outcome

  Failures of the `GenServer.call/2` requests themselves (for instance when the
  coordinator is not alive) exit the caller, as with any `GenServer.call/2`.
  """
  @spec transaction(GenServer.server(), (term() -> result)) ::
          {:ok, result} | {:error, :aborted | {:coordinator_down, term()}}
        when result: var
  def transaction(coordinator, fun) do
    {:ok, txn_id} = GenServer.call(coordinator, {:begin, self()})
    result = fun.(txn_id)

    # Watch the coordinator while we wait: without a monitor, a coordinator
    # crash would leave this process blocked in `receive` forever.
    # `GenServer.whereis/1` resolves every server reference `GenServer.call/2`
    # accepts. If it returns `nil` the monitor simply targets a non-existent
    # name, and the `:prepare` call below exits just as it did before.
    monitor = coordinator |> GenServer.whereis() |> Process.monitor()

    :ok = GenServer.call(coordinator, :prepare)

    receive do
      {:committed, ^txn_id} ->
        # The coordinator stops right after notifying us; `:flush` removes the
        # resulting :DOWN message so it does not linger in the caller's mailbox.
        Process.demonitor(monitor, [:flush])
        {:ok, result}

      {:aborted, ^txn_id} ->
        Process.demonitor(monitor, [:flush])
        {:error, :aborted}

      {:DOWN, ^monitor, :process, _object, reason} ->
        {:error, {:coordinator_down, reason}}
    end
  end

  @doc """
  Registers `participant` with the transaction currently held by `coordinator`.

  Synchronous; returns `:ok` once the coordinator has updated its state.
  """
  @spec add_participant(GenServer.server(), term()) :: :ok
  def add_participant(coordinator, participant) do
    GenServer.call(coordinator, {:add_participant, participant})
  end

  @doc """
  Tells `coordinator` (asynchronously) that `participant` has prepared.
  """
  @spec prepared(GenServer.server(), term()) :: :ok
  def prepared(coordinator, participant) do
    GenServer.cast(coordinator, {:prepared, participant})
  end

  @doc """
  Tells `coordinator` (asynchronously) that `participant` has aborted.
  """
  @spec aborted(GenServer.server(), term()) :: :ok
  def aborted(coordinator, participant) do
    GenServer.cast(coordinator, {:aborted, participant})
  end

  @doc """
  Tells `coordinator` (asynchronously) that `participant` has rolled back.
  """
  @spec rolled_back(GenServer.server(), term()) :: :ok
  def rolled_back(coordinator, participant) do
    GenServer.cast(coordinator, {:rolled_back, participant})
  end

  @doc """
  Tells `coordinator` (asynchronously) that `participant` has committed.
  """
  @spec committed(GenServer.server(), term()) :: :ok
  def committed(coordinator, participant) do
    GenServer.cast(coordinator, {:committed, participant})
  end

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(nil) do
    # `:ready` means no transaction has been started yet.
    {:ok, :ready}
  end

  # `{:begin, client}` is only accepted while the state is `:ready`; any other
  # state has no matching clause and crashes the coordinator.
  @impl true
  def handle_call({:begin, client}, _from, :ready) do
    # The coordinator's own pid doubles as the transaction id.
    txn_id = self()
    two_phase_commit = TwoPhaseCommit.new([], id: txn_id, client: client)
    {:reply, {:ok, txn_id}, two_phase_commit}
  end

  def handle_call({:add_participant, participant}, _from, two_phase_commit) do
    {:reply, :ok, TwoPhaseCommit.add_participant(two_phase_commit, participant)}
  end

  def handle_call(:prepare, _from, two_phase_commit) do
    {:ok, two_phase_commit} = TwoPhaseCommit.prepare(two_phase_commit)

    two_phase_commit
    |> TwoPhaseCommit.participants()
    |> broadcast(:prepare)

    {:reply, :ok, two_phase_commit}
  end

  @impl true
  def handle_cast({:prepared, participant}, two_phase_commit) do
    {:ok, two_phase_commit} = TwoPhaseCommit.prepared(two_phase_commit, participant)

    # Commit requests go out only when the state machine asks for them; any
    # other next action is left alone.
    case TwoPhaseCommit.next_action(two_phase_commit) do
      {:commit, participants} -> broadcast(participants, :commit)
      _ -> :ok
    end

    {:noreply, two_phase_commit}
  end

  def handle_cast({:aborted, participant}, two_phase_commit) do
    {:ok, two_phase_commit} = TwoPhaseCommit.aborted(two_phase_commit, participant)

    #
    # we send a raw message here because we don't assume that all participants are Accounts
    #
    two_phase_commit
    |> TwoPhaseCommit.participants()
    |> broadcast(:roll_back)

    {:noreply, two_phase_commit}
  end

  def handle_cast(
        {:rolled_back, participant},
        %TwoPhaseCommit{id: id, client: client} = two_phase_commit
      ) do
    {:ok, two_phase_commit} = TwoPhaseCommit.rolled_back(two_phase_commit, participant)

    case two_phase_commit do
      %TwoPhaseCommit{state: :aborted} ->
        # Final state reached: report to the client, then shut down.
        send(client, {:aborted, id})
        {:stop, :normal, two_phase_commit}

      _ ->
        {:noreply, two_phase_commit}
    end
  end

  def handle_cast(
        {:committed, participant},
        %TwoPhaseCommit{id: id, client: client} = two_phase_commit
      ) do
    {:ok, two_phase_commit} = TwoPhaseCommit.committed(two_phase_commit, participant)

    case two_phase_commit do
      %TwoPhaseCommit{state: :committed} ->
        # Final state reached: report to the client, then shut down.
        send(client, {:committed, id})
        {:stop, :normal, two_phase_commit}

      _ ->
        {:noreply, two_phase_commit}
    end
  end

  # Casts `{action, coordinator_pid}` to every participant. Must be called from
  # the coordinator process, because `self()` is used as the reply address.
  defp broadcast(participants, action) do
    Enum.each(participants, &GenServer.cast(&1, {action, self()}))
  end
end