Multithreading – is there a way to get the number of cores available to each executor on a task node?

For example, I need a list of all available executors and their respective multithreading capacity (not the total multithreading capacity, which is already covered by sc.defaultParallelism).

Because this parameter is implementation-dependent (YARN and Spark standalone have different core-allocation strategies) and situational (it may fluctuate due to dynamic allocation and long-running jobs), I can't estimate it any other way. Is there a way to retrieve this information using the Spark API from within a distributed transformation (for example via TaskContext or SparkEnv)?
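For reference, the closest driver-side calls I know of don't answer this either; a minimal sketch (assuming sc is the SparkContext) of what is and is not available:

// Driver side: lists the block managers (driver + executors) as host:port,
// together with (max, remaining) cache memory -- but says nothing about cores.
val executorHosts = sc.getExecutorMemoryStatus.keys.toSeq

// The total parallelism is known, but not its per-executor breakdown.
val totalSlots = sc.defaultParallelism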

As of Spark 1.6, I have tried the following approaches:

1) Run a single-stage job with many partitions (>> defaultParallelism) and count the number of unique thread IDs per executor ID:

import org.apache.spark.SparkEnv

val n = sc.defaultParallelism * 16
sc.parallelize(1 to n, n)
  .map(_ => SparkEnv.get.executorId -> Thread.currentThread().getId)
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()

However, this overestimates the actual multithreading capacity, because each Spark executor uses an over-provisioned thread pool.

2) Similar to 1), except n = defaultParallelism, and I add a delay in each task to prevent the resource negotiator from making an unbalanced assignment (a fast node finishing its tasks and requesting more before a slow node has even started running):

val n = sc.defaultParallelism
sc.parallelize(1 to n, n)
  .map { _ =>
    Thread.sleep(5000)
    SparkEnv.get.executorId -> Thread.currentThread().getId
  }
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()

It works most of the time, but it is much slower than necessary and can still be broken by a very unbalanced cluster or by task speculation.

3) I haven't tried this one: use Java reflection to read BlockManager.numUsableCores. This is obviously not a stable solution, since the internal implementation may change at any time (a rough, untested sketch appears below).
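For what it's worth, here is a rough, untested sketch of that reflection idea, assuming the private numUsableCores field is actually retained in the compiled BlockManager (it may not be in every Spark version, which is why everything is wrapped in Try):

import org.apache.spark.SparkEnv
import scala.util.Try

// Untested: reflectively read the private numUsableCores field of the
// executor-local BlockManager. Field name and presence are version-dependent,
// so a missing field simply yields None.
def numUsableCores: Option[Int] = Try {
  val bm = SparkEnv.get.blockManager
  val field = bm.getClass.getDeclaredField("numUsableCores")
  field.setAccessible(true)
  field.getInt(bm)
}.toOption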

Please let me know if you have found something better.

Solution

This is easy to do with the Spark REST API. You must obtain the application ID:

val applicationId = spark.sparkContext.applicationId

and the web UI address:

val baseUrl = spark.sparkContext.uiWebUrl

and then build the query URL:

val url = baseUrl.map { url => 
  s"${url}/api/v1/applications/${applicationId}/executors"
}

Use the Apache HTTP library (already among Spark's dependencies; adapted from https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):

import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

val response = url
  .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption)
  .flatMap(response => Try{
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close
    json
  }.toOption)

And parse it with json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

case class ExecutorInfo(hostPort: String, totalCores: Int)

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)
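From there it is straightforward to answer the original question, for example by keying the core counts by hostPort or summing them (note that the REST payload may also contain a driver entry, whose core count depends on the deploy mode):

// Per-executor core counts, keyed by host:port.
val coresPerExecutor: Option[Map[String, Int]] =
  executors.map(_.map(e => e.hostPort -> e.totalCores).toMap)

// Total cores across the application, as reported by the REST API.
val totalCores: Option[Int] = executors.map(_.map(_.totalCores).sum)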

As long as you keep the application ID and the UI URL around, and the UI port is open to external connections, you can do the same thing from any task (see the sketch below).
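A minimal sketch of that in-task variant, assuming the executors can reach the driver's UI port: broadcast the application ID and UI URL as plain strings, have each task fetch the raw executor JSON itself, and parse it exactly as above.

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient

// Broadcast the two strings every task needs.
val appIdB = spark.sparkContext.broadcast(spark.sparkContext.applicationId)
val uiUrlB = spark.sparkContext.broadcast(spark.sparkContext.uiWebUrl.getOrElse(""))

val rawJsonPerTask = spark.sparkContext.parallelize(1 to 4, 4).mapPartitions { _ =>
  val endpoint = s"${uiUrlB.value}/api/v1/applications/${appIdB.value}/executors"
  val client = new DefaultHttpClient()
  val stream = client.execute(new HttpGet(endpoint)).getEntity.getContent
  val json = scala.io.Source.fromInputStream(stream).getLines.mkString
  stream.close()
  Iterator(json) // each task returns the raw executor JSON
}.collect()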
