Accumulators can only be written by workers and read by the driver program.
They allow us to aggregate values from workers back to the driver.
Only the driver can read the value of an accumulator; for the tasks, accumulators are effectively write-only.
For example, we can use an accumulator to count the errors seen in an RDD across all workers.
Accumulators are variables that can only be “added” to through an associative (and commutative) operation.
Types: integer, long, float, double
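Why the operation has to be associative and commutative: each task accumulates its own partial result, and the partial results reach the driver in no fixed order. The following is a plain-Python sketch of that merge step, not Spark itself; `partitions` and `merge_partials` are hypothetical names made up for the illustration.

```python
import itertools
import operator
from functools import reduce

# Hypothetical per-partition data; in Spark each task would see one partition.
partitions = [[1, 2], [3], [4]]


def merge_partials(partials, op):
    """Fold per-task partial results into one value, in the order given."""
    return reduce(op, partials)


# Each "task" reduces its own partition locally.
partials = [sum(p) for p in partitions]

# Addition: every arrival order of the partial results gives the same total.
add_results = {
    merge_partials(order, operator.add)
    for order in itertools.permutations(partials)
}

# Subtraction is neither associative nor commutative,
# so the merged value depends on the arrival order.
sub_results = {
    merge_partials(order, operator.sub)
    for order in itertools.permutations(partials)
}

print(add_results)           # a single total, whatever the order
print(len(sub_results) > 1)  # more than one possible outcome
```

With addition the set contains one value, so the driver sees the same total no matter how the tasks are scheduled; with subtraction it does not, which is why such an operation would not be safe in an accumulator.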
In an action, Spark guarantees that each task's update to the accumulator is applied only once. Inside a transformation there is no such guarantee, because a transformation may have to be run more than once if a node is slow or fails. So when an accumulator is updated inside a transformation, use it only for debugging purposes.
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])
def f(x):
    global accum
    accum += x
rdd.foreach(f)
accum.value
Value: 10
file = sc.textFile(inputFile)
# Create Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
    global blankLines # Make the global variable accessible
    if line == "":
        blankLines += 1
    return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
# flatMap is lazy: run an action so the accumulator is actually updated
callSigns.count()
print("Blank lines: %d" % blankLines.value)
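The flatMap example above updates the accumulator inside a transformation, which is the case where an update may be applied more than once. A rough plain-Python model of what happens when the same task is re-executed, assuming a toy `FakeAccumulator` class and a `run_map_task` helper (both hypothetical, neither is part of PySpark):

```python
class FakeAccumulator:
    """Toy stand-in for an accumulator; hypothetical, not the PySpark class."""

    def __init__(self, value=0):
        self.value = value

    def add(self, amount):
        self.value += amount


def run_map_task(lines, acc):
    """Pretend transformation task: splits lines and counts blank ones
    as a side effect, like extractCallSigns does."""
    out = []
    for line in lines:
        if line == "":
            acc.add(1)
        out.append(line.split(" "))
    return out


lines = ["W8PAL KK6JKQ", "", "N7ZZZ", ""]  # two blank lines
blank = FakeAccumulator()

first = run_map_task(lines, blank)
# Simulate the task being run again, e.g. after a node failure
# or because the RDD is recomputed for a second action.
second = run_map_task(lines, blank)

print(first == second)  # the data produced is identical
print(blank.value)      # but the side-effect counter was bumped twice
```

The output data is the same on both runs, but the counter ends up at twice the real number of blank lines. That is why a count collected inside a transformation is best treated as a debugging aid rather than an exact result.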