Scalding merge, concatenation and joins of pipes

I recently built a Scalding job that runs every day, collecting a set of ids with timestamps to determine the newest and oldest occurrence of each id, and merging the result with a previously aggregated set. A very simple task, involving simple mapping and reducing functions with joins.

Well, the latter wasn’t as straightforward as I thought. If you have two pipes to join, you have the choice between the following:

- joinWithLarger
- joinWithSmaller
- joinWithTiny

which is decent, as it lets you optimise the join according to the relative sizes of the two pipes. You can also specify the type of join operation with a Joiner, of which there are four:

- OuterJoin
- LeftJoin
- RightJoin
- InnerJoin

All of which do basically what joins should. In my specific case though, as I was joining a tuple of three fields:

incomingPipe.joinWithLarger(('profileId, 'foreignId, 'systemId) ->('currentProfileId, 'currentForeignId, 'currentSystemId), createCurrentPipe(), joiner = new OuterJoin)

I encountered logical but still surprising results. If the two pipes contain disjoint sets of ids, the rows without a match on the other side come back with nulls in the fields of the missing side, because there is nothing to join them with. If you print the output with a filter, for instance like this:

filter('profileId, 'foreignId, 'systemId) {
  fields: (String, String, String) => {
    println(s"pid : ${fields._1} systemId ${fields._3}")
    true // keep every row; the filter is only used here for printing
  }
}

you’ll see a couple of Nulls in your output:

pid : 1337984600323010250219044000012966 systemId 82202
pid : Null systemId Null
pid : 1338141060494010250219044000006465 systemId 82202
pid : 1338151102885010250219044000007688 systemId 92078
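To make the null rows less mysterious, here is a minimal sketch of what a full outer join does to disjoint keys. It is a plain Scala collections model, not the Scalding API, and the ids and timestamps are made-up sample data. Where Cascading would put null into the fields of the missing side, the model uses None.

```scala
// Plain-Scala model of a full outer join; this is NOT the Scalding API.
object OuterJoinModel {
  // (profileId, foreignId, systemId)
  type Key = (String, String, String)

  // Hypothetical sample data: key -> timestamp.
  val incoming: Map[Key, Long] = Map(
    ("p1", "f1", "s1") -> 200L,
    ("p2", "f2", "s2") -> 300L // only present on the incoming side
  )
  val current: Map[Key, Long] = Map(
    ("p1", "f1", "s1") -> 100L,
    ("p3", "f3", "s3") -> 50L // only present on the current side
  )

  // Full outer join: every key from either side appears once,
  // with None wherever one side has no matching row.
  val joined: Map[Key, (Option[Long], Option[Long])] =
    (incoming.keySet ++ current.keySet).map { k =>
      (k, (incoming.get(k), current.get(k)))
    }.toMap
}
```

The row for ("p3", "f3", "s3") has no incoming side, which corresponds to the line in the output above where the incoming fields print as null.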

To solve this particular problem, it makes more sense to concatenate / unionise the pipes using the ++ operator in RichPipe:

createCurrentPipe() ++ incomingPipe
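Why concatenation fits this job better can be sketched with plain Scala collections. This is again a model rather than Scalding code: the Occurrence case class, the timestamp field and the sample rows are all assumptions for illustration. The point is that after a union every row is complete, so a single group-by on the three-field key can compute the oldest and newest occurrence.

```scala
// Plain-Scala model of "concatenate, then group"; this is NOT the Scalding API.
object UnionThenGroupModel {
  // Hypothetical record shape; the timestamp field is an assumption.
  final case class Occurrence(
      profileId: String,
      foreignId: String,
      systemId: String,
      timestamp: Long
  )

  val current = List(
    Occurrence("p1", "f1", "s1", 100L),
    Occurrence("p3", "f3", "s3", 50L)
  )
  val incoming = List(
    Occurrence("p1", "f1", "s1", 200L),
    Occurrence("p2", "f2", "s2", 300L)
  )

  // Concatenation keeps every row from both sides, so no field is ever empty.
  val all: List[Occurrence] = current ++ incoming

  // Group on the three-field key and reduce to (oldest, newest) timestamps.
  val oldestAndNewest: Map[(String, String, String), (Long, Long)] =
    all
      .groupBy(o => (o.profileId, o.foreignId, o.systemId))
      .map { case (key, rows) =>
        val timestamps = rows.map(_.timestamp)
        (key, (timestamps.min, timestamps.max))
      }
}
```

Ids that exist on only one side simply end up with identical oldest and newest values instead of nulls.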

One word of caution here: if you do that, make sure you bind your concatenated pipes to a val. Otherwise you’ll get the slightly frustrating

cascading.flow.planner.PlannerException: not all source taps bound to head pipes, remaining source tap names

error. If your code looks like this:

val joinedPipes = createCurrentPipe() ++ incomingPipe
.groupBy('profileId, 'foreignId, 'systemId)

change it to

val joinedPipes = createCurrentPipe() ++ incomingPipe
joinedPipes.groupBy('profileId, 'foreignId, 'systemId)

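Binding the union to a val first helps Cascading construct the flow and bind the source taps. A likely reason is the way Scala parses the first form: the leading .groupBy continues the expression on the previous line, and a method call binds more tightly than the infix ++, so the groupBy applies to incomingPipe alone rather than to the concatenated pipe. Wrapping the concatenation in parentheses, as in (createCurrentPipe() ++ incomingPipe).groupBy(...), should have the same effect as the val.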
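The parsing difference between the two forms can be reproduced without Scalding. The sketch below uses plain Lists and map as stand-ins for the pipes and groupBy, so it only illustrates Scala's precedence rules, not the Cascading planner itself.

```scala
// Plain-Scala illustration of method-call vs infix-operator precedence.
object PrecedenceModel {
  val current = List(1, 2)
  val incoming = List(3, 4)

  // The leading dot continues the previous line, so this parses as
  // current ++ (incoming.map(_ * 10)): only incoming is transformed.
  val chained = current ++ incoming
    .map(_ * 10)

  // Binding the union to a val first applies the call to the whole union.
  val union = current ++ incoming
  val bound = union.map(_ * 10)

  // Parentheses give the same result without the intermediate val.
  val parenthesised = (current ++ incoming).map(_ * 10)
}
```

Here chained is List(1, 2, 30, 40), while bound and parenthesised are both List(10, 20, 30, 40).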
