SerializedFunction (until 1.x)
Overview
You often need to provide a custom function when working with DCF. Some of these functions are serialized, transferred to the master or a worker, and run in a different process. This works without extra effort if the function uses no upvalues (variables or constants from enclosing scopes), but if you need closures, read this article first.
Functions are serialized as source strings, then deserialized with new Function. Because the enclosing scope is lost in the process, every upvalue must be supplied in a FunctionEnv:
const min = 5;
console.log(await rdd.filter(
  v => v >= min, // Function that uses upvalues.
  { min } // FunctionEnv that contains the upvalues under the same names.
).collect());
Every value in FunctionEnv must also be serializable.
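The round trip described above (source string out, new Function back in, upvalues supplied by name) can be sketched in a few lines. This is only an illustration of the idea, not DCF's actual implementation: sketchSerialize and sketchDeserialize are hypothetical names, and the JSON round trip merely imitates sending the function to another process.

```javascript
// Hypothetical stand-ins for illustration; not DCF's real serialize/deserialize.
function sketchSerialize(f, env = {}) {
  // Only the source text and the env values survive; the closure itself is lost.
  return { source: f.toString(), env };
}

function sketchDeserialize(serialized) {
  const names = Object.keys(serialized.env);
  const values = names.map((name) => serialized.env[name]);
  // The factory takes the env names as parameters, so the rebuilt function
  // resolves them exactly where it used to resolve its upvalues.
  const factory = new Function(...names, `return (${serialized.source});`);
  return factory(...values);
}

const min = 5;
// JSON round trip to imitate crossing a process boundary.
const wire = JSON.stringify(sketchSerialize((v) => v >= min, { min }));
const restored = sketchDeserialize(JSON.parse(wire));
console.log([3, 5, 8].filter(restored)); // [ 5, 8 ]
```

In this sketch the env is what makes min visible again on the receiving side, which is also why each env value has to survive serialization itself.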
If you pass a function in FunctionEnv, it is serialized and deserialized with the same context as the main function, so a recursive function can be used this way:
function fib(i) {
  if (i <= 1) return i;
  return fib(i - 1) + fib(i - 2);
}
// fib refers to itself by name, so it is passed in the env under that name.
console.log(await rdd.filter(v => fib(v), { fib }).collect());
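One way to picture "the same context" is that every function from the env is re-declared in a single scope next to the main function, so each can see the others (and itself) by name. The sketch below illustrates that assumption only; rebuildInSharedScope is a hypothetical helper, and DCF's real mechanism may differ.

```javascript
// Hypothetical helper for illustration; DCF's real mechanism may differ.
function rebuildInSharedScope(mainSource, envFunctionSources) {
  // Declare every env function in one scope so that each of them can
  // refer to the others, and to itself, by name.
  const declarations = Object.entries(envFunctionSources)
    .map(([name, source]) => `const ${name} = (${source});`)
    .join('\n');
  return new Function(`${declarations}\nreturn (${mainSource});`)();
}

// An arrow function has no name of its own, so recursion depends
// entirely on `fib` being resolvable in the surrounding scope.
const fib = (i) => (i <= 1 ? i : fib(i - 1) + fib(i - 2));

// Only source strings cross the (imagined) process boundary.
const restored = rebuildInSharedScope(
  ((v) => fib(v) % 2 === 0).toString(),
  { fib: fib.toString() },
);
console.log([1, 2, 3, 4, 5, 6].filter(restored)); // [ 3, 6 ]
```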
If you need a third-party module, use require instead of import, because the compiler may rename variables bound by import syntax, and the serialized source would then refer to a name that does not exist in the env. Pass the module to the function through requireModule in the function env:
await numbers.filter(isPrime).saveAsTextFile('./prime', {
  extension: 'gz',
  compressor: serialize(
    (buf: Buffer) => {
      return zlib.gzipSync(buf);
    },
    {
      zlib: requireModule('zlib'),
    },
  ),
});
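A module object cannot be shipped as data, so a plausible reading of requireModule is that only the module name travels and the receiving process calls require again. The sketch below illustrates that reading under stated assumptions: moduleRef, resolveEnv, and the __module marker are hypothetical names invented here, not part of DCF.

```javascript
const zlib = require('zlib');

// Hypothetical marker object: only the module name crosses the wire.
function moduleRef(name) {
  return { __module: name };
}

// Hypothetical receiver-side step: turn markers back into live modules.
function resolveEnv(env) {
  const resolved = {};
  for (const [key, value] of Object.entries(env)) {
    const isRef = value !== null && typeof value === 'object'
      && typeof value.__module === 'string';
    resolved[key] = isRef ? require(value.__module) : value;
  }
  return resolved;
}

// Sender side: the function source plus an env that names the module.
const wire = JSON.stringify({
  source: ((buf) => zlib.gzipSync(buf)).toString(),
  env: { zlib: moduleRef('zlib') },
});

// Receiver side: resolve the env, then rebuild the function around it.
const { source, env } = JSON.parse(wire);
const resolved = resolveEnv(env);
const names = Object.keys(resolved);
const compress = new Function(...names, `return (${source});`)(
  ...names.map((name) => resolved[name]),
);

const packed = compress(Buffer.from('hello'));
console.log(zlib.gunzipSync(packed).toString()); // hello
```

The env key has to match the identifier used inside the function body, which is why a compiler-renamed import binding would break the lookup.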
If you need to use a serialized function from a different context, pass it in FunctionEnv. It is deserialized automatically after the user function is deserialized.
class Context {
  map<T1>(func: ((v: T) => T1), env?: FunctionEnv): RDD<T1> {
    // func here can be either a raw function or a SerializedFunction.
    if (typeof func === 'function') {
      func = serialize(func, env);
    }
    return this.mapPartitions((partition: T[]) => partition.map(func), {
      func,
    });
  }
}
API
type FunctionEnv = { [key: string]: any }
function serialize<T extends (...args: any[]) => any>(f: T, env?: FunctionEnv): SerializedFunction<T>;
Serialize a function.
function deserialize<T extends (...args: any[]) => any>(f: SerializedFunction<T>): T;
Deserialize a function and return a function that can be called directly.