Investigation of this problem showed that the root cause was Spark scheduling. One of the tasks was scheduled with a 6-second delay for no obvious reason.
So the launch times of the two tasks differed by 6 seconds. The screenshot was taken with a configuration of 2 executors (2 cores each). Adding more executors and cores didn't help.
Searching Google for similar problems turned up a Spark JIRA ticket. It matched the situation perfectly: two tasks in a single stage with different locality levels. The default value of spark.locality.wait is 3s, so we were waiting twice for 3 seconds before execution could start.
Setting the spark.locality.wait parameter to 0 seconds resolved the problem.
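For reference, one way to apply this setting is when building the Spark context (a minimal sketch; the app name is hypothetical and the same value can also be passed via --conf spark.locality.wait=0s on spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Disable the locality wait so tasks are scheduled immediately,
// even at the cost of losing data locality.
val conf = new SparkConf()
  .setAppName("locality-wait-example") // hypothetical app name
  .set("spark.locality.wait", "0s")

val sc = new SparkContext(conf)
```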
Data locality describes how Spark maps tasks to their input data. The goal is to minimize the overhead of transferring input data to a task. Here you can find a detailed post on how Spark calculates data locality for an RDD.
According to the Spark documentation, there are 5 levels of data locality:
- PROCESS_LOCAL: data is in the same JVM as the running code
- NODE_LOCAL: data is on the same node
- NO_PREF: data has no locality preference and is accessed equally quickly from anywhere
- RACK_LOCAL: data is on the same rack of servers
- ANY: data is elsewhere on the network and not in the same rack
After a node finishes its current work, Spark starts looking for a new task for it. Going through all pending tasks, Spark tries to find one whose data locality level is not greater than the currently allowed maximum. That maximum is relaxed based on the difference between the current time and the last task launch time: curTime - lastLaunchTime >= localityWaits(currentLocalityIndex).
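A rough sketch of that fallback rule is shown below. This is not the actual TaskSetManager code; the names (levels, localityWaits, lastLaunchTime) are simplified for illustration:

```scala
// Simplified illustration of Spark's locality-fallback rule:
// stay at the current locality level until its wait time expires,
// then fall back to the next (less local) level.
object LocalityFallbackSketch {
  // Ordered from most to least local, mirroring Spark's locality levels.
  val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  def allowedLocalityIndex(curTime: Long,
                           lastLaunchTime: Long,
                           localityWaits: Seq[Long],
                           currentLocalityIndex: Int): Int = {
    var index = currentLocalityIndex
    var launchTime = lastLaunchTime
    // While the wait for the current level has expired, move on to the next level.
    while (index < levels.length - 1 &&
           curTime - launchTime >= localityWaits(index)) {
      launchTime += localityWaits(index)
      index += 1
    }
    index
  }
}
```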
The wait time is configured through the Spark job configuration:
The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details.
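For example, the per-level parameters documented on the Spark configuration page can be set like this (a sketch; the values shown are illustrative):

```scala
import org.apache.spark.SparkConf

// Either one global wait applied to every fallback step...
val globalConf = new SparkConf().set("spark.locality.wait", "3s")

// ...or individual waits per locality level.
val perLevelConf = new SparkConf()
  .set("spark.locality.wait.process", "3s") // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")    // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")    // RACK_LOCAL -> ANY
```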
See the org.apache.spark.scheduler.TaskSetManager class for the more detailed logic of recalculating the locality levels available for execution.