Wednesday, 20 June 2007

Collaborative Filtering: Weighted Slope One in Erlang (v2)

Okay, so my initial Weighted Slope One Erlang translation wasn't very Erlang-ish... not a single spawn in sight and side-effects galore. Yuck.

I've ripped out ETS and replaced it with dictionaries, and modified the build_matrix loops to palm off the real work to spawned processes rather than do it themselves.

The main change was to the build_matrix function. A long-running process is spawned for every item in the system (the 'X' item in the {x, y} difference/frequency matrix), and short-term user-rating processes are spawned to send difference and frequency information to the relevant item process and wait for confirmation that the message was received. Once all of the data has been sent, the item processes are asked to return their individual dictionaries to the main process, which then merges them into one big dictionary.
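For readers who don't speak Erlang, each item process is essentially a little accumulator: it receives difference messages and hands back its dictionary on request. Here is a rough Python analogy of one such accumulator (my own illustration using a thread and a queue, not a translation of the code below):

```python
import queue
import threading

class ItemAccumulator:
    """A toy stand-in for a long-running itemX process: it receives
    (itemY, difference) messages and returns its dictionary on request."""

    def __init__(self, item_x):
        self.item_x = item_x
        self.inbox = queue.Queue()
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def _run(self):
        diff_freq = {}
        while True:
            msg = self.inbox.get()
            if msg is None:  # the 'results' request: publish and die naturally
                self.result = diff_freq
                return
            item_y, diff = msg
            d, f = diff_freq.get((self.item_x, item_y), (0.0, 0))
            diff_freq[(self.item_x, item_y)] = (d + diff, f + 1)

    def send(self, item_y, diff):
        # Fire off a difference message to this item's mailbox.
        self.inbox.put((item_y, diff))

    def results(self):
        # Ask for the accumulated dictionary and wait for the worker to finish.
        self.inbox.put(None)
        self.thread.join()
        return self.result
```

The main process would create one accumulator per item, fan the rating data out to them, then collect and merge the returned dictionaries.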

The item processes die naturally after they return their dictionaries, and the user rating processes only live long enough to ensure that their data is received by the relevant item process.

The only other significant change was to make the predict function use a dictionary rather than an ETS table.
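If you want to check the numbers without an Erlang shell, the algorithm itself fits in a few lines of sequential Python. This is my own sketch for comparison, mirroring the data flow of the program below (a dict of {(x, y): (average difference, frequency)}):

```python
def build_matrix(ratings_by_user):
    """Build {(x, y): (average difference, frequency)} over all users,
    where each user's ratings are an {item: rating} dict."""
    diff_freq = {}
    for ratings in ratings_by_user:
        for x, rx in ratings.items():
            for y, ry in ratings.items():
                d, f = diff_freq.get((x, y), (0.0, 0))
                diff_freq[(x, y)] = (d + (rx - ry), f + 1)
    # Average each summed difference, keeping the frequency as its weight.
    return {k: (d / f, f) for k, (d, f) in diff_freq.items()}

def predict(items, user_ratings, diff_freq):
    """Predict a rating for every reachable item, weighting by frequency."""
    preds = {}
    for x, rx in user_ratings.items():
        for y in items:
            if (y, x) in diff_freq:
                diff, freq = diff_freq[(y, x)]
                p, f = preds.get(y, (0.0, 0))
                preds[y] = (p + (rx + diff) * freq, f + freq)
    result = {y: p / f for y, (p, f) in preds.items()}
    # Known ratings override predictions, as in the Erlang version.
    result.update(user_ratings)
    return result
```

Running this on the post's rating data, a user who rated only "strange food" at 0.4 gets 0.1 predicted for "candy" and 1.0 for "war", matching StrangeJo's tastes.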

The concurrent bits of this program use this idiom[*]:

process_flag(trap_exit, true),
[receive
     {'EXIT', Pid, normal} -> ok;
     {'EXIT', Pid, Reason} -> exit(self(), Reason)
 end || Pid <- [spawn_link(fun() -> >>do something<< end)
                || >>value/s <- list of value/s<<]]

In other words: spawn a process to do something for every element in the list of values, and wait for each of those processes to signal that they have finished normally. (I used a substitution macro to avoid typing in all that boilerplate.)
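The same spawn-everything-then-wait-for-all shape, sketched in Python with the standard library's `concurrent.futures` (an analogy of my own, not a translation; threads stand in for processes):

```python
from concurrent.futures import ThreadPoolExecutor

def spawn_and_wait(work, values):
    """Start one task per value, then wait for each to finish."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(work, v) for v in values]
        for f in futures:
            # result() re-raises any exception from the task in the parent,
            # roughly like trapping a non-normal 'EXIT' from a linked process.
            f.result()
```

As with the Erlang idiom, a failure in any child propagates to the waiting parent instead of being silently dropped.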

An unfortunate consequence of this modification is that the code is about 50% larger than the earlier Erlang version:

%% slopeone2.erl
% Philip Robinson
% A parallel implementation of the weighted slope one
% algorithm in Erlang for item-based collaborative
% filtering.
% Based on the same algorithm in Java by Daniel Lemire and Marco Ponzi.

-module(slopeone2).
-export([start/0]).

-define(SPAWN_AND_WAIT(Block, Data),
    begin
        process_flag(trap_exit, true),
        [receive
             {'EXIT', Pid, normal} -> ok;
             {'EXIT', Pid, Reason} -> exit(self(), Reason)
         end || Pid <- [spawn_link(fun() -> Block end)
                        || Data]]
    end).

start() ->
    % The rating database: a list of users, each containing a list of {item, rating} elements.
    Items = [{item1,"candy"}, {item2,"dog"}, {item3,"cat"}, {item4,"war"}, {item5,"strange food"}],
    DataRating = [{user1, "Bob", [{item1,1.0}, {item2,0.5}, {item4,0.1}]},
                  {user2, "Jane", [{item1,1.0}, {item3,0.5}, {item4,0.2}]},
                  {user3, "Jo", [{item1,0.9}, {item2,0.4}, {item3,0.5}, {item4,0.1}]},
                  {user4, "StrangeJo", [{item1,0.1}, {item4,1.0}, {item5,0.4}]}],
    % The difference & frequency database: a dictionary of {{itemX, itemY}, {Diff, Freq}}.
    % Create the predictor engine.
    DiffFreq = build_matrix(Items, DataRating),
    io:format("Here's the data I have accumulated...~n"),
    print_data(Items, DataRating, DiffFreq),
    io:format("Ok, now we predict...~n"),
    TestRatings1 = [{item5,0.4}],
    print_user_ratings(Items, TestRatings1),
    print_user_ratings(Items, predict(Items, TestRatings1, DiffFreq)),
    TestRatings2 = [{item4,0.2}|TestRatings1],
    print_user_ratings(Items, TestRatings2),
    print_user_ratings(Items, predict(Items, TestRatings2, DiffFreq)).

% Based on existing data, and using weights, try to predict all missing ratings.
% The trick to make this more scalable is to consider only difference entries
% having a large (> 1) frequency entry.
% Precondition: ItemRatings is sorted as per lists:sort/1 (to re-merge actual ratings).
predict(Items, ItemRatings, DiffFreq) ->
    % Gather the sum of the rating differences and frequencies for all
    % available items given the known item ratings.
    RatingsPredicted = lists:foldl(
        fun({IdItemX, IdItemY, RatingX}, Dict) ->
            case dict:find({IdItemY, IdItemX}, DiffFreq) of
                error -> Dict;
                {ok, {Diff, DFreq}} ->
                    dict:update(IdItemY,
                        fun({Pred, PFreq}) -> {Pred + ((RatingX + Diff) * DFreq), PFreq + DFreq} end,
                        {(RatingX + Diff) * DFreq, DFreq},
                        Dict)
            end
        end,
        dict:new(),
        [{IdItemX, IdItemY, RatingX}
         || {IdItemX, RatingX} <- ItemRatings,
            {IdItemY, _} <- Items]),
    % Put the original (actual) ratings back into our prediction list.
    RatingsPredictedAndActual = lists:foldl(
        fun({Item, Rating}, Ratings) -> dict:store(Item, {Rating, 1}, Ratings) end,
        RatingsPredicted, ItemRatings),
    % Divide the total rating difference by the frequency and return as a list.
    [{Item, Rating / Freq} || {Item, {Rating, Freq}} <- dict:to_list(RatingsPredictedAndActual)].

print_data(Items, DataRating, DiffFreq) ->
    [begin io:format("~s~n", [Name]), print_user_ratings(Items, ItemRatings) end
     || {_Id, Name, ItemRatings} <- DataRating],
    [print_item_diffs(Items, Item, DiffFreq) || Item <- Items],
    io:format("~n").

print_user_ratings(Items, ItemRatings) ->
    [begin {value, {Item, NameItem}} = lists:keysearch(Item, 1, Items),
           io:format(" ~12s --> ~4.2f~n", [NameItem, Rating]) end
     || {Item, Rating} <- lists:sort(ItemRatings)].

print_item_diffs(Items, {ItemX, Name}, DiffFreq) ->
    io:format("~n~12s:", [Name]),
    [case dict:find({ItemX, ItemY}, DiffFreq) of
         error -> io:format("         ");
         {ok, {Diff, Freq}} -> io:format(" ~6.3f ~1B", [Diff, Freq])
     end || {ItemY, _} <- Items].

% Long-running itemX process to manage a dictionary of {{itemX, itemY}, {Diff, Freq}} entries.
dict_itemX(IdItemX, DictDiffFreq) ->
    receive
        {data, PidSender, IdItemY, DiffNew} ->
            PidSender ! {done, self(), IdItemY},
            dict_itemX(IdItemX,
                dict:update({IdItemX, IdItemY},
                    fun({DiffOld, FreqOld}) -> {DiffOld + DiffNew, FreqOld + 1} end,
                    {DiffNew, 1}, DictDiffFreq));
        {results, PidParent} -> PidParent ! {results, self(), DictDiffFreq}
    end.

% Build a matrix (dictionary) of {{itemX, itemY}, {Diff, Freq}} entries.
build_matrix(Items, DataRating) ->
    PidThis = self(),
    % Create a bunch of long-running processes to manage itemX data.
    PidItems = dict:from_list(
        [{IdItemX, spawn(fun() -> dict_itemX(IdItemX, dict:new()) end)}
         || {IdItemX, _NameItem} <- Items]),
    % Spawn a short-term process for each user's ratings and wait until they are all done.
    ?SPAWN_AND_WAIT(process_user_ratings(PidItems, Ratings), {_, _, Ratings} <- DataRating),
    % Retrieve and merge all the item process dictionaries,
    % then divide all differences by their frequency.
    dict:map(
        fun(_Key, {Diff, Freq}) -> {Diff / Freq, Freq} end,
        dict:fold(
            fun(_IdItemX, PidItemX, DictIn) ->
                PidItemX ! {results, PidThis},
                receive
                    {results, PidItemX, DiffFreq} ->
                        dict:merge(fun(_, _, _) -> io:format("Key collision~n") end,
                                   DictIn, DiffFreq)
                end
            end,
            dict:new(), PidItems)).

process_user_ratings(PidItems, Ratings) ->
    % Spawn a short-term process for each itemX rating and wait until they are all done.
    ?SPAWN_AND_WAIT(process_user_itemX_ratings(dict:fetch(IdItemX, PidItems), RatingX, Ratings),
                    {IdItemX, RatingX} <- Ratings).

process_user_itemX_ratings(PidItemX, RatingX, Ratings) ->
    % Spawn a process for each itemY rating that sends a message to the long-running
    % itemX process, waits for confirmation that its message has been processed,
    % then signals that it is done.
    ?SPAWN_AND_WAIT(
        begin
            PidItemX ! {data, self(), IdItemY, RatingX - RatingY},
            receive {done, PidItemX, IdItemY} -> ok end
        end, {IdItemY, RatingY} <- Ratings).

[*] If trapping exits is not your cup of tea then you could use plain old spawn like this:

PidThis = self(),
[receive {done, Pid} -> ok end
 || Pid <- [spawn(fun() ->
                >>do something<<,
                PidThis ! {done, self()}
            end)
            || >>value/s <- list of value/s<<]]

I had used this code until I thought of using spawn_link. I am not sure which version Erlang gurus would consider better, but my personal preference leans towards spawn_link: it seems easier to extend to multiple Erlang nodes, and if I were concerned about setting my main process to trap all exits then I could simply spawn a single process to manage all of the spawn_linking.


Obligatory legal stuff

Unless otherwise noted, all code appearing on this blog is released into the public domain and provided "as-is", without any warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the author(s) be liable for any claim, damages, or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.