RDD<T>

A Resilient Distributed Dataset (RDD), the basic abstraction in DCF. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

persist(storageType?: StorageType): CachedRDD<T>

Persist this RDD in storage after the first time it is computed. You should manually unpersist() it later. If no storage level is specified, it defaults to 'memory'.
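
A sketch of typical usage: after the first action computes the RDD, subsequent actions can reuse the persisted partitions instead of recomputing them.

> rdd = dcc.parallelize([1, 2, 3, 4]).persist()
> rdd.count()
4
> rdd.max()
4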

cache(): CachedRDD<T>

Persist this RDD with the default storage level ('memory').
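
A quick sketch, equivalent to calling persist() with the default 'memory' storage level:

> rdd = dcc.parallelize(['a', 'b', 'c']).cache()
> rdd.count()
3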

getNumPartitions(): Promise<number>

Returns the number of partitions of this RDD, skipping computation of the data where possible.

> rdd = dcc.parallelize([1, 2, 3, 4], 2)
> rdd.getNumPartitions()
2

collect(): Promise<T[]>

Return a list that contains all of the elements in this RDD.

This method should only be used if the resulting array is expected to be small, as all of the data is loaded into the client's memory.

This method should be used only if the resulting type is serializable. Otherwise the result could be unpredictable.
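
An illustrative example:

> dcc.parallelize([1, 2, 3]).map(v => v * 10).collect()
[ 10, 20, 30 ]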

take(count: number): Promise<T[]>

Take the first count elements of the RDD.

This method should only be used if the resulting array is expected to be small, as all of the data is loaded into the client's memory.

This method should be used only if the resulting type is serializable. Otherwise the result could be unpredictable.
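
For example, taking the first two elements of a five-element RDD:

> dcc.parallelize([2, 3, 4, 5, 6]).take(2)
[ 2, 3 ]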

count(): Promise<number>

Return the number of elements in this RDD.

> dcc.parallelize([2, 3, 4]).count()
3

max(): Promise<T | null>

Find the maximum item in this RDD. If the RDD is empty, returns null.

> rdd = dcc.parallelize([2, 5, 43, 10])
> rdd.max()
43
> rdd.filter(v => v > 100).max()
null

min(): Promise<T | null>

Find the minimum item in this RDD. If the RDD is empty, returns null.

> rdd = dcc.parallelize([2, 5, 43, 10])
> rdd.min()
2
> rdd.filter(v => v < 0).min()
null

mapPartitions<T1>(func: (v: T[]) => T1[] | Promise<T1[]>, env?: FunctionEnv): RDD<T1>

Return a new RDD by applying a function to each partition of this RDD.

> rdd = dcc.parallelize([1, 2, 3, 4], 2)
> rdd.mapPartitions(arr => [arr.reduce((a, b) => a + b, 0)]).collect()
[3, 7]

glom(): RDD<T[]>

Returns an RDD created by coalescing all elements within each partition into a list.

> rdd = dcc.parallelize([1, 2, 3, 4], 2)
> rdd.glom().collect()
[[1, 2], [3, 4]]

map<T1>(func: ((v: T) => T1), env?: FunctionEnv): RDD<T1>

Return a new RDD by applying a function to each element of this RDD.

> rdd = dcc.parallelize(["b", "a", "c"])
> rdd.map(x => [x, 1]).collect()
[ [ 'b', 1 ], [ 'a', 1 ], [ 'c', 1 ] ]

reduce(func: ((a: T, b: T) => T), env?: FunctionEnv): Promise<T | null>

Reduces the elements of this RDD using the specified commutative and associative binary operator and returns the final result.

If the input RDD is empty, returns null.

> dcc.parallelize([1,2,3,4,5]).reduce((a,b)=>a+b)
15
> dcc.parallelize(new Array(10).fill(2)).cache().reduce((a,b)=>a+b)
20
> dcc.parallelize([]).reduce((a,b)=>a+b)
null

flatMap<T1>(func: ((v: T) => T1[]), env?: FunctionEnv): RDD<T1>

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

> rdd = dcc.parallelize([2,3,4])
> rdd.flatMap(v => new Array(v).fill(1).map((v,i)=>i + 1)).collect()
[ 1, 2, 1, 2, 3, 1, 2, 3, 4 ]

filter(func: (v: T) => boolean, env?: FunctionEnv): RDD<T>

Return a new RDD containing only the elements that satisfy a predicate.

> rdd = dcc.parallelize([1, 2, 3, 4, 5])
> rdd.filter(x => x % 2 == 0).collect()
[2, 4]

distinct(numPartitions?: number): RDD<T>

Return a new RDD containing the distinct elements in this RDD.

> dcc.parallelize([1, 1, 2, 3]).distinct().collect().then(v => v.sort())
[1, 2, 3]

repartition(numPartitions: number): RDD<T>

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a full shuffle.

> rdd = dcc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
> rdd.glom().collect()
[ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ], [ 7 ] ]
> rdd.repartition(2).glom().collect().then(v => v.length)
2
> rdd.repartition(10).glom().collect().then(v => v.length)
10

partitionBy(numPartitions: number, partitionFunc: (v: T) => number, env?: FunctionEnv): GeneratedRDD<T>

Return a copy of the RDD partitioned using the specified partitioner.

> rdd = dcc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
> rdd.glom().collect()
[ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ], [ 7 ] ]
> rdd.partitionBy(3, v => v % 3).glom().collect()
[ [ 3, 6 ], [ 1, 4, 7 ], [ 2, 5 ] ]

coalesce(numPartitions: number): GeneratedRDD<T>

Return a new RDD that is reduced into numPartitions partitions while preserving the order of elements.

> rdd = dcc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
> rdd.glom().collect()
[ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ], [ 7 ] ]
> rdd.coalesce(3).glom().collect()
[ [ 1, 2 ], [ 3, 4, 5 ], [ 6, 7 ] ]

reduceByKey<K, V>(this: RDD<[K, V]>, func: ((a: V, b: V) => V), numPartitions?: number, partitionFunc?: (v: K) => number, env?: FunctionEnv): RDD<[K, V]>

Merge the values for each key using an associative and commutative reduce function.

This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. The default partitioner is hash partitioning.

> rdd = dcc.parallelize([["a", 1], ["b", 1], ["a", 1]])
> rdd.reduceByKey((a,b)=>a+b).collect().then(v => v.sort())
[ [ 'a', 2 ], [ 'b', 1 ] ]

combineByKey<K, V, C>(this: RDD<[K, V]>, createCombiner: ((a: V) => C), mergeValue: ((a: C, b: V) => C), mergeCombiners: ((a: C, b: C) => C), numPartitions?: number, partitionFunc?: (v: K) => number, env?: FunctionEnv): RDD<[K, C]>

Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)

  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)

  • mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.

> rdd = dcc.parallelize([["a", 1], ["b", 1], ["a", 2]])
> to_list = a => [a];
> append = (a, b) => { a.push(b); return a; }
> extend = (a,b) => a.concat(b)
> rdd.combineByKey(to_list, append, extend).collect().then(v => v.sort())
[ [ 'a', [ 1, 2 ] ], [ 'b', [ 1 ] ] ]

union(...others: RDD<T>[]): RDD<T>

Return the union of this RDD and others.

> rdd = dcc.parallelize([1, 1, 2, 3])
> rdd.union(rdd).collect()
[ 1, 1, 2, 3, 1, 1, 2, 3 ]

saveAsTextFile(baseUrl: string, options?: Options): Promise<void>

Save this RDD as a text file, using string representations of elements.

Options

  • overwrite?: boolean – whether DCF should erase any existing file

  • encoding?: string – encoding of the text file(s) (default: 'utf8')

  • extension?: string – file extension of each partition (default: 'txt')

  • compressor?: (data: Buffer) => Buffer – an optional compressor function for the output

  • functionEnv?: FunctionEnv – function serialization context (upvalues)
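
An illustrative sketch (the base URL below is a hypothetical local path; use whatever storage location your deployment supports):

> rdd = dcc.parallelize(['line1', 'line2', 'line3'], 2)
> rdd.saveAsTextFile('/tmp/output', { overwrite: true, extension: 'txt' })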

groupWith<K, V, ...V1>(this: RDD<[K, V]>, ...others: RDD<[K, ...V1]>): RDD<[K, [V[], ...V1[]]]>

Alias for cogroup but with support for multiple RDDs.

> w = dcc.parallelize([["a", 5], ["b", 6]])
> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> z = dcc.parallelize([["b", 42]])
> w.groupWith(x, y, z).collect().then(v => JSON.stringify(v.sort()))
[["a",[[5],[1],[2],[]]],["b",[[6],[4],[],[42]]]]

cogroup<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V[], V1[]]]>

For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.

> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> x.cogroup(y).collect().then(v => JSON.stringify(v.sort()))
[["a",[[1],[2]]],["b",[[4],[]]]]

join<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V, V1]]>

Return an RDD containing all pairs of elements with matching keys in this and other.

Each pair of elements will be returned as a [k, [v1, v2]] tuple, where [k, v1] is in this and [k, v2] is in other.

Performs a hash join across the cluster.

> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2], ["a", 3]])
> x.join(y).collect().then(v => v.sort())
[ [ 'a', [ 1, 2 ] ], [ 'a', [ 1, 3 ] ] ]

leftOuterJoin<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V, V1]]>

Perform a left outer join of this and other.

For each element [k, v] in this, the resulting RDD will either contain all pairs [k, [v, w]] for w in other, or the pair [k, [v, null]] if no elements in other have key k.

Hash-partitions the resulting RDD into the given number of partitions.

> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> x.leftOuterJoin(y).collect().then(v => v.sort())
[ [ 'a', [ 1, 2 ] ], [ 'b', [ 4, null ] ] ]

rightOuterJoin<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V, V1]]>

Perform a right outer join of this and other.

For each element [k, w] in other, the resulting RDD will either contain all pairs [k, [v, w]] for v in this, or the pair [k, [null, w]] if no elements in this have key k.

Hash-partitions the resulting RDD into the given number of partitions.

> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2]])
> y.rightOuterJoin(x).collect().then(v => v.sort())
[ [ 'a', [ 2, 1 ] ], [ 'b', [ null, 4 ] ] ]

fullOuterJoin<K, V, V1>(this: RDD<[K, V]>, other: RDD<[K, V1]>, numPartitions?: number): RDD<[K, [V, V1]]>

Perform a full outer join of this and other.

For each element [k, v] in this, the resulting RDD will either contain all pairs [k, [v, w]] for w in other, or the pair [k, [v, null]] if no elements in other have key k.

Similarly, for each element [k, w] in other, the resulting RDD will either contain all pairs [k, [v, w]] for v in this, or the pair [k, [null, w]] if no elements in this have key k.

Hash-partitions the resulting RDD into the given number of partitions.

> x = dcc.parallelize([["a", 1], ["b", 4]])
> y = dcc.parallelize([["a", 2], ["c", 2]])
> y.fullOuterJoin(x).collect().then(v => v.sort())
[ [ 'a', [ 2, 1 ] ], [ 'b', [ null, 4 ] ], [ 'c', [ 2, null ] ] ]

sort(ascending?: boolean, numPartitions?: number): RDD<T>

Sorts this RDD.

> dcc.parallelize([1, 3, 2, 6, 5, 4]).sort().collect()
[ 1, 2, 3, 4, 5, 6 ]
> dcc.parallelize([1, 3, 2, 6, 5, 4]).sort(false).collect()
[ 6, 5, 4, 3, 2, 1 ]

sortBy<K>(keyFunc: (data: T) => K, ascending?: boolean, numPartitions?: number, env?: FunctionEnv): RDD<T>

Sorts this RDD by the given keyFunc.

> rdd = dcc.parallelize([['a', 1], ['c', 6], ['d', 5], ['b', 3], ['e', 4], ['f', 2]])
> rdd.sortBy(v => v[0]).collect()
[ [ 'a', 1 ],
  [ 'b', 3 ],
  [ 'c', 6 ],
  [ 'd', 5 ],
  [ 'e', 4 ],
  [ 'f', 2 ] ]
> rdd.sortBy(v => v[1]).collect()
[ [ 'a', 1 ],
  [ 'f', 2 ],
  [ 'b', 3 ],
  [ 'e', 4 ],
  [ 'd', 5 ],
  [ 'c', 6 ] ]

CachedRDD<T> extends RDD<T>

unpersist(): Promise<void>

Remove this cached RDD from memory and disk.
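
For illustration, releasing storage once a cached RDD is no longer needed:

> rdd = dcc.parallelize([1, 2, 3]).cache()
> rdd.reduce((a, b) => a + b)
6
> rdd.unpersist()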
