SerializedFunction (until 1.x)

Overview

When working with DCF you often need to provide custom functions. Some of them are serialized, transferred to the master or workers, and run in a different process. This is fine as long as a function uses no upvalues (variables or constants from enclosing scopes); if it needs a closure, read this article first.

Functions are serialized as source strings and deserialized with new Function, so the original closure is lost. Every upvalue must therefore be supplied in a FunctionEnv:

const min = 5;
console.log(await rdd.filter(
  v => v >= min,    // Function that uses upvalues.
  { min }           // Function Env that contains upvalues with same name.
).collect());
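To make the mechanism concrete, here is a minimal sketch of the idea. It is not DCF's actual implementation: it assumes a JSON transport, and `SerializedSketch`, `serializeSketch` and `deserializeSketch` are hypothetical names. Each upvalue name becomes a parameter of a wrapper built with `new Function`, which is why a variable left out of the FunctionEnv would be unresolved (a ReferenceError) on the other side.

```typescript
// Hypothetical wire format: the function's source text plus its upvalues.
interface SerializedSketch {
  source: string;
  env: Record<string, unknown>;
}

function serializeSketch(f: (...args: any[]) => any, env: Record<string, unknown> = {}): SerializedSketch {
  // Function.prototype.toString yields the (compiled) source text of f.
  return { source: f.toString(), env };
}

function deserializeSketch(s: SerializedSketch): (...args: any[]) => any {
  const names = Object.keys(s.env);
  // Every upvalue name becomes a parameter of a wrapper built with new Function,
  // so free variables in the source resolve to the values shipped in env.
  const wrapper = new Function(...names, `return (${s.source});`);
  return wrapper(...names.map((n) => s.env[n]));
}

const min = 5;
// A JSON round trip stands in for sending the function to another process.
const wire = JSON.stringify(serializeSketch((v: number) => v >= min, { min }));
const isBigEnough = deserializeSketch(JSON.parse(wire));
console.log([3, 5, 8].filter(isBigEnough)); // [ 5, 8 ]
```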

Every value in FunctionEnv should also be serializable.

If you pass a function in FunctionEnv, it is serialized and deserialized in the same context as the user function, so you can use recursive functions this way:

function fib(i) {
  if (i <= 1) return i;
  return fib(i - 1) + fib(i - 2);
}
console.log(await rdd.map(v => fib(v), { fib }).collect());
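One way such a shared context could work is sketched below. The names and wire format are hypothetical and DCF's real format may differ: functions found in the env are sent as source and redeclared in the same scope as the user function, so after deserialization `fib` can still call itself.

```typescript
// Hypothetical wire format: functions found in env travel as source text too.
interface SerializedSketch {
  source: string;
  fnEnv: Record<string, string>; // env entries that were functions, as source
  valueEnv: Record<string, unknown>; // every other env entry
}

function serializeSketch(f: (...args: any[]) => any, env: Record<string, unknown> = {}): SerializedSketch {
  const s: SerializedSketch = { source: f.toString(), fnEnv: {}, valueEnv: {} };
  for (const [key, value] of Object.entries(env)) {
    if (typeof value === "function") s.fnEnv[key] = value.toString();
    else s.valueEnv[key] = value;
  }
  return s;
}

function deserializeSketch(s: SerializedSketch): (...args: any[]) => any {
  const params = Object.keys(s.valueEnv);
  // Env functions are redeclared in the same scope as the user function, so
  // both the user function and fib's own body resolve `fib` to the rebuilt copy.
  const decls = Object.entries(s.fnEnv)
    .map(([key, src]) => `const ${key} = (${src});`)
    .join("\n");
  const wrapper = new Function(...params, `${decls}\nreturn (${s.source});`);
  return wrapper(...params.map((key) => s.valueEnv[key]));
}

const fib = (i: number): number => (i <= 1 ? i : fib(i - 1) + fib(i - 2));
const wire = JSON.stringify(serializeSketch((v: number) => fib(v), { fib }));
const fibOnWorker = deserializeSketch(JSON.parse(wire));
console.log([1, 2, 3, 4, 5, 6].map(fibOnWorker)); // [ 1, 1, 2, 3, 5, 8 ]
```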

If a serialized function uses a module, load it with require rather than import: the compiler may rename the variables bound by an import statement, so the name used in the function body would no longer match. Pass the module through the FunctionEnv with requireModule, as in this example:

await numbers.filter(isPrime).saveAsTextFile('./prime', {
  extension: 'gz',
  compressor: serialize(
    (buf: Buffer) => {
      return zlib.gzipSync(buf);
    },
    {
      zlib: requireModule('zlib'),
    },
  ),
});
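The idea behind requireModule can be sketched as follows, assuming the env carries a marker (the hypothetical `ModuleRef`) instead of the module object, and the receiving process resolves that marker itself. The resolver is injected so the sketch runs without Node's `require`; `mathx` is a made-up module served from a fake registry, and none of these names are DCF's API.

```typescript
// Hypothetical marker meaning "load this module on the receiving side".
interface ModuleRef {
  __moduleRef: string;
}

function requireModuleSketch(id: string): ModuleRef {
  return { __moduleRef: id };
}

function isModuleRef(v: unknown): v is ModuleRef {
  return typeof v === "object" && v !== null && typeof (v as ModuleRef).__moduleRef === "string";
}

function deserializeWithModules(
  source: string,
  env: Record<string, unknown>,
  resolve: (id: string) => unknown, // stands in for the worker's own module loader
): (...args: any[]) => any {
  const names = Object.keys(env);
  // Markers are swapped for modules loaded by the receiving process itself,
  // so module objects never have to be serialized.
  const values = names.map((n) => {
    const v = env[n];
    return isModuleRef(v) ? resolve(v.__moduleRef) : v;
  });
  return new Function(...names, `return (${source});`)(...values);
}

// Sending side: `mathx` exists only on the receiver, so the body is given as source text.
const wire = JSON.stringify({
  source: "(n) => mathx.double(n)",
  env: { mathx: requireModuleSketch("mathx") },
});

// Receiving side: a fake registry plays the role of the module system.
const registry: Record<string, unknown> = { mathx: { double: (x: number) => x * 2 } };
const shipped = JSON.parse(wire);
const doubleOnWorker = deserializeWithModules(shipped.source, shipped.env, (id) => registry[id]);
console.log(doubleOnWorker(21)); // 42
```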

If you need a function that was already serialized in a different context, pass it in the FunctionEnv; it is deserialized automatically after the user function itself has been deserialized. For example, map can be built on top of mapPartitions this way:

class RDD<T> {
  map<T1>(func: ((v: T) => T1), env?: FunctionEnv): RDD<T1> {
    // func here can be either a raw function or a SerializedFunction.
    if (typeof func === 'function') {
      func = serialize(func, env);
    }
    return this.mapPartitions((partition: T[]) => partition.map(func), {
      func,
    });
  }
}
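A sketch of this behavior, assuming serialized functions carry a tag the receiver can recognize (`SerializedSketch`, `serializeSketch` and `deserializeSketch` are hypothetical, not DCF's API). It also mirrors the typing trick that lets `partition.map(func)` type-check even though `func` is a serialized object.

```typescript
// Hypothetical serialized form. Typing it as `T & {...}` lets TypeScript accept
// it wherever T is expected, even though at runtime it is a plain object.
interface SerializedData {
  __serialized: true;
  source: string;
  env: Record<string, unknown>;
}
type SerializedSketch<T extends (...args: any[]) => any> = T & SerializedData;

function serializeSketch<T extends (...args: any[]) => any>(
  f: T,
  env: Record<string, unknown> = {},
): SerializedSketch<T> {
  const data: SerializedData = { __serialized: true, source: f.toString(), env };
  return data as unknown as SerializedSketch<T>;
}

function isSerialized(v: unknown): v is SerializedData {
  return typeof v === "object" && v !== null && (v as SerializedData).__serialized === true;
}

function deserializeSketch(s: SerializedData): (...args: any[]) => any {
  const names = Object.keys(s.env);
  // Serialized functions found in env are rebuilt too (recursively), so the
  // user function receives ordinary callables as its upvalues.
  const values = names.map((n) => {
    const v = s.env[n];
    return isSerialized(v) ? deserializeSketch(v) : v;
  });
  return new Function(...names, `return (${s.source});`)(...values);
}

// Mirrors map(): serialize func once, then wrap it in a partition-level function.
const offset = 10;
const func = serializeSketch((v: number) => v + offset, { offset });
const perPartition = serializeSketch((partition: number[]) => partition.map(func), { func });

const runPartition = deserializeSketch(JSON.parse(JSON.stringify(perPartition)));
console.log(runPartition([1, 2, 3])); // [ 11, 12, 13 ]
```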

Functions stored inside an array or map in FunctionEnv are not deserialized automatically. Instead, provide a function that returns the function you need, or deserialize the functions yourself.
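The sketch below assumes a shallow, JSON-based serializer (hypothetical; DCF's real failure mode for arrays may differ). Under that assumption, raw functions inside an array come back as `null`, while a single top-level function that returns the functions you need survives the round trip.

```typescript
// Hypothetical shallow serializer: only *top-level* env functions are sent as
// source; everything else, including arrays, goes through JSON unchanged.
interface SerializedSketch {
  source: string;
  fnEnv: Record<string, string>;
  valueEnv: Record<string, unknown>;
}

function serializeSketch(f: (...args: any[]) => any, env: Record<string, unknown>): string {
  const s: SerializedSketch = { source: f.toString(), fnEnv: {}, valueEnv: {} };
  for (const [key, value] of Object.entries(env)) {
    if (typeof value === "function") s.fnEnv[key] = value.toString();
    else s.valueEnv[key] = value;
  }
  return JSON.stringify(s);
}

function deserializeSketch(wire: string): (...args: any[]) => any {
  const s: SerializedSketch = JSON.parse(wire);
  const params = Object.keys(s.valueEnv);
  const decls = Object.entries(s.fnEnv)
    .map(([key, src]) => `const ${key} = (${src});`)
    .join("\n");
  return new Function(...params, `${decls}\nreturn (${s.source});`)(...params.map((k) => s.valueEnv[k]));
}

// An array of functions is not revived: JSON.stringify turns each one into null.
const steps = [(v: number) => v + 1, (v: number) => v * 2];
const broken = deserializeSketch(
  serializeSketch((v: number) => steps.reduce((acc, step) => step(acc), v), { steps }),
);

// Workaround: ship one function that *returns* the functions you need.
const makeSteps = () => [(v: number) => v + 1, (v: number) => v * 2];
const fixed = deserializeSketch(
  serializeSketch((v: number) => makeSteps().reduce((acc, step) => step(acc), v), { makeSteps }),
);

console.log(fixed(3)); // 8
// broken(3) throws a TypeError, because each `step` is null on the receiving side.
```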

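TypeScript hint:

A SerializedFunction looks callable to the type checker, but it cannot actually be called at runtime. As a best practice, call it only from inside another function that receives it through FunctionEnv, where it has already been deserialized.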
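A minimal sketch of why this happens, assuming the type is declared as an intersection with the original function type (`SerializedFunctionSketch` and `serializeSketch` are hypothetical names, not DCF's definitions):

```typescript
// Hypothetical typing trick: the serialized form is declared as T & {...},
// so TypeScript treats it as callable, but the runtime value is a plain object.
type SerializedFunctionSketch<T extends (...args: any[]) => any> = T & { readonly source: string };

function serializeSketch<T extends (...args: any[]) => any>(f: T): SerializedFunctionSketch<T> {
  return { source: f.toString() } as unknown as SerializedFunctionSketch<T>;
}

const double = serializeSketch((x: number) => x * 2);

let failure: unknown;
try {
  double(2); // type-checks, but fails at runtime
} catch (e) {
  failure = e;
}
console.log(typeof double, failure instanceof TypeError); // object true
```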

API

type FunctionEnv = { [key: string]: any }

function serialize<T extends (...args: any[]) => any>(f: T, env?: FunctionEnv): SerializedFunction<T>;

Serialize the function f together with the upvalues in env.

function deserialize<T extends (...args: any[]) => any>(f: SerializedFunction<T>): T;

Deserialize a SerializedFunction and return a function that can be called directly.
