RDD<T>
A Resilient Distributed Dataset (RDD), the basic abstraction in DCF. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
persist(storageType?: StorageType): CachedRDD<T>
Persist this RDD in storage after the first time it is computed. You should manually call unpersist() on it later. If no storage level is specified, defaults to 'memory'.
cache(): CachedRDD<T>
Persist this RDD with the default storage level ('memory').
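A minimal usage sketch: the first count() computes and caches the partitions, the second reuses them, and unpersist() releases the storage.
> rdd = dcc.parallelize([1, 2, 3, 4]).cache()
> rdd.count()
4
> rdd.count()
4
> rdd.unpersist()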
getNumPartitions(): Promise<number>
Returns the number of partitions in this RDD, skipping computation when possible.
> rdd = dcc.parallelize([1, 2, 3, 4], 2)
> rdd.getNumPartitions()
2
collect(): Promise<T[]>
Return a list that contains all of the elements in this RDD.
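Note that collect() materializes the whole dataset in the driver's memory; for a peek at a large RDD, prefer take().
> dcc.parallelize([1, 2, 3]).collect()
[ 1, 2, 3 ]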
take(count: number): Promise<T[]>
Take the first count elements of the RDD.
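For example:
> dcc.parallelize([2, 3, 4, 5, 6]).take(2)
[ 2, 3 ]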
count(): Promise<number>
Return the number of elements in this RDD.
> dcc.parallelize([2, 3, 4]).count()
3
max(): Promise<T | null>
Find the maximum item in this RDD. If the RDD is empty, returns null.
> rdd = dcc.parallelize([2, 5, 43, 10])
> rdd.max()
43
> rdd.filter(v => v > 100).max()
null
min(): Promise<T | null>
Find the minimum item in this RDD. If the RDD is empty, returns null.
> rdd = dcc.parallelize([2, 5, 43, 10])
> rdd.min()
2
> rdd.filter(v => v < 0).min()
null
mapPartitions<T1>(func: (v: T[]) => T1[] | Promise<T1[]>, env?: FunctionEnv): RDD<T1>
Return a new RDD by applying a function to each partition of this RDD.
> rdd = dcc.parallelize([1, 2, 3, 4], 2)
> rdd.mapPartitions(arr => [arr.reduce((a, b) => a + b, 0)]).collect()
[3, 7]
glom(): RDD<T[]>
Returns an RDD created by coalescing all elements within each partition into a list.
> rdd = dcc.parallelize([1, 2, 3, 4], 2)
> rdd.glom().collect()
[[1, 2], [3, 4]]
map<T1>(func: ((v: T) => T1), env?: FunctionEnv): RDD<T1>
Return a new RDD by applying a function to each element of this RDD.
> rdd = dcc.parallelize(["b", "a", "c"])
> rdd.map(x => [x, 1]).collect()
[ [ 'b', 1 ], [ 'a', 1 ], [ 'c', 1 ] ]
reduce(func: ((a: T, b: T) => T), env?: FunctionEnv): Promise<T | null>
Reduces the elements of this RDD using the specified commutative and associative binary operator, returns the final result.
If the input RDD is empty, returns null.
> dcc.parallelize([1,2,3,4,5]).reduce((a,b)=>a+b)
15
> dcc.parallelize(new Array(10).fill(2)).cache().reduce((a,b)=>a+b)
20
> dcc.parallelize([]).reduce((a,b)=>a+b)
null
flatMap<T1>(func: ((v: T) => T1[]), env?: FunctionEnv): RDD<T1>
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
> rdd = dcc.parallelize([2,3,4])
> rdd.flatMap(v => new Array(v).fill(1).map((v,i)=>i + 1)).collect()
[ 1, 2, 1, 2, 3, 1, 2, 3, 4 ]
filter(func: (v: T) => boolean, env?: FunctionEnv): RDD<T>
Return a new RDD containing only the elements that satisfy a predicate.
> rdd = dcc.parallelize([1, 2, 3, 4, 5])
> rdd.filter(x => x % 2 == 0).collect()
[2, 4]
distinct(numPartitions?: number): RDD<T>
Return a new RDD containing the distinct elements in this RDD.
> dcc.parallelize([1, 1, 2, 3]).distinct().collect().then(v => v.sort())
[1, 2, 3]
repartition(numPartitions: number): RDD<T>
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a full shuffle.
> rdd = dcc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
> rdd.glom().collect()
[ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ], [ 7 ] ]
> rdd.repartition(2).glom().collect().then(v => v.length)
2
> rdd.repartition(10).glom().collect().then(v => v.length)
10
partitionBy(numPartitions: number, partitionFunc: (v: T) => number, env?: FunctionEnv): GeneratedRDD<T>
Return a copy of the RDD partitioned using the specified partitioner.
> rdd = dcc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
> rdd.glom().collect()
[ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ], [ 7 ] ]
> rdd.partitionBy(3, v => v % 3).glom().collect()
[ [ 3, 6 ], [ 1, 4, 7 ], [ 2, 5 ] ]
coalesce(numPartitions: number): GeneratedRDD<T>
Return a new RDD that is reduced into numPartitions partitions while preserving the order of elements.
> rdd = dcc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
> rdd.glom().collect()
[ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ], [ 7 ] ]
> rdd.coalesce(3).glom().collect()
[ [ 1, 2 ], [ 3, 4, 5 ], [ 6, 7 ] ]
reduceByKey<K, V>(this: RDD<[K, V]>, func: (a: V, b: V) => V, numPartitions?: number, partitionFunc?: (v: K) => number, env?: FunctionEnv): RDD<[K, V]>
Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. The default partitioner is hash-partitioning.
> rdd = dcc.parallelize([["a", 1], ["b", 1], ["a", 1]])
> rdd.reduceByKey((a,b)=>a+b).collect().then(v => v.sort())
[ [ 'a', 2 ], [ 'b', 1 ] ]
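A sketch of controlling the output partitioning explicitly; the modulo partitioner below is illustrative and assumes partitionFunc returns the target partition index:
> rdd = dcc.parallelize([["a", 1], ["b", 1], ["a", 1]])
> rdd.reduceByKey((a, b) => a + b, 2, k => k.charCodeAt(0) % 2).glom().collect()
[ [ [ 'b', 1 ] ], [ [ 'a', 2 ] ] ]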
combineByKey<K, V, C>(this: RDD<[K, V]>, createCombiner: (a: V) => C, mergeValue: (a: C, b: V) => C, mergeCombiners: (a: C, b: C) => C, numPartitions?: number, partitionFunc?: (v: K) => number, env?: FunctionEnv): RDD<[K, C]>
Generic function to combine the elements for each key using a custom set of aggregation functions.
Turns an RDD<[K, V]> into a result of type RDD<[K, C]>, for a “combined type” C.
Users provide three functions:
createCombiner, which turns a V into a C (e.g., creates a one-element list)
mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)
To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.
In addition, users can control the partitioning of the output RDD.
> rdd = dcc.parallelize([["a", 1], ["b", 1], ["a", 2]])
> to_list = a => [a]
> append = (a, b) => (a.push(b), a)
> extend = (a, b) => a.concat(b)
> rdd.combineByKey(to_list, append, extend).collect().then(v => v.sort())
[ [ 'a', [ 1, 2 ] ], [ 'b', [ 1 ] ] ]
union(...others: RDD<T>[]): RDD<T>
Return the union of this RDD and others.
> rdd = dcc.parallelize([1, 1, 2, 3])
> rdd.union(rdd).collect()
[ 1, 1, 2, 3, 1, 1, 2, 3 ]
saveAsTextFile(baseUrl: string, options?: Options): Promise<void>
Save this RDD as a text file, using string representations of elements.
Options
overwrite?: boolean – whether DCF should erase any existing file.
encoding?: string – encoding of the text file(s) (default: 'utf8')
extension?: string – file extension of each partition (default: 'txt')
compressor?: (data: Buffer) => Buffer – an optional compressor function
functionEnv?: FunctionEnv – serialization context (upvalues) for the provided functions
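A sketch of saving with gzip compression; the output path and option values are illustrative, and requiring zlib inside the compressor avoids captured upvalues (assuming require is available on workers):
> rdd = dcc.parallelize(['line1', 'line2'])
> rdd.saveAsTextFile('/tmp/out', { overwrite: true, extension: 'txt.gz', compressor: data => require('zlib').gzipSync(data) })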
groupWith<K, V, ...V1>(this: RDD<[K, V]>, ...others: RDD<[K, ...V1]>): RDD<[K, [V[], ...V1[]]]>
Alias for cogroup but with support for multiple RDDs.
> w = dcc.parallelize([["a", 5], ["b", 6]])
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> z = dcc.parallelize([["b", 42]])
> w.groupWith(x, y, z).collect().then(v => JSON.stringify(v.sort()))
[["a",[[5],[1],[2],[]]],["b",[[6],[4],[],[42]]]]
cogroup<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V[], V1[]]]>
For each key k in this RDD or other, return a resulting RDD that contains a tuple with the list of values for that key in this RDD as well as in other.
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> x.cogroup(y).collect().then(v => JSON.stringify(v.sort()))
[["a",[[1],[2]]],["b",[[4],[]]]]
join<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V, V1]]>
Return an RDD containing all pairs of elements with matching keys in this RDD and other.
Each pair of elements will be returned as a [k, [v1, v2]] tuple, where [k, v1] is in this RDD and [k, v2] is in other.
Performs a hash join across the cluster.
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2], ["a", 3]])
> x.join(y).collect().then(v => v.sort())
[ [ 'a', [ 1, 2 ] ], [ 'a', [ 1, 3 ] ] ]
leftOuterJoin<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V, V1 | null]]>
Perform a left outer join of this RDD and other.
For each element [k, v] in this RDD, the resulting RDD will either contain all pairs [k, [v, w]] for w in other, or the pair [k, [v, null]] if no elements in other have key k.
Hash-partitions the resulting RDD into the given number of partitions.
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> x.leftOuterJoin(y).collect().then(v => v.sort())
[ [ 'a', [ 1, 2 ] ], [ 'b', [ 4, null ] ] ]
rightOuterJoin<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V | null, V1]]>
Perform a right outer join of this RDD and other.
For each element [k, w] in other, the resulting RDD will either contain all pairs [k, [v, w]] for v in this RDD, or the pair [k, [null, w]] if no elements in this RDD have key k.
Hash-partitions the resulting RDD into the given number of partitions.
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> y.rightOuterJoin(x).collect().then(v => v.sort())
[ [ 'a', [ 2, 1 ] ], [ 'b', [ null, 4 ] ] ]
fullOuterJoin<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V | null, V1 | null]]>
Perform a full outer join of this RDD and other.
For each element [k, v] in this RDD, the resulting RDD will either contain all pairs [k, [v, w]] for w in other, or the pair [k, [v, null]] if no elements in other have key k.
Similarly, for each element [k, w] in other, the resulting RDD will either contain all pairs [k, [v, w]] for v in this RDD, or the pair [k, [null, w]] if no elements in this RDD have key k.
Hash-partitions the resulting RDD into the given number of partitions.
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2], ["c", 2]])
> y.fullOuterJoin(x).collect().then(v => v.sort())
[ [ 'a', [ 2, 1 ] ], [ 'b', [ null, 4 ] ], [ 'c', [ 2, null ] ] ]
sort(ascending?: boolean, numPartitions?: number): RDD<T>
Sorts this RDD.
> dcc.parallelize([1, 3, 2, 6, 5, 4]).sort().collect()
[ 1, 2, 3, 4, 5, 6 ]
> dcc.parallelize([1, 3, 2, 6, 5, 4]).sort(false).collect()
[ 6, 5, 4, 3, 2, 1 ]
sortBy<K>(keyFunc: (data: T) => K, ascending?: boolean, numPartitions?: number, env?: FunctionEnv): RDD<T>
Sorts this RDD by the given keyFunc.
> rdd = dcc.parallelize([['a', 1], ['c', 6], ['d', 5], ['b', 3], ['e', 4], ['f', 2]])
> rdd.sortBy(v => v[0]).collect()
[ [ 'a', 1 ],
[ 'b', 3 ],
[ 'c', 6 ],
[ 'd', 5 ],
[ 'e', 4 ],
[ 'f', 2 ] ]
> rdd.sortBy(v => v[1]).collect()
[ [ 'a', 1 ],
[ 'f', 2 ],
[ 'b', 3 ],
[ 'e', 4 ],
[ 'd', 5 ],
[ 'c', 6 ] ]
CachedRDD<T> extends RDD<T>
unpersist(): Promise<void>
Remove this cached RDD from memory and disk.