Spark - Livy (Rest API)

About

Livy is an open source REST interface for interacting with Spark from anywhere.

It supports executing:

  • snippets of code
  • or programs

in a Spark Context that runs locally or in YARN.

It is used to submit remote jobs.

Code example

A step-by-step example of interacting with Livy from Python with the Requests library is given in the sections below.

Endpoint

  • Azure Hdinsight:
    • <spark_cluster_name>.azurehdinsight.net/livy
    • http://<headnode>:8998 on the same vnet as the cluster

Client

Server Management

Installation

Alive

curl -u admin -G "http://10.0.0.11:8998/"
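The same liveness check can be sketched in Python with Requests. The host and port are placeholders; substitute your own Livy endpoint:

```python
import requests

# Hypothetical Livy endpoint; replace with your own server address
LIVY_URL = "http://10.0.0.11:8998"

def is_alive(base_url: str) -> bool:
    """Return True if the Livy server answers on its root endpoint."""
    try:
        return requests.get(base_url, timeout=5).status_code < 500
    except requests.RequestException:
        # Connection refused, DNS failure, timeout, ... -> server not reachable
        return False
```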

Start and stop

export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
#  Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable 
export SPARK_CONF_DIR=$SPARK_HOME/conf

./bin/livy-server

# Hortonworks
# /usr/hdp/<hdp_version>/livy2/bin/livy-server start

Configuration

LIVY_CONF_DIR defaults to the conf directory under the Livy installation. Example: /usr/bin/livy2/conf

The configuration of Livy is spread over several files:

  • livy.conf: the server configuration
  • spark-blacklist.conf: Spark configuration options that users are not allowed to override
  • log4j.properties: configuration for Livy logging. See Java - Log4j
  • livy-defaults
  • livy-env.sh: environment variables used by the server (see below)

livy.conf

livy.repl.enableHiveContext=true
livy.server.port=8998
livy.server.recovery.mode=recovery
livy.server.recovery.state-store=zookeeper
livy.server.recovery.state-store.url=zk2:2181,zk3:2181,zk5:2181
livy.server.session.timeout=2073600000
livy.server.yarn.app-lookup-timeout=2m
livy.spark.master=yarn-cluster

Other configuration properties exist, for example:

livy.rsc.rpc.max.size

livy-env.sh

 
#!/usr/bin/env bash

export SPARK_HOME=/usr/hdp/current/spark2-client
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_MAX_LOG_FILES=30
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
YARN_CLASSPATH=`yarn classpath`
export CLASSPATH="$YARN_CLASSPATH:/usr/lib/hdinsight-logging/*"
export LIVY_IDENT_STRING=livy

Log

The log directory is set via the environment variable LIVY_LOG_DIR

export LIVY_LOG_DIR=/var/log/livy

Process

The PID directory is set via the environment variable LIVY_PID_DIR

export LIVY_PID_DIR=/var/run/livy
ps -ef -q $(cat $LIVY_PID_DIR/livy-livy-server.pid)

ps -q $(cat $LIVY_PID_DIR/livy-livy-server.pid) -o comm=

Example of command:

/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx2g -cp /usr/bin/livy2/jars/*:/usr/bin/livy2/conf:/etc/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/lib/*:/usr/hdp/2.6.2.25-1/hadoop/.//*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/./:/usr/hdp/2.6.2.25-1/hadoop-hdfs/lib/*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/.//*:/usr/hdp/2.6.2.25-1/hadoop-yarn/lib/*:/usr/hdp/2.6.2.25-1/hadoop-yarn/.//*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/lib/*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/.//*::/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/2.6.2.25-1/tez/*:/usr/hdp/2.6.2.25-1/tez/lib/*:/usr/hdp/2.6.2.25-1/tez/conf:/usr/hdp/current/hadoop-yarn-resourcemanager/.//*:/usr/hdp/current/hadoop-yarn-resourcemanager/lib/*:/usr/lib/hdinsight-logging/* com.cloudera.livy.server.LivyServer

API

See Rest API

Session

List

  • Get: Returns all the active interactive sessions.
GET /sessions
{
	"from":0,
	"total":2,
	"sessions":
		[
			{"id":0,"appId":"application_1525336261063_0027","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/cluster/app/application_1525336261063_0027"},"log":["YARN Diagnostics:","Application killed by user."]},
			{"id":1,"appId":"application_1525370262995_5114","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"},"log":[]}
		]
}
  • Returns the session information.
GET /sessions/{sessionId}
{
  "id":1,
  "appId":"application_1525370262995_5114",
  "owner":null,
  "proxyUser":null,
  "state":"dead",
  "kind":"spark",
  "appInfo": {
    "driverLogUrl":null,
    "sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"
    },
  "log":[]
}
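The two calls above can be sketched in Python. `session_states` is a hypothetical helper (not part of the Livy API) that condenses the payload into an id-to-state map:

```python
import requests

def list_sessions(host: str) -> dict:
    """GET /sessions and return the raw JSON payload."""
    r = requests.get(host + "/sessions")
    r.raise_for_status()
    return r.json()

def session_states(payload: dict) -> dict:
    """Map session id -> state from a /sessions payload."""
    return {s["id"]: s["state"] for s in payload["sessions"]}
```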

State

  • Returns the state of the session. See the possible return values: cloudera/livy
GET /sessions/{sessionId}/state
{
  "id":1,
  "state":"dead"
}
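Because a freshly created session stays in the `starting` state for a while, a common pattern is to poll this endpoint before submitting statements. A minimal sketch, assuming the session states listed in the cloudera/livy documentation:

```python
import time
import requests

# Session states Livy can report (assumed, per the cloudera/livy docs):
# not_started, starting, idle, busy, shutting_down, error, dead, success
STARTED_STATES = {"idle", "busy", "shutting_down", "error", "dead", "success"}

def is_past_startup(state: str) -> bool:
    """True once the session has left the startup phase."""
    return state in STARTED_STATES

def wait_for_session(host: str, session_id: int, timeout: float = 120.0) -> str:
    """Poll GET /sessions/{id}/state until the session finishes starting."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = requests.get("%s/sessions/%d/state" % (host, session_id)).json()["state"]
        if is_past_startup(state):
            return state
        time.sleep(2)
    raise TimeoutError("session %d still starting after %ss" % (session_id, timeout))
```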

Create Session

  • Creates a new interactive Scala, Python, or R shell in the cluster and returns its id. See all session kinds: cloudera/livy
POST /sessions
import json
import requests

host = 'http://localhost:8998'
# SparkShell
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

where:

name                      description                                             type
------------------------  ------------------------------------------------------  --------------
kind                      The session kind (required)                             session kind
proxyUser                 User to impersonate when starting the session           string
jars                      Jars to be used in this session                         list of string
pyFiles                   Python files to be used in this session                 list of string
files                     Files to be used in this session                        list of string
driverMemory              Amount of memory to use for the driver process          string
driverCores               Number of cores to use for the driver process           int
executorMemory            Amount of memory to use per executor process            string
executorCores             Number of cores to use for each executor                int
numExecutors              Number of executors to launch for this session          int
archives                  Archives to be used in this session                     list of string
queue                     Name of the YARN queue the session is submitted to      string
name                      The name of this session                                string
conf                      Spark configuration properties                          map of key=val
heartbeatTimeoutInSecond  Timeout in seconds after which the session is orphaned  int

Example of output:

{u'state': u'starting', u'id': 0, u'kind': u'spark'}

Kill

DELETE /sessions/{sessionId}

Log

Gets the log lines from this session.

GET /sessions/{sessionId}/log
{"id":1,"from":0,"total":0,"log":[]}

Statement

List

  • All: Returns all the statements in a session.
GET /sessions/{sessionId}/statements
  • One: returns a single statement in a session.
GET /sessions/{sessionId}/statements/{statementId}

Submit

Runs a statement in a session.

POST /sessions/{sessionId}/statements
# session_id is the id returned when the session was created
statements_url = host + '/sessions/%d/statements' % session_id
data = {'code': 'sc.parallelize(1 to 10).count()'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()
{u'output': None, u'state': u'running', u'id': 0}

Get the result

GET /sessions/{sessionId}/statements/{statementId}
import pprint

statement_url = host + r.headers['location']

r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())
{u'id': 0,
 u'output': {u'data': {u'text/plain': u'res0: Long = 10'},
             u'execution_count': 0,
             u'status': u'ok'},
 u'state': u'available'}
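Since a statement stays in the `running` state until Spark finishes, fetching the result usually means polling until the state becomes `available`. A minimal sketch (`extract_result` and `wait_for_result` are hypothetical helpers, not part of the Livy API):

```python
import time
import requests

def get_statement(host: str, session_id: int, statement_id: int) -> dict:
    """GET /sessions/{sessionId}/statements/{statementId}."""
    r = requests.get("%s/sessions/%d/statements/%d" % (host, session_id, statement_id))
    r.raise_for_status()
    return r.json()

def extract_result(statement: dict):
    """Return the plain-text result of a finished statement, else None."""
    if statement.get("state") != "available":
        return None
    return statement["output"]["data"]["text/plain"]

def wait_for_result(host: str, session_id: int, statement_id: int, poll: float = 1.0):
    """Poll the statement until its result is available."""
    while True:
        result = extract_result(get_statement(host, session_id, statement_id))
        if result is not None:
            return result
        time.sleep(poll)
```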

Cancel

POST /sessions/{sessionId}/statements/{statementId}/cancel

Batch

Livy also provides a batch API, the REST equivalent of spark-submit. See: https://github.com/cloudera/livy#get-batches

Get

  • All
curl -k \
  --user "<user>:<user password>" \
  -v \
  -X GET \
  "https://hostname/livy/batches"
  • One
curl -k \
  --user "<user>:<user password>" \
  -v \
  -X GET \
  "https://hostname/livy/batches/{batchId}"

Delete

curl -k \
  --user "<user>:<user password>" \
  -v \
  -X DELETE \
  "https://hostname/livy/batches/{batchId}"

Submit application

Spark - Spark-submit

master and deploy mode are not part of the request body because they are set in the Livy server configuration (see livy.spark.master in livy.conf above).

Example: for data, see cloudera/livy

POST /batches
{
  "className": "org.apache.spark.examples.SparkPi",
  "executorMemory": "20g",
  "args": [2000],
  "file": "/path/to/examples.jar"
}

With curl:

# If the request body (file, className) is stored in a file,
# replace the -d option with: --data @C:\Temp\input.txt
curl -k \
  --user "<user>:<user password>" \
  -v \
  -H "Content-Type: application/json" \
  -H "X-Requested-By: admin" \
  -X POST \
  -d '{ "file":"<path to application jar>", "className":"<classname in jar>" }' \
  'https://hostname/livy/batches'
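The same submission can be sketched in Python. `build_batch_payload` is a hypothetical helper that assembles the request body; the field names (`file`, `className`, `args`, `executorMemory`) are the ones shown in the example above:

```python
import json
import requests

def build_batch_payload(jar_path, class_name, args=None, executor_memory=None):
    """Assemble the JSON body for POST /batches (spark-submit equivalent)."""
    payload = {"file": jar_path, "className": class_name}
    if args is not None:
        payload["args"] = args
    if executor_memory is not None:
        payload["executorMemory"] = executor_memory
    return payload

def submit_batch(host, payload, user="admin"):
    """POST the batch; X-Requested-By is required when CSRF protection is enabled."""
    headers = {"Content-Type": "application/json", "X-Requested-By": user}
    r = requests.post(host + "/batches", data=json.dumps(payload), headers=headers)
    r.raise_for_status()
    return r.json()
```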

