mirror of
https://github.com/fluencelabs/redis
synced 2025-04-01 23:31:03 +00:00
273 lines
9.9 KiB
Erlang
273 lines
9.9 KiB
Erlang
|
-module(client).
|
||
|
-behavior(gen_server).
|
||
|
|
||
|
-export([start/1, start/2, connect/1, connect/2, asend/2, send/3, send/2,
|
||
|
disconnect/1, ssend/3, str/1, format/1, sformat/1, ssend/2,
|
||
|
get_all_results/1]).
|
||
|
-export([init/1, handle_call/3, handle_cast/2,
|
||
|
handle_info/2, terminate/2, code_change/3]).
|
||
|
|
||
|
-include("erldis.hrl").
|
||
|
|
||
|
-define(EOL, "\r\n").
|
||
|
|
||
|
|
||
|
%% Helpers
|
||
|
%% Turn one command fragment into a flat string (a list of characters).
%% A list is taken to be a string already and is handed back untouched;
%% binaries, atoms, integers and floats go through the matching
%% *_to_list BIF. Any other term fails with function_clause.
str(Chars) when is_list(Chars) -> Chars;
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
str(Name) when is_atom(Name) -> atom_to_list(Name);
str(Int) when is_integer(Int) -> integer_to_list(Int);
str(Float) when is_float(Float) -> float_to_list(Float).
|
||
|
|
||
|
%% Render a list of lines as a single string.
%% Every line is a list of fields: each field is stringified with str/1
%% and the fields of one line are joined by a single space. Result holds
%% the lines rendered so far, newest first; once all of Lines has been
%% consumed the accumulated lines are put back in order and joined
%% with ?EOL.
format(Lines, Result) ->
    Rendered = lists:foldl(
        fun(Fields, Acc) ->
            [string:join([str(Field) || Field <- Fields], " ") | Acc]
        end,
        Result,
        Lines
    ),
    string:join(lists:reverse(Rendered), ?EOL).
|
||
|
|
||
|
%% Format a list of lines (each one a list of fields) into one
%% ?EOL-separated string, starting from an empty accumulator.
format(Lines) ->
    NothingRenderedYet = [],
    format(Lines, NothingRenderedYet).
|
||
|
%% Format exactly one line (a list of fields): the line is wrapped in a
%% one-element list and rendered by format/2, so the result carries no
%% ?EOL separator.
sformat(Line) ->
    OnlyLine = [Line],
    format(OnlyLine, []).
|
||
|
|
||
|
%% Map a command atom to the proto function used to parse its reply.
%% The result is always an external fun of arity 2. A command that is
%% not listed in any clause fails with function_clause.

%% Commands whose reply is handled by proto:parse/2.
get_parser(Cmd) when
    Cmd =:= set; Cmd =:= setnx; Cmd =:= del; Cmd =:= exists;
    Cmd =:= rename; Cmd =:= renamenx; Cmd =:= rpush; Cmd =:= lpush;
    Cmd =:= ltrim; Cmd =:= lset; Cmd =:= sadd; Cmd =:= srem;
    Cmd =:= sismember; Cmd =:= select; Cmd =:= move; Cmd =:= save;
    Cmd =:= bgsave; Cmd =:= flushdb; Cmd =:= flushall
->
    fun proto:parse/2;
get_parser(lrem) ->
    fun proto:parse_special/2;
%% Commands whose reply is handled by proto:parse_int/2.
get_parser(Cmd) when
    Cmd =:= incr; Cmd =:= incrby; Cmd =:= decr;
    Cmd =:= decrby; Cmd =:= llen; Cmd =:= scard
->
    fun proto:parse_int/2;
get_parser(type) ->
    fun proto:parse_types/2;
get_parser(randomkey) ->
    fun proto:parse_string/2;
%% Commands whose reply is handled by proto:single_stateful_parser/2.
get_parser(Cmd) when
    Cmd =:= get; Cmd =:= lindex; Cmd =:= lpop; Cmd =:= rpop
->
    fun proto:single_stateful_parser/2;
%% Commands whose reply is handled by proto:stateful_parser/2.
get_parser(Cmd) when
    Cmd =:= keys; Cmd =:= lrange; Cmd =:= sinter;
    Cmd =:= smembers; Cmd =:= sort
->
    fun proto:stateful_parser/2.
