Multithreading – how to get the number of cores available to each executor on a task node?
Specifically, I need a list of all available executors and their respective multithreading capacity (NOT the total multithreading capacity, which is already covered by sc.defaultParallelism).
Since this parameter is implementation-dependent (YARN and Spark standalone have different core-allocation strategies) and situational (it may fluctuate because of dynamic allocation and long-running jobs), I can't estimate it any other way. Is there a way to retrieve this information using the Spark API from within a distributed transformation (e.g. TaskContext, SparkEnv)?
Update: for Spark 1.6, I tried the following methods:
1) Run a single-stage job with many partitions (>> defaultParallelism) and collect the distinct thread IDs per executor ID:
import org.apache.spark.SparkEnv

val n = sc.defaultParallelism * 16
sc.parallelize(1 to n, n)
  .map(_ => SparkEnv.get.executorId -> Thread.currentThread().getId)
  .groupByKey()
  .mapValues(_.toList.distinct)
  .collect()
However, this gives an estimate that is higher than the actual multithreading capacity, because each Spark executor uses an over-provisioned thread pool.
2) Similar to 1), but with n = defaultParallelism and a delay in each task, to prevent the resource negotiator from skewing the assignment (a fast node finishes its tasks and requests more before a slow node has started running):
val n = sc.defaultParallelism
sc.parallelize(1 to n, n)
  .map { _ =>
    Thread.sleep(5000)  // keep every slot busy so each executor receives some tasks
    SparkEnv.get.executorId -> Thread.currentThread().getId
  }
  .groupByKey()
  .mapValues(_.toList.distinct)
  .collect()
It works most of the time, but it is much slower than necessary and may still break on a heavily imbalanced cluster or when task speculation is enabled.
3) I haven't tried this: use Java reflection to read BlockManager.numUsableCores. This is obviously not a stable solution, as the internal implementation may change at any time (a rough sketch follows below).
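Purely for illustration, here is what that untested reflection hack might look like. It assumes the running BlockManager instance still keeps its numUsableCores constructor argument as a private field of that exact name, which Spark's internals do not guarantee:

import scala.util.Try
import org.apache.spark.SparkEnv

// Untested sketch: assumes BlockManager retains `numUsableCores` as a
// private field with that exact name, which may not hold across versions.
val numUsableCores: Option[Int] = Try {
  val bm = SparkEnv.get.blockManager
  val field = bm.getClass.getDeclaredField("numUsableCores")
  field.setAccessible(true)
  field.getInt(bm)
}.toOption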
Please tell me if you have found something better.
Solution
This is easy to do with the Spark REST API. You have to obtain the application ID:
val applicationId = spark.sparkContext.applicationId
and the UI web address:
val baseUrl = spark.sparkContext.uiWebUrl
and build the query URL:
val url = baseUrl.map { url => s"${url}/api/v1/applications/${applicationId}/executors" }
Then use the Apache HTTP library (already among Spark's dependencies; adapted from https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

val response = url
  .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption)
  .flatMap(response => Try{
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close()
    json
  }.toOption)
and json4s:
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

case class ExecutorInfo(hostPort: String, totalCores: Int)

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)
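For example (a small usage sketch, not part of the original answer; coresByExecutor is just an illustrative name), the parsed list can be collapsed into the per-executor capacity map the question asks for. As far as I know, the /executors endpoint also lists the driver itself, so filter that entry out if you don't want it:

// Collapse the parsed list into hostPort -> totalCores.
// Note: the endpoint typically reports the driver as well.
val coresByExecutor: Map[String, Int] =
  executors.getOrElse(Nil).map(e => e.hostPort -> e.totalCores).toMap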
As long as you keep the application ID and the UI URL around, and the UI port is open to external connections, you can do the same thing from any task.
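As a rough illustration of that last point (a sketch under those assumptions, not part of the original answer; executorsUrl and coresSeenFromTask are illustrative names), the endpoint URL can be captured as a plain string on the driver and queried from inside a task. json4s pattern matching is used here instead of extract so that no case class has to be shipped to the executors:

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.Try

// Capture the endpoint as a plain String so the task closure stays serializable
// (same URL as built above).
val executorsUrl: Option[String] = spark.sparkContext.uiWebUrl.map { base =>
  s"$base/api/v1/applications/${spark.sparkContext.applicationId}/executors"
}

val coresSeenFromTask = spark.sparkContext.parallelize(Seq(1), 1).mapPartitions { _ =>
  val pairs = executorsUrl.toList.flatMap { u =>
    Try {
      val client = new DefaultHttpClient()
      val stream = client.execute(new HttpGet(u)).getEntity.getContent
      val json = scala.io.Source.fromInputStream(stream).getLines.mkString
      stream.close()
      // Walk the JSON array returned by /executors without a case class.
      for {
        JObject(executor) <- parse(json)
        JField("hostPort", JString(hostPort)) <- executor
        JField("totalCores", JInt(totalCores)) <- executor
      } yield hostPort -> totalCores.toInt
    }.getOrElse(Nil)
  }
  pairs.iterator
}.collect()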