%% @copyright 2007 Hynek Vychodil
%% @author Hynek Vychodil
%%   [http://pichis_blog.blogspot.com/]
%% @version 0.0.1
%% @end
%% =====================================================================
%% @doc Concurrent new line terminated blocks reader
%%
%% nlt_reader provides a swarm of block splitters and catenators which
%% allow fast reading of new line terminated blocks.

-module(nlt_reader).

-export([split_on_nl/1, worker/2]).
-export([close/1, open/1, open/2, open/3, read/1]).
-export([test_read/3]).

%% Default chunk size used by open/1 (passed on to chunk_reader:open/2).
-define(BestSize, 1024 * 512).
%% Default number of worker processes: two per scheduler.
-define(BestProc, erlang:system_info(schedulers)*2).
%% Roughly how many seconds read/1 waits for a response before giving up.
-define(TIMEOUT, 60).

% @spec (FileName::list(), ChunkSize::integer(), Processes::integer())
%       -> {ok, nlt_reader()}
% @type nlt_reader() = {nlt_reader, pid()}
% @doc Opens a new line terminated blocks reader over the file `FileName'.
% The file is read through chunk_reader with chunk size `ChunkSize';
% `Processes' sizes the worker pool (see open/2).
open(FileName, ChunkSize, N) ->
    Open = fun () -> chunk_reader:open(FileName, ChunkSize) end,
    Read = fun (CR) -> chunk_reader:read_seq(CR) end,
    Close = fun (CR) -> chunk_reader:close(CR) end,
    open({Open, Read, Close}, N).

% @spec (FileHandler::file_handler(), Processes::integer())
%       -> {ok, nlt_reader()}
% @type file_handler() = {Open::open_handler(), Read::read_handler(),
%                         Close::close_handler()}
% @type open_handler() = () -> {ok, CR::term()}
% @type read_handler() = (CR::term()) -> {{ok, Data::binary()} | eof,
%                                         PreviousReader::pid() | first}
% @type close_handler() = (CR::term()) -> ok
% @doc Opens a new line terminated blocks reader driven by `FileHandler'.
%
% Spawns a reader process that is linked to the caller and monitors it.
% The reader opens the source with `Open' and dispatches workers that pull
% chunks with `Read'. It calls `Close' once its dispatcher loop returns.
% A `Read' result tagged `first' marks the chunk that has no predecessor.
% Any call that does not match the handler clause is treated as
% `open(FileName, ChunkSize)' with the default number of processes.
open({Open, Read, Close} = _Handler, N)
  when N > 0, is_function(Open, 0), is_function(Read, 1),
       is_function(Close, 1) ->
    Master = self(),
    Reader =
        fun () ->
                erlang:monitor(process, Master),
                {ok, CR} = Open(),
                M = self(),
                Read_1 = fun () -> Read(CR) end,
                Worker = fun () -> worker(Read_1, M) end,
                %% One extra worker is kept waiting for data, so the
                %% dispatcher allows at most N + 1 workers and buffered
                %% blocks in flight.
                loop(Worker, sets:new(), [], 0, N + 1),
                ok = Close(CR)
        end,
    {ok, {nlt_reader, spawn_opt(Reader, [link])}};
open(FileName, ChunkSize) ->
    open(FileName, ChunkSize, ?BestProc).
% @spec (Handler::file_handler() | FileName::list()) -> {ok, nlt_reader()}
% @doc Opens a reader with default settings. A handler tuple gets the
% default number of processes. Anything else is treated as a file name
% and gets the default chunk size.
open({Open, Read, Close} = Handler)
  when is_function(Open, 0), is_function(Read, 1), is_function(Close, 1) ->
    open(Handler, ?BestProc);
open(FileName) ->
    open(FileName, ?BestSize).

% @spec (Reader::nlt_reader()) -> ok | {error, invalid}
% @doc Closes the reader by sending `close' to its process. Returns
% `{error, invalid}' when that process is no longer alive.
close({nlt_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            Pid ! close,
            ok;
        false ->
            {error, invalid}
    end.

% @spec (Reader::nlt_reader()) -> read_result() | read_error()
% @type read_result() = eof | {ok, binary()}
% @type read_error() = {error, invalid | closed | timeout}
% @doc Reads one block from the reader. Blocks are collected from
% concurrent workers and are not guaranteed to come back in file order.
% The results are:
% <ul>
% <li>`{error, invalid'}: the reader process was already gone.</li>
% <li>`{error, closed}': the reader died while waiting.</li>
% <li>`{error, timeout}': no answer arrived within about ?TIMEOUT
%     seconds.</li>
% </ul>
read({nlt_reader, Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            MRef = erlang:monitor(process, Pid),
            Pid ! {read, self()},
            wait_response(Pid, MRef);
        false ->
            {error, invalid}
    end.

%% Waits for the reader's answer to a {read, From} request.
%% The monitor reports the reader's death at once instead of polling
%% is_process_alive/1. Signals from one process arrive in order, so a reply
%% sent before the reader exits is always received before its 'DOWN'
%% message.
%% NOTE(review): replies are untagged, so an unrelated {ok, _} or eof
%% message already in the caller's mailbox is taken as the answer.
wait_response(Pid, MRef) ->
    receive
        {ok, _} = Msg ->
            erlang:demonitor(MRef, [flush]),
            Msg;
        eof ->
            erlang:demonitor(MRef, [flush]),
            eof;
        {'DOWN', MRef, process, Pid, _Reason} ->
            {error, closed}
    after ?TIMEOUT * 1000 ->
            erlang:demonitor(MRef, [flush]),
            {error, timeout}
    end.

% @spec (FileName::list(), ChunkSize::integer(), Processes::integer()) -> ok
% @doc Tests reading the whole `FileName' file with `nlt_reader'.
test_read(FileName, ChunkSize, N) ->
    {ok, File} = open(FileName, ChunkSize, N),
    eof = test_read_loop(File, read(File)),
    close(File).

%% Reads blocks until eof. Any error result crashes with function_clause.
test_read_loop(File, {ok, _}) ->
    test_read_loop(File, read(File));
test_read_loop(_, eof) ->
    eof.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Dispatcher
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Dispatcher loop, run inside the reader process.
%%   Worker  - fun run by each spawned worker (see worker/2)
%%   Running - set of worker pids that have not reported back yet
%%   Blocks  - finished blocks not yet handed out to read/1
%%   N       - running workers plus buffered blocks
%%   Max     - upper bound for N
%% While N < Max it spawns linked workers. Otherwise it serves one
%% message. {read, From} requests stay queued while no block is buffered.
%% After a worker reports eof, control passes to wait/2. The loop returns
%% only after `close' or a 'DOWN' message.
loop(Worker, Running, Blocks, N, Max) when N < Max ->
    New = spawn_opt(Worker, [link]),
    loop(Worker, sets:add_element(New, Running), Blocks, N + 1, Max);
loop(Worker, Running, Blocks, Max, Max) ->
    receive
        {done, Pid, B} ->
            loop(Worker, sets:del_element(Pid, Running), [B | Blocks],
                 Max, Max);
        {empty, Pid} ->
            loop(Worker, sets:del_element(Pid, Running), Blocks,
                 Max - 1, Max);
        {eof, Pid} ->
            wait(sets:del_element(Pid, Running), Blocks);
        {read, From} when Blocks =/= [] ->
            [H | T] = Blocks,
            From ! {ok, H},
            loop(Worker, Running, T, Max - 1, Max);
        close ->
            closing(Running);
        %% Monitor notifications are 5-tuples. The only monitor this
        %% process sets up is on the caller that opened the reader.
        {'DOWN', _MRef, process, _Object, _Reason} ->
            closing(Running)
    end.

%% After eof, keeps collecting results from the remaining workers while
%% still serving reads. Switches to eof/1 once no worker is running.
wait(Running, Blocks) ->
    case sets:size(Running) of
        0 -> eof(Blocks);
        _ -> wait1(Running, Blocks)
    end.

%% Serves one message on behalf of wait/2.
wait1(Running, Blocks) ->
    receive
        {done, Pid, B} ->
            wait(sets:del_element(Pid, Running), [B | Blocks]);
        {empty, Pid} ->
            wait(sets:del_element(Pid, Running), Blocks);
        {eof, Pid} ->
            wait(sets:del_element(Pid, Running), Blocks);
        {read, From} when Blocks =/= [] ->
            [H | T] = Blocks,
            From ! {ok, H},
            wait(Running, T);
        close ->
            closing(Running);
        {'DOWN', _MRef, process, _Object, _Reason} ->
            closing(Running)
    end.

%% All workers are finished. Hands out the remaining blocks, then answers
%% every further read with eof until closed or the caller goes down.
eof([H | T]) ->
    receive
        {read, From} ->
            From ! {ok, H},
            eof(T);
        close ->
            ok;
        {'DOWN', _MRef, process, _Object, _Reason} ->
            ok
    end;
eof([]) ->
    receive
        {read, From} ->
            From ! eof,
            eof([]);
        close ->
            ok;
        {'DOWN', _MRef, process, _Object, _Reason} ->
            ok
    end.

%% Unlinks and kills every worker that is still running.
closing(Running) ->
    Killer = fun (E, _) -> unlink(E), exit(E, kill) end,
    sets:fold(Killer, true, Running).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Splitter - Catenator
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% reader should be fun() -> chunk_reader:read_seq(CR) end
%% @spec (Reader::function(), M::pid()) -> term()
%% @doc Splitter/catenator worker. Calls `Reader' once to obtain
%% `{{ok, Binary} | eof, PrevPid | first}'. It hands the part of the chunk
%% up to and including its first new line to the previous worker
%% `PrevPid'. It then completes its own block with the part sent by the
%% next worker and reports the result to the dispatcher `M'.
worker(Reader, M) ->
    split(Reader(), M).

%% Handles one chunk. The first chunk (`first') has no predecessor and
%% gives nothing away. Otherwise the part up to and including the first
%% new line goes to the previous worker, so that its block ends with a new
%% line.
split({{ok, B}, first}, M) ->
    report(M, join_next(B));
split({eof, first}, M) ->
    M ! {eof, self()};
split({{ok, B}, PrevPid}, M) ->
    case split_on_nl(B) of
        N when is_integer(N) ->
            HeadSize = N + 1,
            <<Head:HeadSize/binary, Rest/binary>> = B,
            PrevPid ! {next_part, Head},
            report(M, join_next(Rest));
        none ->
            %% No new line: the whole chunk (plus whatever follows) belongs
            %% to the previous worker's block. Report `empty' BEFORE
            %% blocking in join_next/1. That frees a slot in the
            %% dispatcher, so the next worker this one waits for can be
            %% spawned (avoids a deadlock).
            M ! {empty, self()},
            PrevPid ! {next_part, join_next(B)}
    end;
split({eof, PrevPid}, M) ->
    M ! {eof, self()},
    PrevPid ! {next_part, eof}.

%% Reports a finished block to the dispatcher `M'. An empty block is
%% reported as `empty', so read/1 never hands out <<>>. This happens, for
%% example, with the tail after a final new line at a chunk boundary.
report(M, <<>>) ->
    M ! {empty, self()};
report(M, Block) ->
    M ! {done, self(), Block}.

%% Waits for the next worker's part and appends it to `Data'. A part of
%% `eof' means there is no next part.
join_next(Data) ->
    receive
        {next_part, Next} when is_binary(Next) ->
            <<Data/binary, Next/binary>>;
        {next_part, eof} ->
            Data
    end.

%splitter
%% @spec (B::binary()) -> integer() | none
%% @doc Returns the zero-based offset of the first new line in `B', or
%% `none' when `B' contains no new line.
split_on_nl(B) ->
    split_on_nl(B, 0, size(B)).

split_on_nl(B, N, S) when N < S ->
    case B of
        <<_:N/binary, $\n, _/binary>> -> N;
        _ -> split_on_nl(B, N + 1, S)
    end;
split_on_nl(_, _, _) ->
    none.