# Background

K-Nearest Neighbours is a commonly used algorithm, but it is expensive to compute for big data. Spark implements two methods for finding approximate nearest neighbours using Locality Sensitive Hashing: Bucketed Random Projection for Euclidean distance and MinHash for Jaccard distance. The work to add these methods was done in collaboration with Uber, which you can read about here. Whereas traditional KNN algorithms find the exact nearest neighbours, these approximate methods only find the nearest neighbours with high probability.

Locality Sensitive Hashing is a process where a lower-dimensional hash of the data is created, such that data points with similar hashes will be close together, with high probability. This paper provides a bit more detail on LSH and the maths behind it. One hashing algorithm is Bucketed Random Projection, where a random projection is created by taking the dot product of each feature vector with a vector of random Gaussian noise. These random projections are then grouped into hash buckets that contain similar data. Bucketed Random Projection can be expressed as,

$$h^{x,b}(\vec{v}) = \lfloor\frac{\vec{v} \cdot \vec{x} + b}{w}\rfloor$$ where $$h$$ is a hashing function, $$\vec{v}$$ is a feature vector, $$\vec{x}$$ is a noise vector, $$b$$ is a random constant between $$0$$ and $$w$$, and $$w$$ is the width of the hashing bins. Larger values of $$w$$ result in fewer hash buckets. The constant $$b$$ is added to make “quantization error easier to analyze” (Slaney & Casey, 2008). This $$b$$ constant is not included in the Spark docs’ description of the algorithm. $$\lfloor \ \rfloor$$ denotes the floor operation, meaning the hash will be an integer. This hashing process can be repeated multiple times with different noise vectors to improve the identification of similar instances. Doing this is referred to as creating multiple hash tables (see below).
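To make the formula concrete, here is a tiny worked example with made-up numbers (the vectors and constants below are arbitrary, chosen only for illustration):

```r
# made-up values: v is a feature vector, x a noise vector,
# b a random constant in [0, w), and w the bin width
v <- c(3, 4)
x <- c(0.5, -0.25)
b <- 1.2
w <- 5

# h(v) = floor((v . x + b) / w)
hash <- floor((sum(v * x) + b) / w)
hash
#> [1] 0
```

With $$w = 5$$, any vector whose projection lands in the same 5-unit-wide interval gets the same hash.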

To further clarify how this algorithm works, I had a look at the Scala source code for Spark. I don’t know any Scala, but I was able to clarify a few things that puzzled me. It appears that each hash reuses the same unit noise vector $$x$$ throughout, which seems reasonable. This means that if you have multiple hash tables (NumHashTables), each will have its own random vector.

# Bucketed Random Projection LSH locally in R

Below, this Bucketed Random Projection hashing process is implemented in R. Following the Spark source code, I’m reusing the same noise vector $$x$$ throughout. Unlike the Spark implementation, I’m also including the $$b$$ constant.

library(tidyverse)

set.seed(1234)

# number of feature vectors to simulate
n <- 1e04

# simulate n feature vectors of length two,
# giving a matrix with n rows and 2 columns
m <- t(replicate(n, rnorm(2, mean = 50, sd = 50), simplify = 'matrix'))

# get a random vector of noise
x <- rnorm(2)

# the width of each bin
w <- 5

# a random constant
b <- runif(1, 0, w)

# initialise a vector of hashes
hash <- vector(length = n)

# Loop over the rows of m (each of which is a vector length 2)
# multiplying by the random vector, adding the constant,
# and dividing by the bin width.
# We then take the floor of this result to get the integer hash.
for (i in 1:n) {
  hash[i] <- floor((m[i, ] %*% x + b) / w)
}

# make a dataframe of the data and hashes
df <- data.frame(m, hash = hash)

# summarise the two features by the hash
df_summary <- df %>%
  group_by(hash) %>%
  summarise(avg_x1 = mean(X1),
            avg_x2 = mean(X2))

This process creates 132 unique hashes, ranging from -97 to 49. Now that’s done, we can plot how the hash and features vary together. The dashed blue line shows one feature, whereas the solid red line shows the other. We can see that feature values increase with the hash, meaning that similar points will have similar hashes.

# set a theme
theme_set(theme_minimal(base_family = 'mono') +
            theme(panel.grid.minor = element_blank()))

# plot
df_summary %>%
  gather(x, value, avg_x1, avg_x2) %>%
  ggplot(aes(hash, value, color = x, linetype = x)) +
  geom_line(show.legend = FALSE) +
  labs(x = 'Hash', y = 'Feature value') +
  scale_color_viridis_d(option = 'plasma', end = 0.6) +
  scale_linetype_manual(values = c(2, 1))

The pattern can be confirmed by plotting the two variables against each other, with the points coloured by their hash. For some noise vectors the relationship will be positive for one feature and negative for the other, which is fine. You can see this by using set.seed(2018) above.

df %>%
  ggplot(aes(X1, X2, color = hash)) +
  coord_cartesian(xlim = c(-150, 250), ylim = c(-150, 250)) +
  geom_point(alpha = .5) +
  scale_color_viridis_c(option = 'plasma') +
  theme(legend.position = c(0.9, 0.8)) +
  labs(x = 'Feature 1', y = 'Feature 2')

The identification of neighbours from this approach could be improved by doing the hashing multiple times with different noise vectors, as discussed above. This is a parameter that can be specified to the Spark function we’ll access from sparklyr.

# Bucketed Random Projection LSH with sparklyr

Below, we use the LSH functionality in sparklyr. To begin with, the connection is set up, before loading in the titanic dataset and copying it to Spark.

library(sparklyr)

# connect to spark
sc <- spark_connect(master = 'local')

# get and clean the titanic
df_titanic <- titanic::titanic_train %>%
  # select some columns
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare) %>%
  # make the col names lower case
  rename_all(tolower) %>%
  # turn sex into an integer
  mutate(sex = ifelse(sex == 'male', 1, 0),
         id = 1:nrow(.)) %>%
  # remove NAs
  filter_all(all_vars(!is.na(.)))

# copy to the spark connection
tbl_titanic <- copy_to(sc, df_titanic, overwrite = TRUE)

Once the data is copied to Spark, the input columns for the hashing can be specified. The outcome column shouldn’t be included, as you won’t know this for new data points. When a new data point comes in, you hash it on the features to find similar points in the training data. Once you’ve got those similar points in the training data, you can look at the target variable to make some prediction about the new data. If the target variable is continuous you might use the average as your prediction for the new data. If it’s categorical you might just take the majority class. You could also do some weighting based on distance, such that points nearer to the new point have more influence.
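Those prediction rules can be sketched in a few lines of base R. The survived and dist values below are hypothetical neighbour data for a single new point, made up for illustration:

```r
# hypothetical outcomes and distances for a new
# point's five nearest training neighbours
survived <- c(0, 1, 0, 0, 1)
dist     <- c(0.2, 0.5, 0.6, 1.1, 1.4)

# continuous target: use the neighbour average
mean(survived)
#> [1] 0.4

# categorical target: take the majority class
as.numeric(mean(survived) > 0.5)
#> [1] 0

# distance weighting: nearer neighbours get more influence
wts <- 1 / dist
sum(wts * survived) / sum(wts)
```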

input_cols <- c('pclass', 'sex', 'age', 'sibsp', 'parch', 'fare')

As well as specifying the input columns, they need to be assembled into a single vector ($$\vec{v}$$ in the formula at the top). This is done using ft_vector_assembler() in sparklyr. You provide this with a Spark Data Frame, some column names, and a name for the output vector column. When the data is printed we can see that it now has a features list column, where each element is a vector. This sort of thing will be familiar to people versed in the list-column workflow popularised by purrr, which you can read about here.

tbl_titanic_va <-
  ft_vector_assembler(tbl_titanic,
                      input_cols = input_cols,
                      output_col = 'features')

# Collecting first as there's an issue with
# printing Spark DFs when knitting R Markdown.
# Printing works fine interactively.
tbl_titanic_va %>%
  collect()
## # A tibble: 714 x 9
##    survived pclass   sex   age sibsp parch  fare    id features 
##       <int>  <int> <dbl> <dbl> <int> <int> <dbl> <int> <list>   
##  1        0      3     1    22     1     0  7.25     1 <dbl [6]>
##  2        1      1     0    38     1     0 71.3      2 <dbl [6]>
##  3        1      3     0    26     0     0  7.92     3 <dbl [6]>
##  4        1      1     0    35     1     0 53.1      4 <dbl [6]>
##  5        0      3     1    35     0     0  8.05     5 <dbl [6]>
##  6        0      1     1    54     0     0 51.9      7 <dbl [6]>
##  7        0      3     1     2     3     1 21.1      8 <dbl [6]>
##  8        1      3     0    27     0     2 11.1      9 <dbl [6]>
##  9        1      2     0    14     1     0 30.1     10 <dbl [6]>
## 10        1      3     0     4     1     1 16.7     11 <dbl [6]>
## # … with 704 more rows

Next an object is created specifying how the LSH algorithm should work. By specifying a low bucket_length we’ll end up with lots of bins. Multiple hash tables will also be used, improving performance.

brp_lsh <-
  ft_bucketed_random_projection_lsh(
    sc,
    input_col = 'features',
    output_col = 'hash',
    bucket_length = 2,
    num_hash_tables = 3
  )

We then fit the LSH to our data to actually get the hashes.

brp_fit <- ml_fit(brp_lsh, tbl_titanic_va)

This will take the vector of six features and turn it into a single hash value for each row. We can confirm this by supplying a tbl to the function, instead of a spark connection. This returns a tbl with a hash column.

ft_bucketed_random_projection_lsh(
  tbl_titanic_va,
  'features',
  'hash',
  bucket_length = 2) %>%
  select(features, hash) %>%
  collect()
## # A tibble: 714 x 2
##    features  hash      
##    <list>    <list>    
##  1 <dbl [6]> <list [1]>
##  2 <dbl [6]> <list [1]>
##  3 <dbl [6]> <list [1]>
##  4 <dbl [6]> <list [1]>
##  5 <dbl [6]> <list [1]>
##  6 <dbl [6]> <list [1]>
##  7 <dbl [6]> <list [1]>
##  8 <dbl [6]> <list [1]>
##  9 <dbl [6]> <list [1]>
## 10 <dbl [6]> <list [1]>
## # … with 704 more rows

Now the hashes have been created, we might want to know the neighbours of all the points in our data. This can be done using the ml_approx_similarity_join() function. We could use this to join two datasets together, but we can also do a self-join to get the similarities between rows within a dataset. A threshold is specified to determine how close two points need to be for them to be counted as similar. Note that for this to work there needs to be an id column in the data that uniquely identifies each row.

asj_fit <-
  ml_approx_similarity_join(
    brp_fit,
    tbl_titanic_va,
    tbl_titanic_va,
    threshold = 1.5,
    dist_col = 'dist_col'
  )

We can then see the similarities for the first few rows.

asj_fit %>%
  collect()
## # A tibble: 4,388 x 3
##     id_a  id_b dist_col
##    <int> <int>    <dbl>
##  1     1   554    1.00
##  2     1   321    1
##  3     3   730    1.41
##  4     3   704    1.43
##  5     3   147    1.42
##  6     5   848    0.154
##  7     5   759    1
##  8     5   591    0.925
##  9     5   190    1.01
## 10     9     9    0
## # … with 4,378 more rows

Let’s take a look at three of these rows to see their similarities. These rows all have the same Age, Class, Sex, and number of parents/children aboard (parch), as well as very similar fares.

df_titanic %>%
  filter(id %in% c(1, 554, 321))
##   survived pclass sex age sibsp parch  fare  id
## 1        0      3   1  22     1     0 7.250   1
## 2        0      3   1  22     0     0 7.250 321
## 3        1      3   1  22     0     0 7.225 554

## Predictions for new data

The next question to ask is how to generate predictions from this approach. Suppose we have some new data on Titanic passengers and want to use similar passengers in our training data to work out whether we think they survived. To try this out, the test set provided in the titanic package is loaded in, cleaned a bit, and copied to Spark.

tbl_titanic_test <- titanic::titanic_test %>%
  # select some columns
  select(Pclass, Sex, Age, SibSp, Parch, Fare) %>%
  # make the col names lower case
  rename_all(tolower) %>%
  # turn sex into an integer
  mutate(sex = ifelse(sex == 'male', 1, 0),
         id = 1:nrow(.)) %>%
  # remove NAs
  filter_all(all_vars(!is.na(.))) %>%
  # copy to the spark connection
  copy_to(sc, ., overwrite = TRUE) %>%
  # create the features vector column
  ft_vector_assembler(input_cols = input_cols,
                      output_col = 'features')

There are two ways we could generate predictions for some new data using ml_approx_similarity_join(), which give identical results. With the first method, the hashes are generated separately then fed into the function to find similar instances in the training data. Alternatively, the hashes and the join can all be done at once.

# generate the hashes first
tbl_titanic_test_similarity_join <-
  sdf_predict(tbl_titanic_test, brp_fit) %>%
  ml_approx_similarity_join(
    brp_fit,
    tbl_titanic_va,
    .,
    threshold = 1)
## Warning: 'sdf_predict' is deprecated.
## See help("Deprecated")

# create the hashes and join all in one go
tbl_titanic_test_similarity_join2 <-
  ml_approx_similarity_join(
    brp_fit,
    tbl_titanic_va,
    tbl_titanic_test,
    threshold = 1)

Now we can look at the matches for the first person in the test data. It looks as if none of the matches for this person survived, so we’d predict they didn’t survive either.

# get the ids of the instances in
# the training data that are similar
# to the first instance of the test data
training_ids <- tbl_titanic_test_similarity_join %>%
  filter(id_b == 1) %>%
  pull(id_a)

# filter on the training ids
df_titanic %>%
  filter(id %in% training_ids)
##   survived pclass sex age sibsp parch   fare  id
## 1        0      3   1  35     0     0 8.0500   5
## 2        0      3   1  35     0     0 7.0500 364
## 3        0      3   1  34     0     0 8.0500 462
## 4        0      3   1  35     0     0 7.1250 591
## 5        0      3   1  35     0     0 8.0500 615
## 6        0      3   1  34     0     0 8.0500 759
## 7        0      3   1  35     0     0 7.8958 848

### Approximate nearest neighbours

The ml_approx_nearest_neighbors() function gives us an alternative way to answer the same question – who in our training data is similar to an unlabelled instance? With the similarity join we specify a distance threshold and get back all instances with distances less than that threshold. For approximate nearest neighbours, in contrast, we get a fixed number of nearest neighbours for a new instance.

Above we found there were 7 instances in the training data with a distance of less than 1 from the first instance of the test data. If we ask for 7 nearest neighbours from ml_approx_nearest_neighbors() we will get the same instances, ordered by distance. Note that the input to the key argument is just a standard R vector containing the values for each feature. The vector is printed out below, prior to the results, to show this.

# create a vector of features for
# the first instance in the test data
(id1_input <- tbl_titanic_test %>%
   filter(id == 1) %>%
   pull(features) %>%
   unlist())
##   3.0000  1.0000 34.5000  0.0000  0.0000  7.8292
ml_approx_nearest_neighbors(
  brp_fit,
  tbl_titanic_va,
  key = id1_input,
  dist_col = 'dist_col',
  num_nearest_neighbors = 7
) %>%
  collect()
## # A tibble: 7 x 11
##   survived pclass   sex   age sibsp parch  fare    id features  hash       dist_col
##      <int>  <int> <dbl> <dbl> <int> <int> <dbl> <int> <list>    <list>        <dbl>
## 1        0      3     1    35     0     0  7.90   848 <dbl [6]> <list [3]>    0.504
## 2        0      3     1    35     0     0  8.05   615 <dbl [6]> <list [3]>    0.547
## 3        0      3     1    34     0     0  8.05   759 <dbl [6]> <list [3]>    0.547
## 4        0      3     1    34     0     0  8.05   462 <dbl [6]> <list [3]>    0.547
## 5        0      3     1    35     0     0  8.05     5 <dbl [6]> <list [3]>    0.547
## 6        0      3     1    35     0     0  7.12   591 <dbl [6]> <list [3]>    0.864
## 7        0      3     1    35     0     0  7.05   364 <dbl [6]> <list [3]>    0.926

We could generate a naive predicted probability of survival for each instance as $$\frac{n_{survive}}{K}$$, or use more involved methods that make use of the distance information (dist_col). For this case the predicted probability would be $$\frac{0}{7} = 0$$. Using some of the more complicated methods for deriving probabilities linked above avoids this issue of zero probabilities.
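As a concrete illustration, one simple option (a standard smoothing trick, not necessarily one of the linked methods) is Laplace smoothing, which pulls the estimate away from 0 and 1:

```r
# 0 of the 7 neighbours survived, as in the example above
n_survive <- 0
k <- 7

# the naive estimate gives a hard zero probability
n_survive / k
#> [1] 0

# add-one (Laplace) smoothing avoids the hard zero
(n_survive + 1) / (k + 2)
```

The smoothed estimate here is $$\frac{1}{9}$$, rather than an outright claim that survival is impossible.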

## Fixing the distance or the number of neighbours

It seems to me that whether you use similarity joins or approximate nearest neighbours will depend on what you’re trying to achieve. In some cases we might only want to say two instances are similar if a fixed threshold is met, such as in duplicate or fraud detection systems. At other times we might want to generate a prediction for every new instance, even if they aren’t that similar to anyone in the training data. Which option you choose will depend on your understanding of the problem you’re trying to solve.

# Appendix: Relationship to SMOTE

I originally got into reading about LSH by looking into using SMOTE with sparklyr. My reading took me to this R Studio community post and then to the paper on SMOTE-BD. In the paper the authors talk about using KNN to create a version of SMOTE for big data. This all makes sense given what we’ve seen so far. KNN, including this approximation, is about finding points close to some instance, just as SMOTE is about generating new data similar to some (minority) training instances.

# Resources

This section includes some references and further resources.