scala - How to pass one RDD into another RDD through .map


I have two RDDs, and I want to run a computation on the items of rdd2 for each item of rdd1. So I am passing rdd2 into a user-defined function as shown below, but I get an error saying that an RDD cannot be passed into another RDD. How can I achieve this if I want to perform operations on two RDDs?

For example:

rdd1.map(line => function(line, rdd2))

Nesting RDDs is not supported by Spark, as the error says. You have to work around it by redesigning your algorithm.

How to do that depends on the actual use case: what exactly happens in function, and what its output is.

Sometimes rdd1.cartesian(rdd2), followed by doing the operations per tuple and then reducing by key, will work. Sometimes, if both RDDs hold (K, V) pairs, a join between them will work.
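As an illustration of the join route, here is a minimal sketch. It assumes, purely for the sake of example, that both RDDs hold strings and that the goal is to count how often each rdd1 record occurs in rdd2. The names JoinSketch and countOccurrences are hypothetical, and the local master is only there to make the sketch runnable.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinSketch {
  // Hypothetical helper: counts how often each record of rdd1 occurs in rdd2
  // without referencing one RDD inside a transformation of the other.
  def countOccurrences(rdd1: RDD[String], rdd2: RDD[String]): RDD[(String, Int)] = {
    // Pre-aggregate rdd2 into (record, count) pairs; keys are unique afterwards.
    val counts: RDD[(String, Int)] = rdd2.map(r => (r, 1)).reduceByKey(_ + _)

    // Key rdd1 by the record itself. The left outer join keeps rdd1 records
    // that never occur in rdd2 (their count becomes 0).
    rdd1.map(r => (r, ()))
      .leftOuterJoin(counts)
      .map { case (record, (_, maybeCount)) => (record, maybeCount.getOrElse(0)) }
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying the sketch out.
    val sc = new SparkContext(new SparkConf().setAppName("join-sketch").setMaster("local[*]"))
    try {
      val rdd1 = sc.parallelize(Seq("a", "b", "c"))
      val rdd2 = sc.parallelize(Seq("a", "a", "b", "d"))
      countOccurrences(rdd1, rdd2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because rdd2 is reduced to one row per distinct record before the join, each rdd1 record (duplicates included) should come out as its own row with the matching count.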

If rdd2 is small, you can collect it in the driver, make it a broadcast variable, and use that variable in function instead of rdd2.

@edit:

For example's sake, let's assume your RDDs hold strings and function counts how many times a given record from rdd1 occurs in rdd2:

def function(line: String, rdd: RDD[String]): (String, Int) = {
  // count() returns a Long, so convert it to match the declared Int
  (line, rdd.filter(_ == line).count().toInt)
}

Mapped over rdd1, this would return an RDD[(String, Int)].

Idea 1

You can try using a Cartesian product via RDD's cartesian method.

val cartesianProduct = rdd1.cartesian(rdd2) // creates RDD[(String, String)]
  .map { case (r1, r2) => (r1, function2(r1, r2)) } // creates RDD[(String, Int)]
  .reduceByKey((c1, c2) => c1 + c2) // final RDD[(String, Int)]

Here function2 takes r1 and r2 (which are strings) and returns 1 if they are equal and 0 if not. The final result is an RDD of tuples where the key is the record r1 and the value is the total count.

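Problem 1: This will not work if you have duplicate strings in rdd1, though, because duplicates collapse into a single key and their counts get added together. You'd have to think about that. If the rdd1 records have unique IDs, it would be perfect.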

Problem 2: This creates a lot of pairs (for 1 mln records in each RDD, the Cartesian product has 10^12 pairs, i.e. a trillion), so it will be slow and result in a lot of shuffling.
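One possible way around Problem 1 is to attach an ID to every rdd1 record before taking the product, for instance with zipWithUniqueId. The sketch below is only an illustration under that assumption; the names UniqueIdCartesianSketch and countWithIds are hypothetical, and Problem 2 still applies in full.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UniqueIdCartesianSketch {
  // Hypothetical helper: like Idea 1, but keyed by (id, record) so that
  // duplicate strings in rdd1 are counted separately.
  def countWithIds(rdd1: RDD[String], rdd2: RDD[String]): RDD[(String, Int)] = {
    rdd1.zipWithUniqueId()   // RDD[(String, Long)]
      .cartesian(rdd2)       // RDD[((String, Long), String)]
      .map { case ((r1, id), r2) => ((id, r1), if (r1 == r2) 1 else 0) }
      .reduceByKey(_ + _)    // one total per rdd1 record, not per distinct string
      .map { case ((_, r1), count) => (r1, count) }
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying the sketch out.
    val sc = new SparkContext(new SparkConf().setAppName("id-cartesian-sketch").setMaster("local[*]"))
    try {
      val rdd1 = sc.parallelize(Seq("a", "a", "b"))
      val rdd2 = sc.parallelize(Seq("a", "b", "b"))
      countWithIds(rdd1, rdd2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Note that if rdd2 is empty the product is empty as well, so no rdd1 record would appear in the output at all.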

Idea 2

I didn't understand your comment regarding rdd2's size in lacs, so this might or might not work:

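val rdd2array = sc.broadcast(rdd2.collect()) // Broadcast[Array[String]]
// function expects an RDD, so count directly in the broadcast array instead
val result = rdd1.map(line => (line, rdd2array.value.count(_ == line))) // RDD[(String, Int)]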

Problem: This might blow up your memory. collect() is called on the driver, and all records from rdd2 will be loaded into memory on the driver node.
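If rdd2 contains many repeated values, the memory pressure could be reduced by broadcasting per-value counts rather than every record. This is a hedged sketch of that variation, not part of the original answer; BroadcastCountsSketch and countWithBroadcast are hypothetical names. The counts map is still built on the driver, so it only helps when the number of distinct values is manageable.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastCountsSketch {
  // Hypothetical helper: broadcasts a (value -> count) map built from rdd2
  // and looks each rdd1 record up in it.
  def countWithBroadcast(rdd1: RDD[String], rdd2: RDD[String]): RDD[(String, Int)] = {
    // countByValue() is an action: it returns the counts to the driver as a Map.
    val counts: scala.collection.Map[String, Long] = rdd2.countByValue()
    val countsBc = rdd1.sparkContext.broadcast(counts)

    // Only the broadcast value is used inside the closure, never an RDD.
    rdd1.map(line => (line, countsBc.value.getOrElse(line, 0L).toInt))
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying the sketch out.
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch").setMaster("local[*]"))
    try {
      val rdd1 = sc.parallelize(Seq("a", "b", "c"))
      val rdd2 = sc.parallelize(Seq("a", "a", "b", "d"))
      countWithBroadcast(rdd1, rdd2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```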

Idea 3

Depending on your use case there are other ways to overcome this. For instance, the brute-force algorithm for similarity search is similar (pun not intended) to your use case. One of the alternative solutions for that is locality-sensitive hashing.

