Thursday 28 June 2007

Erlang Binary Map


(Updated to clean up some hypertext errors in the code examples.)

The Netflix Challenge dataset is very big, and there is not much RAM in my laptop at all.

To fit as much as possible of the rating data into memory, I converted the data into a bunch of Erlang binaries. Erlang binaries are 'just' contiguous blocks of bytes, so they do not incur the overhead of list cons cells or dictionary indexing. They are fast, but pretty simple, and using them to store data like C arrays means that you have to write your own code to manage access to them. (At least you will get a runtime error if you try to access past the end of a binary, which is a major step up from C.)
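
For example, pulling a two-byte unsigned integer out of a binary at a given byte offset is a single bit-syntax match (a quick sketch with made-up values):

% Skip two bytes, then read the next two as an unsigned integer.
<<_:2/binary, Value:16/unsigned-integer, _/binary>> = <<1,2,3,4,5,6>>, Value.
-> 772

% Trying to read where fewer than two bytes remain is a badmatch error,
% not a silent buffer overrun.
<<_:5/binary, _:16, _/binary>> = <<1,2,3,4,5,6>>.
-> (badmatch runtime error)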


While wandering through the wasteland that is my code I happened to notice that one section did not smell quite right. This code was taking a chunk of that binary data, converting the chunk into a list of elements, and then mapping a function over this list of elements in order to return a list of function results. Creating that intermediate list seemed like a bit of unnecessary overhead - I would much rather iterate directly across the binary itself and save all of that extra list consing - but I could not find any binary:map-like function anywhere.

So I wrote my own.

It's not very big.


The original code had used a list_from_bin function to turn a binary into a list of elements:

% Convert a binary into a list of unsigned integer elements.
list_from_bin(Bin, BytesPerElem) ->
    list_from_bin(Bin, BytesPerElem, BytesPerElem * 8, size(Bin), []).

list_from_bin(_Bin, _BytesPerElem, _BitsPerElem, 0, Results) -> Results;
list_from_bin(Bin, BytesPerElem, BitsPerElem, BytesOffset, Results) ->
    BytesDiscard = BytesOffset - BytesPerElem,
    <<_:BytesDiscard/binary, Element:BitsPerElem/unsigned-integer, _/binary>> = Bin,
    list_from_bin(Bin, BytesPerElem, BitsPerElem, BytesDiscard, [Element|Results]).


And list_from_bin was used in this manner (with a trivial "2 * Elem" function):

[2 * N || N <- list_from_bin(<<1,2,3,4>>, 1)].
-> [2,4,6,8]

[2 * N || N <- list_from_bin(<<1,2,3,4>>, 2)].
-> [516,1544]

Note that list_from_bin iterates backwards from the end of the binary to the beginning, so the list it builds comes out in the same order as the original binary and does not need reversing.

(If all of my elements were one byte long then I could have just used the standard Erlang binary_to_list/1 function, but sometimes the elements I used were two or three bytes in length. I should probably have included an initial clause of "list_from_bin(Bin, 1) -> binary_to_list(Bin);", but didn't think of it at the time.)


The new map_bin function maps a function over the elements in a binary, and returns a list of the function's results:

map_bin(Fun, Bin, BytesPerElem) ->
    map_bin(Fun, Bin, BytesPerElem, 0, size(Bin) div BytesPerElem, []).

map_bin(_Fun, _Bin, _BytesPerElem, _BytesDiscard, 0, Results) ->
    lists:reverse(Results);
map_bin(Fun, Bin, BytesPerElem, BytesDiscard, CountElemRemain, Results) ->
    <<_:BytesDiscard/binary, Elem:BytesPerElem/binary, _/binary>> = Bin,
    map_bin(Fun, Bin, BytesPerElem, BytesDiscard + BytesPerElem,
            CountElemRemain - 1, [Fun(Elem)|Results]).


The main function (map_bin/3) takes these arguments:
  • Fun: A function that takes a single binary element as input. May return any value.
  • Bin: The original binary data block. Its size should be an exact multiple of BytesPerElem; if it is not, any excess bytes at the end of the binary are silently discarded (see the example after these lists).
  • BytesPerElem: The number of bytes taken up by each element in the binary.

The helper function (map_bin/6) takes three additional arguments:
  • BytesDiscard: The number of bytes to skip at the beginning of the binary, for the current iteration.
  • CountElemRemain: The number of elements remaining to process.
  • Results: An accumulated, reversed list of the function's results.
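
For example, the div in map_bin/3 quietly drops a trailing partial element:

map_bin(fun(<<Elem:16>>) -> Elem end, <<1,2,3>>, 2).
-> [258]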

The BytesDiscard argument was added to avoid having to recalculate the number of bytes to skip for every iteration (with, for example, something like "BytesDiscard = Offset * BytesPerElem"). I am not sure if this was a good decision or if it reeks too much of premature optimisation. Old C habits die hard.

CountElemRemain starts at the number of elements to process and decrements on each iteration, so the terminating clause can simply match on 0 rather than needing a "when Offset > CountElems" guard on the function clause.
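
For comparison, the guard-based version I avoided would look something like this sketch (map_bin2 is a made-up name; note it needs one less argument but calls size/1 in the guard on every iteration):

map_bin2(_Fun, Bin, BytesPerElem, BytesDiscard, Results)
        when BytesDiscard + BytesPerElem > size(Bin) ->
    lists:reverse(Results);
map_bin2(Fun, Bin, BytesPerElem, BytesDiscard, Results) ->
    <<_:BytesDiscard/binary, Elem:BytesPerElem/binary, _/binary>> = Bin,
    map_bin2(Fun, Bin, BytesPerElem, BytesDiscard + BytesPerElem, [Fun(Elem)|Results]).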

And here is map_bin in action:

map_bin(fun(<<Elem:8>>) -> 2 * Elem end, <<1,2,3,4>>, 1).
-> [2,4,6,8]

map_bin(fun(<<Elem:16>>) -> 2 * Elem end, <<1,2,3,4>>, 2).
-> [516,1544]


Now with the new map_bin function my code can skip the creation of an intermediate list and, quite by accident, is actually more flexible than before. The original code always produced lists of unsigned integers from the binaries; the new code lets the supplied function decode each element however it likes, including pulling several values out of a single element. For example:

map_bin(fun(<<Elem1:8, Elem2:16>>) -> Elem1 + Elem2 end, <<1,2,3,4,5,6>>, 3).
-> [516,1290]

It's just not quite as nice to look at as a good list comprehension.


Performance

The map_bin function is all well and good, but the real question we all want answered is... does using this new code actually make our program run any faster?

Well, according to my very informal use of now/0 and timer:now_diff/2, with large binaries and a trivial "x2" function for each element, map_bin seems to be around 11% faster than using list_from_bin. That's... not too bad. But we could go faster with multiple processes, I think.
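
The harness was nothing fancier than this sort of thing (a sketch of the idea rather than the exact code I ran; time_microsecs is a made-up name):

% Time a zero-argument function, returning {Microseconds, Result}.
time_microsecs(Fun) ->
    TimeStart = now(),
    Result = Fun(),
    {timer:now_diff(now(), TimeStart), Result}.

% Comparing the two approaches over the same large binary Bin:
% time_microsecs(fun() -> [2 * N || N <- list_from_bin(Bin, 2)] end).
% time_microsecs(fun() -> map_bin(fun(<<N:16>>) -> 2 * N end, Bin, 2) end).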


Parallelism

Really, what is the point of using Erlang if we don't spawn a few hundred processes for every trivial piece of code?

Luckily for my fingers it is only a minor modification to make a parallel version of map_bin:

pmap_bin(Fun, Bin, BytesPerElem) ->
    pmap_bin(Fun, Bin, BytesPerElem, 0, size(Bin) div BytesPerElem, []).

pmap_bin(_Fun, _Bin, _BytesPerElem, _BytesDiscard, 0, Pids) ->
    [receive {done, Pid, Result} -> Result end || Pid <- lists:reverse(Pids)];
pmap_bin(Fun, Bin, BytesPerElem, BytesDiscard, CountElemRemain, Pids) ->
    PidThis = self(),
    <<_:BytesDiscard/binary, Elem:BytesPerElem/binary, _/binary>> = Bin,
    pmap_bin(Fun, Bin, BytesPerElem, BytesDiscard + BytesPerElem, CountElemRemain - 1,
             [spawn(fun() -> PidThis ! {done, self(), Fun(Elem)} end)|Pids]).


Sadly, I cannot comment much on the speed difference of this conversion because I do not (yet) have access to a multi-core machine. A routine like this is probably best avoided on a single-CPU system, where the overhead of spawning all those processes buys you nothing, but it should perform better with multiple CPUs. (Feel free to try it out and let me know how it goes!)


There is still room for improvement in this function, if we wanted to take it further. We may not want to spawn a separate process for each element, for instance, but rather split the original binary into N chunks and spawn a process to apply a function to each chunk. We might also want to take the parallelism further and bring other nodes into the mix, spreading the calculations across multiple machines.
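
As a rough, untested sketch of the chunking idea (pmap_bin_chunked and NumChunks are my hypothetical names, and this version assumes the element count divides evenly by the chunk count, quietly dropping any leftovers):

pmap_bin_chunked(Fun, Bin, BytesPerElem, NumChunks) ->
    PidThis = self(),
    % Assumes NumChunks divides the element count evenly.
    ElemsPerChunk = (size(Bin) div BytesPerElem) div NumChunks,
    BytesPerChunk = ElemsPerChunk * BytesPerElem,
    Pids = [spawn(fun() ->
                      % Each process runs the plain sequential map_bin over its own chunk.
                      <<_:Offset/binary, Chunk:BytesPerChunk/binary, _/binary>> = Bin,
                      PidThis ! {done, self(), map_bin(Fun, Chunk, BytesPerElem)}
                  end)
            || Offset <- lists:seq(0, (NumChunks - 1) * BytesPerChunk, BytesPerChunk)],
    lists:append([receive {done, Pid, Results} -> Results end || Pid <- Pids]).

This way the spawning overhead is paid once per chunk rather than once per element.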

Something for another day.


And now someone is going to tell me that binary comprehensions have been available in the standard Erlang distribution for a while, and I just haven't been paying enough attention to the Erlang mailing-list announcements.
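
(For the record, I believe an experimental bit-syntax generator for comprehensions is floating around at about this time, possibly hidden behind a compile flag, which would reduce the whole exercise to something like:)

[2 * N || <<N:16>> <= <<1,2,3,4>>].
-> [516,1544]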

Friday 22 June 2007

Erlang Macro Oddness


I found a little oddity with Erlang macros while I was writing version 2 of the Weighted Slope One algorithm. It seems that passing a multi-statement anonymous function as a parameter into a macro confuses the preprocessor.

For example, this code works:

-module(macro_oddness).
-export([start/0]).
-define(A_MACRO(FunAnon), apply(FunAnon, [])).

start() ->
    ?A_MACRO(
        fun() ->
            io:format("Single-element function works fine.~n")
        end).


But this code produces a compile-time error:

-module(macro_oddness).
-export([start/0]).
-define(A_MACRO(FunAnon), apply(FunAnon, [])).

start() -> ?A_MACRO(
    fun() ->
        io:format("Multiple-element function "),
        io:format("does not compile.~n")
    end).


An "argument mismatch for macro ''A_MACRO''" error, to be precise.

Interestingly, multiple statements in a begin..end block seem to be okay:

-module(macro_oddness).
-export([start/0]).
-define(A_MACRO(FunAnon), apply(FunAnon, [])).

start() -> ?A_MACRO(
    fun() ->
        begin
            io:format("Multiple-element function "),
            io:format("with a begin..end block is okay.~n")
        end
    end).
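
I would also guess that an extra pair of parentheses around the argument does the job, since the preprocessor does seem to track bracket nesting. Untested:

start() -> ?A_MACRO(
    (fun() ->
        io:format("Multiple-element function "),
        io:format("in extra parentheses should also be okay.~n")
    end)).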


Something to keep an eye out for.

Wednesday 20 June 2007

Collaborative Filtering: Weighted Slope One in Erlang (v2)


Okay, so my initial Weighted Slope One Erlang translation wasn't very Erlang-ish... not a single spawn in sight and side-effects galore. Yuck.

I've ripped out ETS and replaced it with dictionaries, and modified the build_matrix loops to palm off the real work to spawned processes rather than do it themselves.

The main change was to the build_matrix function. A long-running process is spawned for every item in the system (the 'X' item in the {x, y} difference/frequency matrix), and short-term user rating processes are spawned to send difference and frequency information to the relevant item process and wait for confirmation that the message was received. Once all of the data has been sent, the item processes are asked to return their individual dictionaries to the main process, which then merges them into one big dictionary.

The item processes die naturally after they return their dictionaries, and the user rating processes only live long enough to ensure that their data is received by the relevant item process.

The only other significant change was to make the predict function use a dictionary rather than an ETS table.


The concurrent bits of this program use this idiom[*]:

process_flag(trap_exit, true),
[receive
     {'EXIT', Pid, normal} -> ok;
     {'EXIT', Pid, Reason} -> exit(self(), Reason)
 end
 || Pid <- [spawn_link(fun() -> >>do something<< end)
            || >>value/s <- list of value/s<<]]


In other words: spawn a process to do something for every element in the list of values, and wait for each of those processes to signal that they have finished normally. (I used a substitution macro, which appears at the top of the listing below, to avoid typing in all that boilerplate.)


An unfortunate consequence of this modification is that the code is about 50% larger than the earlier Erlang version:

%% slopeone2.erl
% Philip Robinson
% A parallel implementation of the weighted slope one
% algorithm in Erlang for item-based collaborative
% filtering.
% Based on the same algorithm in Java by Daniel Lemire and Marco Ponzi:
% http://www.daniel-lemire.com/fr/documents/publications/SlopeOne.java

-module(slopeone2).
-export([start/0]).

-define(SPAWN_AND_WAIT(Block, Data),
    process_flag(trap_exit, true),
    [receive
         {'EXIT', Pid, normal} -> ok;
         {'EXIT', Pid, Reason} -> exit(self(), Reason)
     end
     || Pid <- [spawn_link(fun() -> Block end)
                || Data]]).

start() ->
    % The rating database: A list of users, each containing a list of {item, rating} elements.
    Items = [{item1,"candy"}, {item2,"dog"}, {item3,"cat"}, {item4,"war"}, {item5,"strange food"}],
    DataRating = [{user1, "Bob", [{item1,1.0}, {item2,0.5}, {item4,0.1}]},
                  {user2, "Jane", [{item1,1.0}, {item3,0.5}, {item4,0.2}]},
                  {user3, "Jo", [{item1,0.9}, {item2,0.4}, {item3,0.5}, {item4,0.1}]},
                  {user4, "StrangeJo", [{item1,0.1}, {item4,1.0}, {item5,0.4}]}],
    % The difference & frequency database: a dictionary of {{itemX, itemY}, {diff, freq}}.
    % Create the predictor engine.
    DiffFreq = build_matrix(Items, DataRating),
    io:format("Here's the data I have accumulated...~n"),
    print_data(Items, DataRating, DiffFreq),
    io:format("Ok, now we predict...~n"),
    TestRatings1 = [{item5,0.4}],
    io:format("Inputting...~n"),
    print_user_ratings(Items, TestRatings1),
    io:format("Getting...~n"),
    print_user_ratings(Items, predict(Items, TestRatings1, DiffFreq)),
    TestRatings2 = [{item4,0.2}|TestRatings1],
    io:format("Inputting...~n"),
    print_user_ratings(Items, TestRatings2),
    io:format("Getting...~n"),
    print_user_ratings(Items, predict(Items, TestRatings2, DiffFreq)),
    ok.

% Based on existing data, and using weights, try to predict all missing ratings.
% The trick to make this more scalable is to consider only difference entries
% having a large (> 1) frequency entry.
% Precondition: ItemRatings is sorted as per lists:sort/1 (to re-merge actual ratings).
predict(Items, ItemRatings, DiffFreq) ->
    % Gather the sum of the rating differences and frequencies for all available
    % items given the known item ratings.
    RatingsPredicted = lists:foldl(
        fun({IdItemX, IdItemY, RatingX}, Dict) ->
            case dict:find({IdItemY, IdItemX}, DiffFreq) of
                error -> Dict;
                {ok, {Diff, DFreq}} ->
                    dict:update(IdItemY,
                        fun({Pred, PFreq}) -> {Pred + ((RatingX + Diff) * DFreq), PFreq + DFreq} end,
                        {(RatingX + Diff) * DFreq, DFreq},
                        Dict)
            end
        end,
        dict:new(),
        [{IdItemX, IdItemY, RatingX}
         || {IdItemX, RatingX} <- ItemRatings,
            {IdItemY, _} <- Items]),
    % Put the original (actual) ratings back into our prediction list.
    RatingsPredictedAndActual = lists:foldl(
        fun({Item, Rating}, Ratings) -> dict:store(Item, {Rating, 1}, Ratings) end,
        RatingsPredicted, ItemRatings),
    % Divide the total rating difference by the frequency and return as a list.
    [{Item, Rating / Freq} || {Item, {Rating, Freq}} <- dict:to_list(RatingsPredictedAndActual)].


print_data(Items, DataRating, DiffFreq) ->
    [begin io:format("~s~n", [Name]), print_user_ratings(Items, ItemRatings) end
     || {_Id, Name, ItemRatings} <- DataRating],
    [print_item_diffs(Items, Item, DiffFreq) || Item <- Items],
    io:format("~n").

print_user_ratings(Items, ItemRatings) ->
    [begin {value, {Item, NameItem}} = lists:keysearch(Item, 1, Items),
           io:format(" ~12s --> ~4.2f~n", [NameItem, Rating]) end
     || {Item, Rating} <- lists:sort(ItemRatings)].

print_item_diffs(Items, {ItemX, Name}, DiffFreq) ->
    io:format("~n~12s:", [Name]),
    [case dict:find({ItemX, ItemY}, DiffFreq) of
         error -> io:format(" ");
         {ok, {Diff, Freq}} -> io:format(" ~6.3f ~1B", [Diff, Freq])
     end || {ItemY, _} <- Items].


% Long-running itemX process to manage a dictionary of {{itemX, itemY}, {Diff, Freq}} entries.
dict_itemX(IdItemX, DictDiffFreq) ->
    receive
        {data, PidSender, IdItemY, DiffNew} ->
            PidSender ! {done, self(), IdItemY},
            dict_itemX(IdItemX,
                dict:update({IdItemX, IdItemY},
                    fun({DiffOld, FreqOld}) -> {DiffOld + DiffNew, FreqOld + 1} end,
                    {DiffNew, 1}, DictDiffFreq));
        {results, PidParent} -> PidParent ! {results, self(), DictDiffFreq}
    end.

% Build a matrix (dictionary) of {{itemX, itemY}, Difference, Frequency} entries.
build_matrix(Items, DataRating) ->
    PidThis = self(),
    % Create a bunch of long-running processes to manage itemX data.
    PidItems = dict:from_list(
        [{IdItemX, spawn(fun() -> dict_itemX(IdItemX, dict:new()) end)}
         || {IdItemX, _NameItem} <- Items]),
    % Spawn a short-term process for each user's ratings and wait until they are all done.
    ?SPAWN_AND_WAIT(process_user_ratings(PidItems, Ratings), {_, _, Ratings} <- DataRating),
    % Retrieve and merge all the item process dictionaries,
    % then divide all differences by their frequency.
    dict:map(
        fun(_Key, {Diff, Freq}) -> {Diff / Freq, Freq} end,
        dict:fold(
            fun(_IdItemX, PidItemX, DictIn) ->
                PidItemX ! {results, PidThis},
                receive
                    {results, PidItemX, DiffFreq} ->
                        dict:merge(fun(_, _, _) -> io:format("Key collision~n") end,
                                   DictIn, DiffFreq)
                end
            end,
            dict:new(), PidItems)).

process_user_ratings(PidItems, Ratings) ->
    % Spawn a short-term process for each itemX rating and wait until they are all done.
    ?SPAWN_AND_WAIT(process_user_itemX_ratings(dict:fetch(IdItemX, PidItems), RatingX, Ratings),
                    {IdItemX, RatingX} <- Ratings).

process_user_itemX_ratings(PidItemX, RatingX, Ratings) ->
    % Spawn a process for each itemY rating that sends a message to the long-running
    % itemX process, waits for confirmation that its message has been processed,
    % then signals that it is done.
    ?SPAWN_AND_WAIT(begin
                        PidItemX ! {data, self(), IdItemY, RatingX - RatingY},
                        receive {done, PidItemX, IdItemY} -> ok end
                    end, {IdItemY, RatingY} <- Ratings).




[*] If trapping exits is not your cup of tea then you could use plain old spawn like this:

PidThis = self(),
[receive {done, Pid} -> ok end
 || Pid <- [spawn(fun() ->
                      >>do something<<,
                      PidThis ! {done, self()}
                  end)
            || >>value/s <- list of value/s<<]]


I had used this code until I thought of using spawn_link. I am not sure which version Erlang gurus would consider better, but my personal preference leans towards spawn_link: it seems easier to extend to multiple Erlang nodes, and if I were concerned about setting my main process to trap all exits then I could simply spawn a single process to manage all of the spawn_linking.
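
By a single process to manage the spawn_linking, I mean something like this sketch (hypothetical and untested; spawn_and_wait_all is a made-up name):

% Confine trap_exit to a throwaway middleman process so the main
% process never has to change its own trap_exit flag.
spawn_and_wait_all(Funs) ->
    PidThis = self(),
    PidMiddle = spawn(fun() ->
        process_flag(trap_exit, true),
        Pids = [spawn_link(Fun) || Fun <- Funs],
        % Collect one exit reason per worker ('normal' on success).
        Reasons = [receive {'EXIT', Pid, Reason} -> Reason end || Pid <- Pids],
        PidThis ! {all_done, self(), Reasons}
    end),
    receive {all_done, PidMiddle, Reasons} -> Reasons end.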

Tuesday 19 June 2007

Collaborative Filtering: Weighted Slope One in Erlang


I have been toying with the Netflix Challenge for a while now. It's fascinating stuff.

I knew nothing about collaborative filtering when I started this project, but that is pretty normal for my coding hobbies. (Hey, why would I start something new if I already knew how to do it?)

During my standard "collect and absorb everything" phase I ran across an article on Wikipedia that described the Slope One algorithm. This article had links to various implementations of the algorithm, including a standalone Java program by Daniel Lemire. More information on the Weighted Slope One algorithm may be found on Daniel's site.

I am not using Slope One in my current Netflix algorithm attempt, but I translated Daniel's Java code into Erlang as a learning exercise anyway:

%% slopeone.erl
% Philip Robinson
% A simple implementation of the weighted slope one
% algorithm in Erlang for item-based collaborative
% filtering.
% Based on the same algorithm in Java by Daniel Lemire and Marco Ponzi:
% http://www.daniel-lemire.com/fr/documents/publications/SlopeOne.java

-module(slopeone).
-export([start/0]).

start() ->
    % The rating database: A list of users, each containing a list of {item, rating} elements.
    Items = [{item1,"candy"}, {item2,"dog"}, {item3,"cat"}, {item4,"war"}, {item5,"strange food"}],
    DataRating = [{user1, "Bob", [{item1,1.0}, {item2,0.5}, {item4,0.1}]},
                  {user2, "Jane", [{item1,1.0}, {item3,0.5}, {item4,0.2}]},
                  {user3, "Jo", [{item1,0.9}, {item2,0.4}, {item3,0.5}, {item4,0.1}]},
                  {user4, "StrangeJo", [{item1,0.1}, {item4,1.0}, {item5,0.4}]}],
    % The difference & frequency database: an ETS table of {{itemX, itemY}, diff, freq}.
    ets:new(diff_freq, [private, set, named_table]),
    % Create the predictor engine.
    build_matrix(DataRating),
    io:format("Here's the data I have accumulated...~n"),
    print_data(Items, DataRating),
    io:format("Ok, now we predict...~n"),
    TestRatings1 = [{item5,0.4}],
    io:format("Inputting...~n"),
    print_user_ratings(Items, TestRatings1),
    io:format("Getting...~n"),
    print_user_ratings(Items, predict(Items, TestRatings1)),
    TestRatings2 = [{item4,0.2}|TestRatings1],
    io:format("Inputting...~n"),
    print_user_ratings(Items, TestRatings2),
    io:format("Getting...~n"),
    print_user_ratings(Items, predict(Items, TestRatings2)),
    ets:delete(diff_freq).

% Based on existing data, and using weights, try to predict all missing ratings.
% The trick to make this more scalable is to consider only diff_freq entries
% having a large (> 1) frequency entry.
predict(Items, ItemRatings) ->
    PredFreq = ets:new(pred_freq, []),
    [ets:insert(PredFreq, {Item, 0.0, 0}) || {Item, _} <- Items],
    [[case {ets:match(diff_freq, {{ItemY, ItemX}, '$1', '$2'}),
            ets:match(PredFreq, {ItemY, '$1', '$2'})} of
          {[], _} -> ok;
          {[[Diff, DFreq]], [[Pred, PFreq]]} ->
              ets:insert(PredFreq, {ItemY, Pred + ((RatingX + Diff) * DFreq), PFreq + DFreq})
      end || {ItemY, _} <- Items] || {ItemX, RatingX} <- ItemRatings],
    ets:match_delete(PredFreq, {'_', '_', 0}), % Remove all zero-frequency predictions.
    [ets:insert(PredFreq, {Item, Rating, 1}) || {Item, Rating} <- ItemRatings], % Re-insert actual ratings.
    Results = [{Item, Rating / Freq} || {Item, Rating, Freq} <- ets:tab2list(PredFreq)],
    ets:delete(PredFreq),
    Results.

print_data(Items, DataRating) ->
    [begin io:format("~s~n", [Name]),
           print_user_ratings(Items, ItemRatings) end
     || {_Id, Name, ItemRatings} <- DataRating],
    [print_item_diffs(Items, Item) || Item <- Items],
    io:format("~n").

print_user_ratings(Items, ItemRatings) ->
    [begin {value, {Item, NameItem}} = lists:keysearch(Item, 1, Items),
           io:format(" ~12s --> ~4.2f~n", [NameItem, Rating]) end
     || {Item, Rating} <- lists:sort(ItemRatings)].

print_item_diffs(Items, {Item, Name}) ->
    io:format("~n~12s:", [Name]),
    [case ets:match(diff_freq, {{Item, Id}, '$1', '$2'}) of
         [] -> io:format(" ");
         [[Diff, Freq]] -> io:format(" ~6.3f ~1B", [Diff, Freq])
     end || {Id, _} <- Items].

% Build a matrix (ETS table) of {{itemX, itemY}, Difference, Frequency} entries.
build_matrix(DataRating) ->
    % Gather the sum difference and the total count (frequency).
    [[[case ets:lookup(diff_freq, {ItemX, ItemY}) of
           [] -> ets:insert(diff_freq, {{ItemX, ItemY}, RatingX - RatingY, 1});
           [{Key, Diff, Freq}] -> ets:insert(diff_freq, {Key, Diff + RatingX - RatingY, Freq + 1})
       end || {ItemY, RatingY} <- ItemRatings]
      || {ItemX, RatingX} <- ItemRatings]
     || {_, _, ItemRatings} <- DataRating],
    % Divide sum of difference by frequency to get mean difference.
    DivideFun = fun({Key, Diff, Freq}, _) -> ets:insert(diff_freq, {Key, Diff / Freq, Freq}) end,
    ets:foldl(DivideFun, undefined, diff_freq).


Some musings:

* I do not really consider this code to be a typical "Erlang-style" program. In particular, I am making substantial use of side-effects via ETS tables; comparisons between this code and the Java original will probably not be representative of comparisons between Erlang and Java programs in general.

* I may have gone a little overboard with the use of list comprehensions. I did not really like LCs when first exposed to them and it seems that I am overcompensating for that now.

* While some functions are obviously shorter in this Erlang version (compare build_matrix and buildDiffMatrix, for example), I am not convinced that they are necessarily clearer in Erlang than in Java. At least one advantage of smaller functions is that I can fit more of them on my 8.9" screen, but if that was my main concern then I would be programming in something even less verbose.

* While I haven't delved into this much, I suspect that using ETS has made this program harder to parallelise effectively. While the loops could easily be converted to use a parallel map algorithm, all CRUD activity still has to go through a single ETS process bottleneck. One possible way of utilising multiple CPUs might be to spawn a bunch of processes to manage individual items, send each of these messages regarding specific ratings, and then convert the results into a dictionary via a list comprehension of receive statements and dict:from_list/1.

* I did run the Dialyzer over the code but I have not even considered optimising it for performance. It will be slower than the Java version. :-)

Obligatory legal stuff

Unless otherwise noted, all code appearing on this blog is released into the public domain and provided "as-is", without any warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the author(s) be liable for any claim, damages, or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.