248 lines
8.4 KiB
Erlang

-module(client).
-behavior(gen_server).
-export([start/1, start/2, connect/1, connect/2, asend/2, send/3, send/2,
disconnect/1, ssend/3, str/1, format/1, sformat/1, ssend/2,
get_all_results/1]).
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
-include("erldis.hrl").
-define(EOL, "\r\n").
%% Helpers
str(X) when is_list(X) ->
X;
str(X) when is_atom(X) ->
atom_to_list(X);
str(X) when is_binary(X) ->
binary_to_list(X);
str(X) when is_integer(X) ->
integer_to_list(X);
str(X) when is_float(X) ->
float_to_list(X).
format([], Result) ->
string:join(lists:reverse(Result), ?EOL);
format([Line|Rest], Result) ->
JoinedLine = string:join([str(X) || X <- Line], " "),
format(Rest, [JoinedLine|Result]).
format(Lines) ->
format(Lines, []).
sformat(Line) ->
format([Line], []).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Exported API
start(Host) ->
connect(Host).
start(Host, Port) ->
connect(Host, Port).
connect(Host) ->
connect(Host, 6379).
connect(Host, Port) ->
gen_server:start_link(?MODULE, [Host, Port], []).
ssend(Client, Cmd) -> ssend(Client, Cmd, []).
ssend(Client, Cmd, Args) ->
gen_server:cast(Client, {send, sformat([Cmd|Args])}).
send(Client, Cmd) -> send(Client, Cmd, []).
send(Client, Cmd, Args) ->
gen_server:cast(Client, {send,
string:join([str(Cmd), format(Args)], " ")}).
asend(Client, Cmd) ->
gen_server:cast(Client, {asend, Cmd}).
disconnect(Client) ->
gen_server:call(Client, disconnect).
get_all_results(Client) ->
gen_server:call(Client, get_all_results).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server callbacks
init([Host, Port]) ->
process_flag(trap_exit, true),
ConnectOptions = [list, {active, once}, {packet, line}, {nodelay, true}],
case gen_tcp:connect(Host, Port, ConnectOptions) of
{error, Why} ->
{error, {socket_error, Why}};
{ok, Socket} ->
{ok, #redis{socket=Socket, calls=0}}
end.
handle_call({send, Cmd}, From, State) ->
gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
{noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, lists:nth(1, V)) end,
remaining=1}};
handle_call(disconnect, _From, State) ->
{stop, normal, ok, State};
handle_call(get_all_results, From, State) ->
case State#redis.calls of
0 ->
% answers came earlier than we could start listening...
% Very unlikely but totally possible.
{reply, lists:reverse(State#redis.results), State#redis{results=[], calls=0}};
_ ->
% We are here earlier than results came, so just make
% ourselves wait until stuff is ready.
{noreply, State#redis{reply_caller=fun(V) -> gen_server:reply(From, V) end}}
end;
handle_call(_, _From, State) -> {noreply, State}.
handle_cast({asend, Cmd}, State) ->
gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
{noreply, State};
handle_cast({send, Cmd}, State=#redis{remaining=Remaining, calls=Calls}) ->
% how we should do here: if remaining is already != 0 then we'll
% let handle_info take care of keeping track how many remaining things
% there are. If instead it's 0 we are the first call so let's just
% do it.
gen_tcp:send(State#redis.socket, [Cmd|?EOL]),
case Remaining of
0 ->
{noreply, State#redis{remaining=1, calls=1}};
_ ->
{noreply, State#redis{calls=Calls+1}}
end;
handle_cast(_Msg, State) -> {noreply, State}.
trim2({ok, S}) ->
string:substr(S, 1, length(S)-2);
trim2(S) ->
trim2({ok, S}).
% This function helps with pipelining by creating a pubsub system with
% the caller. The caller could submit multiple requests and not listen
% until later when all or some of them have been answered, at that
% point 2 conditions can be true:
% 1) We still need to process more things in this response chain
% 2) We are finished.
%
% And these 2 are together with the following 2:
% 1) We called get_all_results before the end of the responses.
% 2) We called get_all_results after the end of the responses.
%
% If there's stuff missing in the chain we just push results, this also
% happens when there's nothing more to process BUT we haven't requested
% results yet.
% In case we have requested results: if requests are not yet ready we
% just push them, otherwise we finally answer all of them.
save_or_reply(Result, State=#redis{calls=Calls, results=Results, reply_caller=ReplyCaller}) ->
case Calls of
0 ->
% We don't reverse results here because if all the requests
% come in and then we submit another one, if we reverse
% they will be scrambled in the results field of the record.
% instead if we wait just before we reply they will be
% in the right order.
FullResults = [Result|Results],
NewState = case ReplyCaller of
undefined ->
State#redis{results=FullResults};
_ ->
ReplyCaller(lists:reverse(FullResults)),
State#redis{results=[]}
end,
NewState#redis{remaining=0, pstate=empty,
reply_caller=undefined, buffer=[],
calls=0};
_ ->
State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Calls}
end.
handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
Trimmed = trim2(Data),
NewState = case {State#redis.remaining-1, proto:parse(State#redis.pstate, Trimmed)} of
% This line contained an error code. Next line will hold
% The error message that we will parse.
{0, error} ->
State#redis{remaining=1, pstate=error};
% The stateful parser just started and tells us the number
% of results that we will have to parse for those calls
% where more than one result is expected. The next
% line will start with the first item to read.
{0, {hold, Remaining}} ->
case Remaining of
nil ->
save_or_reply(nil, State#redis{calls=Calls-1});
_ ->
% Reset the remaining value to the number of results that we need to parse.
State#redis{remaining=Remaining, pstate=read}
end;
% We either had only one thing to read or we are at the
% end of the stuff that we need to read. either way
% just pack up the buffer and send.
{0, {read, NBytes}} ->
CurrentValue = case NBytes of
nil ->
nil;
_ ->
inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
inet:setopts(Socket, [{packet, line}]), % go back to line mode
CV
end,
OldBuffer = State#redis.buffer,
case OldBuffer of
[] ->
save_or_reply(CurrentValue, State#redis{calls=Calls-1});
_ ->
save_or_reply(lists:reverse([CurrentValue|OldBuffer]), State#redis{calls=Calls-1})
end;
% The stateful parser tells us to read some bytes
{N, {read, NBytes}} ->
% annoying repetition... I should reuse this code.
CurrentValue = case NBytes of
nil ->
nil;
_ ->
inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
CV = trim2(gen_tcp:recv(Socket, NBytes+2)), % also consume the \r\n
inet:setopts(Socket, [{packet, line}]), % go back to line mode
CV
end,
OldBuffer = State#redis.buffer,
State#redis{remaining=N, buffer=[CurrentValue|OldBuffer], pstate=read};
% Simple return values contained in a single line
{0, Value} ->
save_or_reply(Value, State#redis{calls=Calls-1})
end,
inet:setopts(Socket, [{active, once}]),
{noreply, NewState};
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, State) ->
case State#redis.socket of
undefined ->
pass;
Socket ->
gen_tcp:close(Socket)
end,
ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%