Scalding hiding NPEs in “operator Each failed executing operation”

Yesterday I was surprised by a failing Scalding job. Everything worked fine locally, and all I got was something like “job failed, see cluster log”. In the cluster log I saw the following:

2014-10-24 14:38:41,222 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201410101555_2230_m_000005_3: cascading.pipe.OperatorException: [com.twitter.scalding.T…][com.twitter.scalding.RichPipe.each(RichPipe.scala:471)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
at cascading.operation.Identity$2.operate(Identity.java:137)
at cascading.operation.Identity.operate(Identity.java:150)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:130)

Eh, what? Pretty useless, because the message doesn’t even tell me where to look for the error in my own code!

Lucky me, it was just a very simple job, so I could easily run it with fake data in local mode, too.
Not so lucky me: it worked locally. So I reduced the code to the critical block, and the test looked like this:

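import com.twitter.scalding._

class example2(args: Args) extends Job(args) {
  // Fake input: one row with a comma-separated tag string in field 'a
  val a = new IterableSource(List[(String, Int)](
    ("a,b", 0)
  ), ('a, 'b))

  // Split the tags in 'a into one output row per tag (new field 'c)
  a.flatMap('a -> 'c) {
    tags: String => tags.split(",")
  }
  .debug
  .write(NullSource)
}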

So the error had to have something to do with the .flatMap(...) operation. And suddenly the scales fell from my eyes …

  val a = new IterableSource(List[(String, Int)](
    ("a,b", 0),
    (null, 1)  // <--- causes the NPE in .flatMap
  ), ('a, 'b))

And suddenly I got the same error message in local mode – plus an additional stack trace showing the cause in my code (which I couldn’t find in the cluster log).
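One way to guard against the null row above is to wrap the field value in an Option before splitting it. The following is only a minimal sketch in plain Scala: the object NullSafeTags and the method splitTags are hypothetical names, not part of Scalding, and treating a null field as "no tags" is an assumption that may not suit every job.

```scala
// Hypothetical helper (not part of Scalding): split a comma-separated
// tag string, treating a null field as "no tags" instead of throwing.
object NullSafeTags {
  def splitTags(tags: String): List[String] =
    Option(tags)                    // Option(null) is None
      .map(_.split(",").toList)     // only split when a value is present
      .getOrElse(Nil)               // a null field yields no output rows

  def main(args: Array[String]): Unit = {
    println(splitTags("a,b"))  // prints List(a, b)
    println(splitTags(null))   // prints List()
  }
}
```

In the job above, the flatMap body would then presumably become tags: String => NullSafeTags.splitTags(tags), so that rows with a null 'a are silently dropped. If such rows should be counted or reported rather than dropped, a different fallback would be needed.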

I’m just happy that this hit me in such an easy job and not in a more complex one …

Keep on coding!