|
||
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||
|
|
||
|
|
||
|
%% Exported API
|
||
|
%% start/1 and start/2 are plain aliases of connect/1 and connect/2:
%% they take the same arguments and return whatever connect returns.
start(Host) -> connect(Host).

start(Host, Port) -> connect(Host, Port).
|
||
|
|
||
|
%% Start a client process for the server at Host, using port 6379 when
%% no port is given.
connect(Host) ->
    DefaultPort = 6379,
    connect(Host, DefaultPort).

%% Start a linked, unregistered gen_server running this module; Host and
%% Port are handed to init/1. Returns the result of
%% gen_server:start_link/3.
connect(Host, Port) ->
    InitArgs = [Host, Port],
    gen_server:start_link(?MODULE, InitArgs, []).
|
||
|
|
||
|
%% Single-line send without arguments.
ssend(Client, Cmd) ->
    ssend(Client, Cmd, []).

%% Single-line send: Cmd and Args are rendered together as one
%% space-separated line with sformat/1, and the line is cast to Client
%% along with the reply parser chosen by get_parser/1. Returns
%% immediately with the result of gen_server:cast/2.
ssend(Client, Cmd, Args) ->
    Payload = sformat([Cmd | Args]),
    Parser = get_parser(Cmd),
    gen_server:cast(Client, {send, Payload, Parser}).
|
||
|
|
||
|
%% Send a command that takes no arguments.
send(Client, Cmd) ->
    send(Client, Cmd, []).

%% Send a command whose arguments are given as a list of lines (each
%% line a list of fields). The lines are rendered with format/1 and
%% appended to the stringified command name after a single space. The
%% payload is cast to Client together with the reply parser chosen by
%% get_parser/1. Returns immediately with the result of
%% gen_server:cast/2.
send(Client, Cmd, Args) ->
    Payload = string:join([str(Cmd), format(Args)], " "),
    Parser = get_parser(Cmd),
    gen_server:cast(Client, {send, Payload, Parser}).
|
||
|
|
||
|
%% Fire-and-forget send: Cmd is cast to Client as-is. Unlike send/ssend,
%% no reply parser accompanies the request.
asend(Client, Cmd) ->
    Request = {asend, Cmd},
    gen_server:cast(Client, Request).
|
||
|
%% Synchronously ask the client process to shut down. The server answers
%% the disconnect request with ok and stops with reason normal.
disconnect(Client) ->
    Request = disconnect,
    gen_server:call(Client, Request).
|
||
|
|
||
|
%% Synchronously collect the results gathered so far. If replies are
%% still outstanding, the server defers its answer until they have all
%% been parsed. The call is subject to the default gen_server:call/2
%% timeout.
get_all_results(Client) ->
    Request = get_all_results,
    gen_server:call(Client, Request).
