Spaces:
Sleeping
Sleeping
Scott Hiett
commited on
Commit
•
abcde98
1
Parent(s):
4ef9714
Transaction support!
Browse files- config/config.exs +3 -3
- config/prod.exs +1 -1
- config/runtime.exs +3 -3
- lib/srh.ex +1 -1
- lib/srh/http/base_router.ex +9 -0
- lib/srh/http/command_handler.ex +81 -6
- lib/srh/redis/client.ex +21 -0
- lib/srh/redis/client_registry.ex +63 -17
config/config.exs
CHANGED
@@ -1,8 +1,8 @@
|
|
1 |
import Config
|
2 |
|
3 |
config :srh,
|
4 |
-
|
5 |
-
|
6 |
-
|
7 |
|
8 |
import_config "#{config_env()}.exs"
|
|
|
1 |
import Config
|
2 |
|
3 |
config :srh,
|
4 |
+
mode: "file",
|
5 |
+
file_path: "srh-config/tokens.json",
|
6 |
+
port: 8080
|
7 |
|
8 |
import_config "#{config_env()}.exs"
|
config/prod.exs
CHANGED
@@ -1,4 +1,4 @@
|
|
1 |
import Config
|
2 |
|
3 |
config :srh,
|
4 |
-
|
|
|
1 |
import Config
|
2 |
|
3 |
config :srh,
|
4 |
+
port: 80
|
config/runtime.exs
CHANGED
@@ -1,6 +1,6 @@
|
|
1 |
import Config
|
2 |
|
3 |
config :srh,
|
4 |
-
|
5 |
-
|
6 |
-
|
|
|
1 |
import Config
|
2 |
|
3 |
config :srh,
|
4 |
+
mode: System.get_env("TOKEN_RESOLUTION_MODE") || "file",
|
5 |
+
file_path: System.get_env("TOKEN_RESOLUTION_FILE_PATH") || "srh-config/tokens.json",
|
6 |
+
port: Integer.parse(System.get_env("PORT") || "8080")
|
lib/srh.ex
CHANGED
@@ -5,7 +5,7 @@ defmodule Srh do
|
|
5 |
|
6 |
def start(_type, _args) do
|
7 |
IO.puts("Using port #{@port}")
|
8 |
-
|
9 |
children = [
|
10 |
Srh.Auth.TokenResolver,
|
11 |
{GenRegistry, worker_module: Srh.Redis.Client},
|
|
|
5 |
|
6 |
def start(_type, _args) do
|
7 |
IO.puts("Using port #{@port}")
|
8 |
+
|
9 |
children = [
|
10 |
Srh.Auth.TokenResolver,
|
11 |
{GenRegistry, worker_module: Srh.Redis.Client},
|
lib/srh/http/base_router.ex
CHANGED
@@ -23,6 +23,12 @@ defmodule Srh.Http.BaseRouter do
|
|
23 |
|> handle_response(conn)
|
24 |
end
|
25 |
|
|
|
|
|
|
|
|
|
|
|
|
|
26 |
match _ do
|
27 |
send_resp(conn, 404, "Endpoint not found")
|
28 |
end
|
@@ -51,6 +57,9 @@ defmodule Srh.Http.BaseRouter do
|
|
51 |
{:malformed_data, message} ->
|
52 |
%{code: 400, message: message, json: false}
|
53 |
|
|
|
|
|
|
|
54 |
{:not_authorized, message} ->
|
55 |
%{code: 401, message: message, json: false}
|
56 |
|
|
|
23 |
|> handle_response(conn)
|
24 |
end
|
25 |
|
26 |
+
post "/multi-exec" do
|
27 |
+
conn
|
28 |
+
|> handle_extract_auth(&CommandHandler.handle_command_transaction_array(conn, &1))
|
29 |
+
|> handle_response(conn)
|
30 |
+
end
|
31 |
+
|
32 |
match _ do
|
33 |
send_resp(conn, 404, "Endpoint not found")
|
34 |
end
|
|
|
57 |
{:malformed_data, message} ->
|
58 |
%{code: 400, message: message, json: false}
|
59 |
|
60 |
+
{:redis_error, data} ->
|
61 |
+
%{code: 400, message: Jason.encode!(data), json: true}
|
62 |
+
|
63 |
{:not_authorized, message} ->
|
64 |
%{code: 401, message: message, json: false}
|
65 |
|
lib/srh/http/command_handler.ex
CHANGED
@@ -24,6 +24,17 @@ defmodule Srh.Http.CommandHandler do
|
|
24 |
end
|
25 |
end
|
26 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
27 |
defp do_handle_command(command_array, token) do
|
28 |
case TokenResolver.resolve(token) do
|
29 |
{:ok, connection_info} ->
|
@@ -44,6 +55,16 @@ defmodule Srh.Http.CommandHandler do
|
|
44 |
end
|
45 |
end
|
46 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
47 |
defp dispatch_command_array(_arr, _connection_info, responses \\ [])
|
48 |
|
49 |
defp dispatch_command_array([current | rest], connection_info, responses) do
|
@@ -52,9 +73,8 @@ defmodule Srh.Http.CommandHandler do
|
|
52 |
{:ok, result_map} ->
|
53 |
[result_map | responses]
|
54 |
|
55 |
-
{:
|
56 |
-
|
57 |
-
[Jason.decode!(result_json) | responses]
|
58 |
end
|
59 |
|
60 |
dispatch_command_array(rest, connection_info, updated_responses)
|
@@ -65,6 +85,61 @@ defmodule Srh.Http.CommandHandler do
|
|
65 |
{:ok, Enum.reverse(responses)}
|
66 |
end
|
67 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
68 |
defp dispatch_command(
|
69 |
command_array,
|
70 |
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
|
@@ -80,10 +155,10 @@ defmodule Srh.Http.CommandHandler do
|
|
80 |
|
81 |
{:error, error} ->
|
82 |
{
|
83 |
-
:
|
84 |
-
|
85 |
error: error.message
|
86 |
-
}
|
87 |
}
|
88 |
end
|
89 |
|
|
|
24 |
end
|
25 |
end
|
26 |
|
27 |
+
def handle_command_transaction_array(conn, token) do
|
28 |
+
# Transactions use the same body format as pipelines, so we can use the same validator
|
29 |
+
case RequestValidator.validate_pipeline_redis_body(conn.body_params) do
|
30 |
+
{:ok, array_of_command_arrays} ->
|
31 |
+
do_handle_command_transaction_array(array_of_command_arrays, token)
|
32 |
+
|
33 |
+
{:error, error_message} ->
|
34 |
+
{:malformed_data, error_message}
|
35 |
+
end
|
36 |
+
end
|
37 |
+
|
38 |
defp do_handle_command(command_array, token) do
|
39 |
case TokenResolver.resolve(token) do
|
40 |
{:ok, connection_info} ->
|
|
|
55 |
end
|
56 |
end
|
57 |
|
58 |
+
defp do_handle_command_transaction_array(array_of_command_arrays, token) do
|
59 |
+
case TokenResolver.resolve(token) do
|
60 |
+
{:ok, connection_info} ->
|
61 |
+
dispatch_command_transaction_array(array_of_command_arrays, connection_info)
|
62 |
+
|
63 |
+
{:error, msg} ->
|
64 |
+
{:not_authorized, msg}
|
65 |
+
end
|
66 |
+
end
|
67 |
+
|
68 |
defp dispatch_command_array(_arr, _connection_info, responses \\ [])
|
69 |
|
70 |
defp dispatch_command_array([current | rest], connection_info, responses) do
|
|
|
73 |
{:ok, result_map} ->
|
74 |
[result_map | responses]
|
75 |
|
76 |
+
{:redis_error, result} ->
|
77 |
+
[result | responses]
|
|
|
78 |
end
|
79 |
|
80 |
dispatch_command_array(rest, connection_info, updated_responses)
|
|
|
85 |
{:ok, Enum.reverse(responses)}
|
86 |
end
|
87 |
|
88 |
+
defp dispatch_command_transaction_array(
|
89 |
+
command_array,
|
90 |
+
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info,
|
91 |
+
responses \\ []
|
92 |
+
) do
|
93 |
+
case GenRegistry.lookup_or_start(Client, srh_id, [max_connections, connection_info]) do
|
94 |
+
{:ok, client_pid} ->
|
95 |
+
# Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
|
96 |
+
worker_pid = Client.borrow_worker(client_pid)
|
97 |
+
|
98 |
+
wrapped_command_array = [["MULTI"] | command_array]
|
99 |
+
do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
|
100 |
+
|
101 |
+
# Now manually run the EXEC - this is what contains the information to form the response, not the above
|
102 |
+
result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
|
103 |
+
{:ok, res} ->
|
104 |
+
{
|
105 |
+
:ok,
|
106 |
+
res
|
107 |
+
|> Enum.map(&(%{result: &1}))
|
108 |
+
}
|
109 |
+
# TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?
|
110 |
+
|
111 |
+
{:error, error} ->
|
112 |
+
{:redis_error, %{error: error.message}}
|
113 |
+
end
|
114 |
+
|
115 |
+
Client.return_worker(client_pid, worker_pid)
|
116 |
+
|
117 |
+
result
|
118 |
+
{:error, msg} ->
|
119 |
+
{:server_error, msg}
|
120 |
+
end
|
121 |
+
end
|
122 |
+
|
123 |
+
defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do
|
124 |
+
updated_responses = case ClientWorker.redis_command(worker_pid, current) do
|
125 |
+
{:ok, res} ->
|
126 |
+
[%{result: res} | responses]
|
127 |
+
|
128 |
+
{:error, error} ->
|
129 |
+
[
|
130 |
+
%{
|
131 |
+
error: error.message
|
132 |
+
} | responses
|
133 |
+
]
|
134 |
+
end
|
135 |
+
|
136 |
+
do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
|
137 |
+
end
|
138 |
+
|
139 |
+
defp do_dispatch_command_transaction_array([], worker_pid, responses) when is_pid(worker_pid) do
|
140 |
+
{:ok, Enum.reverse(responses)}
|
141 |
+
end
|
142 |
+
|
143 |
defp dispatch_command(
|
144 |
command_array,
|
145 |
%{"srh_id" => srh_id, "max_connections" => max_connections} = connection_info
|
|
|
155 |
|
156 |
{:error, error} ->
|
157 |
{
|
158 |
+
:redis_error,
|
159 |
+
%{
|
160 |
error: error.message
|
161 |
+
}
|
162 |
}
|
163 |
end
|
164 |
|
lib/srh/redis/client.ex
CHANGED
@@ -27,6 +27,14 @@ defmodule Srh.Redis.Client do
|
|
27 |
GenServer.call(client, {:find_worker})
|
28 |
end
|
29 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
30 |
def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state)
|
31 |
when is_pid(registry_pid) do
|
32 |
{:ok, worker} = ClientRegistry.find_worker(registry_pid)
|
@@ -34,10 +42,23 @@ defmodule Srh.Redis.Client do
|
|
34 |
{:reply, worker, state}
|
35 |
end
|
36 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
37 |
def handle_call(_msg, _from, state) do
|
38 |
{:reply, :ok, state}
|
39 |
end
|
40 |
|
|
|
|
|
|
|
|
|
|
|
|
|
41 |
def handle_cast(_msg, state) do
|
42 |
{:noreply, state}
|
43 |
end
|
|
|
27 |
GenServer.call(client, {:find_worker})
|
28 |
end
|
29 |
|
30 |
+
def borrow_worker(client) do
|
31 |
+
GenServer.call(client, {:borrow_worker})
|
32 |
+
end
|
33 |
+
|
34 |
+
def return_worker(client, pid) do
|
35 |
+
GenServer.cast(client, {:return_worker, pid})
|
36 |
+
end
|
37 |
+
|
38 |
def handle_call({:find_worker}, _from, %{registry_pid: registry_pid} = state)
|
39 |
when is_pid(registry_pid) do
|
40 |
{:ok, worker} = ClientRegistry.find_worker(registry_pid)
|
|
|
42 |
{:reply, worker, state}
|
43 |
end
|
44 |
|
45 |
+
def handle_call({:borrow_worker}, _from, %{registry_pid: registry_pid} = state)
|
46 |
+
when is_pid(registry_pid) do
|
47 |
+
{:ok, worker} = ClientRegistry.borrow_worker(registry_pid)
|
48 |
+
Process.send(self(), :reset_idle_death, [])
|
49 |
+
{:reply, worker, state}
|
50 |
+
end
|
51 |
+
|
52 |
def handle_call(_msg, _from, state) do
|
53 |
{:reply, :ok, state}
|
54 |
end
|
55 |
|
56 |
+
def handle_cast({:return_worker, pid}, %{registry_pid: registry_pid} = state)
|
57 |
+
when is_pid(pid) and is_pid(registry_pid) do
|
58 |
+
ClientRegistry.return_worker(registry_pid, pid)
|
59 |
+
{:noreply, state}
|
60 |
+
end
|
61 |
+
|
62 |
def handle_cast(_msg, state) do
|
63 |
{:noreply, state}
|
64 |
end
|
lib/srh/redis/client_registry.ex
CHANGED
@@ -10,7 +10,8 @@ defmodule Srh.Redis.ClientRegistry do
|
|
10 |
:ok,
|
11 |
%{
|
12 |
worker_pids: [],
|
13 |
-
last_worker_index: 0
|
|
|
14 |
}
|
15 |
}
|
16 |
end
|
@@ -19,6 +20,14 @@ defmodule Srh.Redis.ClientRegistry do
|
|
19 |
GenServer.call(registry, {:find_worker})
|
20 |
end
|
21 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
22 |
def add_worker(registry, pid) do
|
23 |
GenServer.cast(registry, {:add_worker, pid})
|
24 |
end
|
@@ -27,25 +36,31 @@ defmodule Srh.Redis.ClientRegistry do
|
|
27 |
GenServer.cast(registry, {:destroy_workers})
|
28 |
end
|
29 |
|
30 |
-
def handle_call({:
|
31 |
-
case
|
32 |
-
|
33 |
-
{:reply, {:error,
|
34 |
-
|
35 |
-
|
36 |
-
|
37 |
-
|
38 |
-
|
39 |
-
|
40 |
-
|
41 |
-
|
42 |
-
|
43 |
-
|
44 |
-
|
45 |
-
|
|
|
46 |
end
|
47 |
end
|
48 |
|
|
|
|
|
|
|
|
|
|
|
49 |
def handle_call(_msg, _from, state) do
|
50 |
{:reply, :ok, state}
|
51 |
end
|
@@ -72,6 +87,12 @@ defmodule Srh.Redis.ClientRegistry do
|
|
72 |
{:noreply, %{state | worker_pids: [], last_worker_index: 0}}
|
73 |
end
|
74 |
|
|
|
|
|
|
|
|
|
|
|
|
|
75 |
def handle_cast(_msg, state) do
|
76 |
{:noreply, state}
|
77 |
end
|
@@ -83,4 +104,29 @@ defmodule Srh.Redis.ClientRegistry do
|
|
83 |
def handle_info(_msg, state) do
|
84 |
{:noreply, state}
|
85 |
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
86 |
end
|
|
|
10 |
:ok,
|
11 |
%{
|
12 |
worker_pids: [],
|
13 |
+
last_worker_index: 0,
|
14 |
+
currently_borrowed_pids: []
|
15 |
}
|
16 |
}
|
17 |
end
|
|
|
20 |
GenServer.call(registry, {:find_worker})
|
21 |
end
|
22 |
|
23 |
+
def borrow_worker(registry) do
|
24 |
+
GenServer.call(registry, {:borrow_worker})
|
25 |
+
end
|
26 |
+
|
27 |
+
def return_worker(registry, pid) do
|
28 |
+
GenServer.cast(registry, {:return_worker, pid})
|
29 |
+
end
|
30 |
+
|
31 |
def add_worker(registry, pid) do
|
32 |
GenServer.cast(registry, {:add_worker, pid})
|
33 |
end
|
|
|
36 |
GenServer.cast(registry, {:destroy_workers})
|
37 |
end
|
38 |
|
39 |
+
def handle_call({:borrow_worker}, _from, state) do
|
40 |
+
case do_find_worker(state) do
|
41 |
+
{{:error, msg}, state_update} ->
|
42 |
+
{:reply, {:error, msg}, state_update}
|
43 |
+
|
44 |
+
{{:ok, pid}, state_update} ->
|
45 |
+
# We want to put this pid into the borrowed pids state list
|
46 |
+
{
|
47 |
+
:reply,
|
48 |
+
{:ok, pid},
|
49 |
+
%{
|
50 |
+
state_update
|
51 |
+
| currently_borrowed_pids:
|
52 |
+
[pid | state_update.currently_borrowed_pids]
|
53 |
+
|> Enum.uniq()
|
54 |
+
}
|
55 |
+
}
|
56 |
end
|
57 |
end
|
58 |
|
59 |
+
def handle_call({:find_worker}, _from, state) do
|
60 |
+
{res, state_update} = do_find_worker(state)
|
61 |
+
{:reply, res, state_update}
|
62 |
+
end
|
63 |
+
|
64 |
def handle_call(_msg, _from, state) do
|
65 |
{:reply, :ok, state}
|
66 |
end
|
|
|
87 |
{:noreply, %{state | worker_pids: [], last_worker_index: 0}}
|
88 |
end
|
89 |
|
90 |
+
def handle_cast({:return_worker, pid}, state) do
|
91 |
+
# Remove it from the borrowed array
|
92 |
+
{:noreply,
|
93 |
+
%{state | currently_borrowed_pids: List.delete(state.currently_borrowed_pids, pid)}}
|
94 |
+
end
|
95 |
+
|
96 |
def handle_cast(_msg, state) do
|
97 |
{:noreply, state}
|
98 |
end
|
|
|
104 |
def handle_info(_msg, state) do
|
105 |
{:noreply, state}
|
106 |
end
|
107 |
+
|
108 |
+
defp do_find_worker(state) do
|
109 |
+
filtered_pids =
|
110 |
+
state.worker_pids
|
111 |
+
|> Enum.filter(&(!Enum.member?(state.currently_borrowed_pids, &1)))
|
112 |
+
|
113 |
+
case length(filtered_pids) do
|
114 |
+
0 ->
|
115 |
+
{{:error, :none_available}, state}
|
116 |
+
|
117 |
+
len ->
|
118 |
+
target = state.last_worker_index + 1
|
119 |
+
|
120 |
+
corrected_target =
|
121 |
+
case target >= len do
|
122 |
+
true -> 0
|
123 |
+
false -> target
|
124 |
+
end
|
125 |
+
|
126 |
+
{
|
127 |
+
{:ok, Enum.at(state.worker_pids, corrected_target)},
|
128 |
+
%{state | last_worker_index: corrected_target}
|
129 |
+
}
|
130 |
+
end
|
131 |
+
end
|
132 |
end
|