HBase and Hive with Avro: column size limits

Yesterday, we had the privilege of having to create Hive tables on top of an HBase table with Avro columns. Piece of cake, we thought: since Hive 1.0 it's actually possible to map an Avro schema in HBase columns to Hive columns, so you just specify the schema location in Hive, map it to the HBase column, and voilà, it works.

Except that it just doesn't, at least not if you have a large schema definition. In our specific case, we have an XSD schema at the root – and like all things XML, it's quite substantial. Converted to something decent like Avro, it is still quite big. That's when we discovered that Hive limits serde properties to 4000 characters. That limit makes sense when you consider that Hive stores its metadata in a SQL database. So after googling a bit, we found this post:

http://gbif.blogspot.de/2014/03/lots-of-columns-with-hive-and-hbase.html

which basically pointed to the SERDE_PARAMS table in the metastore database. And unsurprisingly, the PARAM_VALUE column there is VARCHAR(4000).

Following that advice, we updated the schema of the database using this:

alter table SERDE_PARAMS MODIFY PARAM_VALUE TEXT;

That by itself isn't enough though, because Hive was still consistently failing. Since finding the second cause was a slightly painful experience involving remote-debugging the Hive server (thanks again, Roland), without further ado, here's the second update command:

alter table SD_PARAMS MODIFY PARAM_VALUE TEXT;

That's also necessary because, after deserializing, Hive stores the extracted columns in the storage descriptor (hence the SD in the table name), and SD_PARAMS has the same VARCHAR(4000) limit.

 


Scalding merge, concatenation and joins of pipes

I recently built a Scalding job that ran every day, collecting a set of ids with timestamps to determine the newest and oldest occurrence of each set, while merging that with the previously aggregated set. A very simple task, involving simple map and reduce functions with joins.

Well, the latter wasn't as straightforward as I thought. If you have two pipes to join, you have the choice between the following:

- joinWithLarger
- joinWithSmaller
- joinWithTiny

which is decent, as this gives you the ability to optimise for the sizes of the sets and the mapping behaviour. You can also specify the type of join operation with a Joiner, of which there are four:

- OuterJoin
- LeftJoin
- RightJoin
- InnerJoin

All of which do basically what joins should do. In my specific case though, I was joining on a tuple of three fields:

incomingPipe.joinWithLarger(('profileId, 'foreignId, 'systemId) -> ('currentProfileId, 'currentForeignId, 'currentSystemId), createCurrentPipe(), joiner = new OuterJoin)

I encountered logical but still surprising results. If the pipes contain disjoint sets of ids, the tuples that cannot be joined come back with nulls on the unmatched side. If you print out the whole output with a filter, for instance like this:

filter('profileId, 'foreignId, 'systemId) {
  fields: (String, String, String) => {
    println(s"pid : ${fields._1} systemId ${fields._3}")
    true
  }
}

you'll see a couple of nulls in your output:

pid : 1337984600323010250219044000012966 systemId 82202
pid : null systemId null
pid : 1338141060494010250219044000006465 systemId 82202
pid : 1338151102885010250219044000007688 systemId 92078
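Those nulls are inherent to the outer join semantics, not a Scalding quirk. Here is a minimal plain-Java sketch (made-up ids; two maps standing in for the two pipes) showing why keys present on only one side come back with null on the other:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class OuterJoinDemo {

    // Full outer join of two key -> value maps: a key present on only one
    // side gets null for the missing side, which is exactly what happens to
    // tuples with unmatched join fields in an OuterJoin.
    static Map<String, String[]> fullOuterJoin(Map<String, String> left,
                                               Map<String, String> right) {
        Set<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        Map<String, String[]> joined = new TreeMap<>();
        for (String key : keys) {
            joined.put(key, new String[] { left.get(key), right.get(key) });
        }
        return joined;
    }

    public static void main(String[] args) {
        // hypothetical data: "incoming" and "current" stand in for the pipes
        Map<String, String> incoming = Map.of("p1", "82202", "p2", "92078");
        Map<String, String> current = Map.of("p2", "92078", "p3", "11111");
        fullOuterJoin(incoming, current).forEach((id, sides) ->
            System.out.println("pid : " + id + " -> "
                + sides[0] + " / " + sides[1]));
    }
}
```

Only "p2" exists on both sides; "p1" and "p3" each get a null partner, just like the unmatched profile ids above.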

To solve this particular problem, it makes more sense to concatenate / unionise the pipes using the ++ operator in RichPipe:

createCurrentPipe() ++ incomingPipe

One word of caution here: if you do that, make sure you bind your concatenated pipes to a val. Otherwise you’ll get the slightly frustrating

cascading.flow.planner.PlannerException: not all source taps bound to head pipes, remaining source tap names

error. If your code looks like this:

val joinedPipes = createCurrentPipe() ++ incomingPipe
.groupBy('profileId, 'foreignId, 'systemId)

change it to

val joinedPipes = createCurrentPipe() ++ incomingPipe
joinedPipes.groupBy('profileId, 'foreignId, 'systemId)

Binding it to the val first helps Cascading properly construct the flow and bind the source taps.


ClassNotFoundException org.apache.hadoop.io.SequenceFile in Flume with HDFS Sink

If you ever encounter this error while setting up Flume with HDFS Sink:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)

Just add the following JAR to your classpath in flume-env.sh:

HADOOP_HOME=/path/to/your/hadoop

FLUME_CLASSPATH="$HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.1.jar"

Writing Avro records to HBase columns

I've been digging around recently to see how to store Avro records in HBase without "exploding" the values into single columns; this has been a viable alternative since Hive 0.14, with its support for querying Avro in HBase columns.

What you basically need to do is serialize your Avro record to a byte array and put that into your column:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DatumWriter<SpecifcRecord> writer = new SpecificDatumWriter<>(SpecifcRecord.SCHEMA$);
DataFileWriter<SpecifcRecord> dfw = new DataFileWriter<>(writer);
PutRequest req = null;
try {
    // write the Avro container header plus the record into the byte stream
    dfw.create(SpecifcRecord.SCHEMA$, baos);
    dfw.append(currentEvent);
    dfw.close();
    // store the serialized bytes in a single column
    req = new PutRequest(table, currentRowKey, colFam,
        "record".getBytes(), baos.toByteArray());
} catch (IOException e) {
    e.printStackTrace();
}

That’s basically it.
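The snippet above only covers the write path. For completeness, here is a self-contained round-trip sketch (the schema and class names are stand-ins, not the `SpecifcRecord` from above, and the HBase put is omitted) showing how the stored bytes can be read back with a DataFileReader over a SeekableByteArrayInput:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroCellRoundTrip {

    // Stand-in schema; in the post this would be SpecifcRecord.SCHEMA$
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    // Same container-file serialization as in the post, with GenericRecord
    static byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> dfw =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            dfw.create(SCHEMA, baos);
            dfw.append(record);
        }
        return baos.toByteArray();
    }

    // Reading the column value back: wrap the bytes in a seekable input
    static GenericRecord deserialize(byte[] cellValue) throws IOException {
        try (DataFileReader<GenericRecord> dfr = new DataFileReader<>(
                 new SeekableByteArrayInput(cellValue),
                 new GenericDatumReader<GenericRecord>(SCHEMA))) {
            return dfr.next();
        }
    }

    public static void main(String[] args) throws IOException {
        GenericRecord rec = new GenericData.Record(SCHEMA);
        rec.put("id", "42");
        System.out.println(deserialize(serialize(rec)).get("id"));
    }
}
```

The bytes produced by serialize are exactly what would land in the HBase cell, so deserialize is what a consumer of that column would do.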


Counters using Cascading Flow Listeners in Scalding

As of now, Scalding doesn't provide full support for counters – you will find a few pull requests and the Stats class, but nothing more. This will probably change in the future; until then, I found that using a Cascading FlowListener for counters was the most convenient solution.

In my Scalding Job class, I just need to override the listeners and add my own implementation of a FlowListener:


class MyJob (args: Args) extends Job(args) {

  // my stats group, this helps keep them apart from others
  val statsGroupName = "MyStatsGroup"
  // my metric
  val deletedUsersStat = Stat("deletedUsers", statsGroupName)


  override def listeners = super.listeners ++ List(new FlowListener {

    override def onStarting(flow: Flow[_]) {
       // init stuff goes in here
    }

    override def onCompleted(flow: Flow[_]) {
      try {
        val fs = flow.getFlowStats
        fs.getCounterGroups.foreach { group =>
          fs.getCountersFor(group).foreach { counter =>
            println(group + "::" + counter + ":" + fs.getCounterValue(group, counter))
          }
        }
        val myStat = fs.getCounterValue(deletedUsersStat.group, deletedUsersStat.name)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }

    override def onStopping(flow: Flow[_]) {

    }

    override def onThrowable(flow: Flow[_], e: Throwable): Boolean = {
      e.printStackTrace()
      // return true after handling, otherwise your listener will stop
      true
    }
  })
}

You will have to use the reference to your Stat object to update and increment your metric:


  // 'inputField and 'outputField are placeholders for your own fields
  myPipe.map('inputField -> 'outputField) { value: String =>
    // ... other map code
    deletedUsersStat.incBy(1)
    value
  }

deletedUsersStat being the Stat val declared at the top of MyJob, in the code block with the listener declaration.


If you’re wondering why your iPhone’s battery is empty again

Look no further, ditch Facebook.

In terms of battery usage, it’s the worst hog ever:

[Screenshot: iOS battery usage by app]

Data usage is quite massive too:

[Screenshot: iOS mobile data usage by app]

Once you terminate the app and switch to using Safari for Facebook, you'll notice the difference.


How to debug a Hadoop job with Eclipse (or any other IDE)

Before we get started – just a quick note: this will only work as long as your job hasn't been submitted to a cluster, i.e. as long as your jobs run locally.

This is basically the right approach if you want to debug configuration parameters or other setup-related processes. I used it to debug a CLI call to a Scalding job, for instance.

1) The first thing you need to do is to add the remote debugging facility to hadoop:


export HADOOP_OPTS="$HADOOP_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8999"

This needs to be added to your conf/hadoop-env.sh or exported to your environment variables.

2) Now eclipse:

Choose Run -> Debug Configurations -> Remote Java Application

and set the port to 8999 in your connection settings. This is what it should look like:

[Screenshot: Eclipse remote debug configuration with port 8999]