|
||
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||
|
|
||
|
|
||
|
|
||
|
%% gen_server callbacks
|
||
|
%% gen_server init callback: open the TCP connection to the server.
%%
%% The socket delivers data as lists, one line per message
%% ({packet, line}), one message at a time ({active, once}); handle_info
%% re-arms it after every line.
%%
%% Returns {ok, State} with the socket and an empty parser queue, or
%% {stop, {socket_error, Why}} when the connection cannot be opened.
%% {stop, Reason} is the init/1 return that makes gen_server:start_link
%% report {error, Reason}; a bare {error, _} tuple is not a valid init/1
%% result and would surface as a bad_return_value failure instead.
init([Host, Port]) ->
    process_flag(trap_exit, true),
    ConnectOptions = [list, {active, once}, {packet, line}, {nodelay, true}],
    case gen_tcp:connect(Host, Port, ConnectOptions) of
        {ok, Socket} ->
            {ok, #redis{socket = Socket, parsers = queue:new()}};
        {error, Why} ->
            {stop, {socket_error, Why}}
    end.
|
||
|
|
||
|
%% gen_server call handler.
%%
%% {send, Cmd, Parser}: write Cmd followed by ?EOL to the socket, queue
%%   Parser for the reply and defer the answer; once results are
%%   delivered the caller receives the first element of the result list.
%% disconnect: reply ok and stop with reason normal.
%% get_all_results: see answer_or_wait/3.
%% Anything else: no reply is sent and the state is left unchanged.
handle_call({send, Cmd, Parser}, From, State = #redis{parsers = Parsers}) ->
    gen_tcp:send(State#redis.socket, [Cmd | ?EOL]),
    ReplyFirst = fun(Results) -> gen_server:reply(From, lists:nth(1, Results)) end,
    {noreply, State#redis{reply_caller = ReplyFirst,
                          parsers = queue:in(Parser, Parsers),
                          remaining = 1}};
handle_call(disconnect, _From, State) ->
    {stop, normal, ok, State};
handle_call(get_all_results, From, State) ->
    answer_or_wait(queue:is_empty(State#redis.parsers), From, State);
handle_call(_, _From, State) ->
    {noreply, State}.

%% Serve a get_all_results request. The first argument says whether the
%% parser queue is empty.
%% true:  every reply has already been parsed (the answers arrived
%%        before anybody asked), so hand the stored results over in
%%        submission order and clear them.
%% false: replies are still outstanding; remember how to reach the
%%        caller so the full result list can be sent later.
answer_or_wait(true, _From, State) ->
    {reply, lists:reverse(State#redis.results), State#redis{results = []}};
answer_or_wait(false, From, State) ->
    ReplyAll = fun(Results) -> gen_server:reply(From, Results) end,
    {noreply, State#redis{reply_caller = ReplyAll}}.
|
||
|
|
||
|
|
||
|
%% gen_server cast handler.
%%
%% {asend, Cmd}: write Cmd followed by ?EOL to the socket; no parser is
%%   queued for the reply.
%% {send, Cmd, Parser}: write Cmd followed by ?EOL and queue Parser for
%%   the reply.
%% Anything else is ignored.
handle_cast({asend, Cmd}, State) ->
    gen_tcp:send(State#redis.socket, [Cmd | ?EOL]),
    {noreply, State};
handle_cast({send, Cmd, Parser}, State = #redis{parsers = Parsers, remaining = Remaining}) ->
    gen_tcp:send(State#redis.socket, [Cmd | ?EOL]),
    Queued = queue:in(Parser, Parsers),
    %% A counter of 0 means no reply is being tracked, so this request
    %% starts the count at 1. A non-zero counter belongs to a reply that
    %% is already in flight and is left for handle_info to maintain.
    Expected =
        case Remaining of
            0 -> 1;
            _ -> Remaining
        end,
    {noreply, State#redis{remaining = Expected, parsers = Queued}};
handle_cast(_Msg, State) ->
    {noreply, State}.
|
||
|
|
||
|
|
||
|
%% Drop the last two characters (the trailing "\r\n") of a line.
%% Accepts either the bare string or an {ok, String} tuple, as returned
%% by gen_tcp:recv/2. The input must be at least two characters long.
trim2({ok, Line}) ->
    KeepLength = length(Line) - 2,
    string:substr(Line, 1, KeepLength);
trim2(Line) ->
    trim2({ok, Line}).
|
||
|
|
||
|
%% Tell whether more replies are still expected: 1 while at least one
%% parser is waiting in the queue, 0 once the queue is empty.
get_remaining(ParsersQueue) ->
    remaining_flag(queue:is_empty(ParsersQueue)).

%% Translate the "queue is empty" boolean into the 0/1 counter value.
remaining_flag(true) -> 0;
remaining_flag(false) -> 1.
|
||
|
|
||
|
%% Record one fully parsed Result and, when appropriate, deliver the
%% whole batch to the waiting caller. This is what makes pipelining
%% work: a caller may submit several requests and only ask for the
%% answers later.
%%
%% Two things decide what happens:
%%   * are parsers still queued (more replies to come) or not, and
%%   * has somebody already asked for the results (reply_caller is set)
%%     or not.
%%
%% While parsers are still queued the result is only stored. When the
%% queue is empty and nobody has asked yet, the result is stored as
%% well. When the queue is empty and a caller is waiting, every stored
%% result is handed over in submission order and the store is cleared.
%%
%% Results are kept newest-first and reversed only at delivery time, so
%% that requests submitted after a batch has completed do not scramble
%% the order.
%%
%% In every case the per-reply parsing state (pstate, buffer) is reset.
save_or_reply(Result, State = #redis{results = Earlier, reply_caller = Notify, parsers = Pending}) ->
    Collected = [Result | Earlier],
    case get_remaining(Pending) of
        1 ->
            State#redis{results = Collected, remaining = 1, pstate = empty, buffer = []};
        0 ->
            Kept = deliver_results(Notify, Collected),
            State#redis{results = Kept, remaining = 0, pstate = empty,
                        reply_caller = undefined, buffer = [],
                        parsers = Pending}
    end.

%% Hand the collected results to the waiting caller, if there is one.
%% Returns the results that must stay stored: everything when nobody is
%% waiting, nothing once the caller has been answered.
deliver_results(undefined, Collected) ->
    Collected;
deliver_results(Notify, Collected) ->
    Notify(lists:reverse(Collected)),
    [].
|
||
|
|
||
|
%% gen_server info handler: consume one line received from the socket.
%%
%% The parser at the front of the queue is applied to the line (with its
%% trailing "\r\n" removed) and the outcome is processed by
%% parse_line/5. If no parser is queued, the line is unsolicited (for
%% instance the reply to an asend/2 request, which queues no parser);
%% it is dropped instead of crashing the server on a failed match.
%% In both cases the socket is re-armed for the next line.
%% Every other message is ignored.
handle_info({tcp, Socket, Data}, State) ->
    NewState =
        case queue:out(State#redis.parsers) of
            {empty, _} ->
                State;
            {{value, Parser}, NewParsers} ->
                parse_line(Socket, trim2(Data), Parser, NewParsers, State)
        end,
    inet:setopts(Socket, [{active, once}]),
    {noreply, NewState};
handle_info(_Info, State) ->
    {noreply, State}.

%% Apply Parser to one trimmed Line and return the updated state.
%% Parser has already been taken off the queue; NewParsers is the rest
%% of the queue. The decision is based on the "remaining" counter after
%% this line and on what the parser returns.
parse_line(Socket, Line, Parser, NewParsers, State) ->
    case {State#redis.remaining - 1, Parser(State#redis.pstate, Line)} of
        %% The line carried an error code; the next line holds the error
        %% message, so the same parser goes back to the front.
        {0, error} ->
            State#redis{remaining = 1, pstate = error,
                        parsers = queue:in_r(Parser, NewParsers)};

        %% The stateful parser just started and announces how many
        %% results follow. Reset the counter to that number and keep the
        %% parser at the front for the upcoming items.
        {0, {hold, Remaining}} ->
            State#redis{remaining = Remaining, pstate = read,
                        parsers = queue:in_r(Parser, NewParsers)};

        %% Last (or only) item to read: fetch it and pack everything up.
        %% A single value is stored as-is; when earlier items were
        %% buffered, the complete list is stored in arrival order.
        {0, {read, NBytes}} ->
            CurrentValue = read_bulk(Socket, NBytes),
            Finished = State#redis{parsers = NewParsers},
            case State#redis.buffer of
                [] ->
                    save_or_reply(CurrentValue, Finished);
                OldBuffer ->
                    save_or_reply(lists:reverse([CurrentValue | OldBuffer]), Finished)
            end;

        %% More items follow: buffer this one and keep the parser at the
        %% front of the queue.
        {N, {read, NBytes}} ->
            CurrentValue = read_bulk(Socket, NBytes),
            OldBuffer = State#redis.buffer,
            State#redis{remaining = N, buffer = [CurrentValue | OldBuffer],
                        pstate = read, parsers = queue:in_r(Parser, NewParsers)};

        %% Simple return value contained in a single line.
        {0, Value} ->
            save_or_reply(Value, State#redis{parsers = NewParsers})
    end.

%% Read NBytes of payload plus the trailing "\r\n" straight from the
%% socket. The socket is switched to raw mode for the read and back to
%% line mode afterwards; the payload is returned without the "\r\n".
read_bulk(Socket, NBytes) ->
    inet:setopts(Socket, [{packet, 0}]),
    Value = trim2(gen_tcp:recv(Socket, NBytes + 2)),
    inet:setopts(Socket, [{packet, line}]),
    Value.
|
||
|
|
||
|
|
||
|
%% gen_server terminate callback: close the socket if one was opened.
%% Always returns ok.
terminate(_Reason, State) ->
    close_socket(State#redis.socket),
    ok.

%% Close Socket unless it was never set.
close_socket(undefined) ->
    pass;
close_socket(Socket) ->
    gen_tcp:close(Socket).
|
||
|
|
||
|
|
||
|
%% gen_server code_change callback: the state needs no conversion
%% between versions, so it is returned unchanged.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
|
||
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||
|
|
||
|
|