Spark - Accumulator

About

Accumulators can only be written by workers and read by the driver program.

They allow us to aggregate values from workers back to the driver.

Only the driver program can read an accumulator's value; for the tasks running on workers, accumulators are effectively write-only.

A typical use is counting events across workers, for example the number of errors seen while processing an RDD (a sketch appears at the end of this page).

Accumulators are variables that can only be “added” to through an associative and commutative operation.

Supported value types include int, long, float, and double.
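
As a minimal sketch of these properties (assuming a live SparkContext named sc): the type of the initial value picks the accumulator's type, and reading the value from inside a task fails, since only the driver may do so.

int_acc = sc.accumulator(0)          # accumulator over ints
dbl_acc = sc.accumulator(0.0)        # accumulator over floats/doubles

def peek(x):
	return int_acc.value             # raises an error: .value is driver-only

# sc.parallelize([1, 2]).map(peek).collect()   # would fail if uncommented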

Accumulator in Action vs Transformation

In an action, each task's update to the accumulator is guaranteed by Spark to be applied exactly once, even when tasks are retried. In a transformation there is no such guarantee: a transformation may be re-executed when a node is slow or fails, so its accumulator updates can be applied more than once. For this reason, accumulators updated inside transformations should only be used for debugging purposes.
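
A hedged sketch of the difference (variable names are illustrative, assuming a live SparkContext sc): re-running a transformation re-applies its accumulator updates, so the count below drifts, whereas the Sum example further down updates its accumulator from an action.

acc = sc.accumulator(0)
rdd = sc.parallelize(range(100))

def tally(x):
	acc.add(1)                       # update inside a transformation
	return x

mapped = rdd.map(tally)
mapped.count()                       # acc.value is now 100
mapped.count()                       # map() is recomputed: acc.value is now 200

Caching mapped would avoid the recomputation here, but node failures and speculative execution can still re-run transformation tasks, which is why the guarantee only holds in actions.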

Example

Sum

accum = sc.accumulator(0)            # Accumulator[int] initialized to 0
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
	global accum                     # rebinding with += needs the global declaration
	accum += x                       # each task adds its element

rdd.foreach(f)                       # foreach is an action: updates apply exactly once
print(accum.value)                   # Value: 10

Counting empty lines

file = sc.textFile(inputFile)
# Create Accumulator[int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
	global blankLines                # make the global accumulator accessible
	if line == "":
		blankLines += 1              # updated inside a transformation
	return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.count()                    # flatMap is lazy: an action must run before blankLines is populated
print("Blank lines: %d" % blankLines.value)