Spark - Livy (Rest API)
About
Livy is an open source REST interface for interacting with Spark from anywhere.
It supports executing:
- snippets of code
- or programs
in a Spark Context that runs locally or in YARN.
It is used to submit remote jobs.
Code example
The sections below give step-by-step examples of interacting with Livy in Python with the Requests library.
Endpoint
- Azure HDInsight:
- <spark_cluster_name>.azurehdinsight.net/livy
- http://<headnode>:8998 on the same vnet as the cluster
Client
- Any HTTP client (user agent) can be used, because Livy exposes a REST API.
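For instance, a minimal sketch with the Python Requests library, listing the sessions to verify that the server answers (the host, port, and credentials are assumptions; adjust them to your deployment):
import requests

# Assumed Livy endpoint and credentials
host = 'http://10.0.0.11:8998'

# Any HTTP client works; listing the sessions is a simple liveness check
r = requests.get(host + '/sessions', auth=('admin', 'password'))
print(r.status_code, r.json())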
Server Management
Installation
Alive
curl -u admin -G "http://10.0.0.11:8998/"
Start and stop
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
# Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable
export SPARK_CONF_DIR=$SPARK_HOME/conf
./bin/livy-server
# Hortonworks
# /usr/hdp/<hdp_version>/livy2/bin/livy-server start
Configuration
LIVY_CONF_DIR defaults to the conf directory under the Livy installation. Example: /usr/bin/livy2/conf
The configuration of Livy goes through different files:
- livy.conf: the server configuration
- spark-blacklist.conf: Spark configuration options that users are not allowed to override
- log4j.properties: configuration for Livy logging. See Java - Log4j
- livy-defaults.conf
- livy-env.sh
livy.conf
livy.repl.enableHiveContext=true
livy.server.port=8998
livy.server.recovery.mode=recovery
livy.server.recovery.state-store=zookeeper
livy.server.recovery.state-store.url=zk2:2181,zk3:2181,zk5:2181
livy.server.session.timeout=2073600000
livy.server.yarn.app-lookup-timeout=2m
livy.spark.master=yarn-cluster
Other configuration properties are available, for example:
livy.rsc.rpc.max.size
livy-env.sh
#!/usr/bin/env bash
export SPARK_HOME=/usr/hdp/current/spark2-client
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_MAX_LOG_FILES=30
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
YARN_CLASSPATH=`yarn classpath`
export CLASSPATH="$YARN_CLASSPATH:/usr/lib/hdinsight-logging/*"
export LIVY_IDENT_STRING=livy
Log
The log directory is set with the environment variable LIVY_LOG_DIR.
export LIVY_LOG_DIR=/var/log/livy
Process
The PID directory is set with the environment variable LIVY_PID_DIR.
export LIVY_PID_DIR=/var/run/livy
ps -ef -q $(cat $LIVY_PID_DIR/livy-livy-server.pid)
ps -q $(cat $LIVY_PID_DIR/livy-livy-server.pid) -o comm=
Example of command:
/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx2g -cp /usr/bin/livy2/jars/*:/usr/bin/livy2/conf:/etc/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/lib/*:/usr/hdp/2.6.2.25-1/hadoop/.//*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/./:/usr/hdp/2.6.2.25-1/hadoop-hdfs/lib/*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/.//*:/usr/hdp/2.6.2.25-1/hadoop-yarn/lib/*:/usr/hdp/2.6.2.25-1/hadoop-yarn/.//*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/lib/*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/.//*::/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/2.6.2.25-1/tez/*:/usr/hdp/2.6.2.25-1/tez/lib/*:/usr/hdp/2.6.2.25-1/tez/conf:/usr/hdp/current/hadoop-yarn-resourcemanager/.//*:/usr/hdp/current/hadoop-yarn-resourcemanager/lib/*:/usr/lib/hdinsight-logging/* com.cloudera.livy.server.LivyServer
API
See Rest API
Session
List
- Get: Returns all the active interactive sessions.
GET /sessions
{
"from":0,
"total":2,
"sessions":
[
{"id":0,"appId":"application_1525336261063_0027","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/cluster/app/application_1525336261063_0027"},"log":["YARN Diagnostics:","Application killed by user."]},
{"id":1,"appId":"application_1525370262995_5114","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"},"log":[]}
]
}
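A sketch of the same listing with the Requests library (the host value is an assumption):
import requests

host = 'http://localhost:8998'  # assumed Livy endpoint

# List the sessions and print a short summary of each one
r = requests.get(host + '/sessions')
for session in r.json()['sessions']:
    print(session['id'], session['state'], session['appId'])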
- Returns the session information.
GET /sessions/{sessionId}
{
"id":1,
"appId":"application_1525370262995_5114",
"owner":null,
"proxyUser":null,
"state":"dead",
"kind":"spark",
"appInfo": {
"driverLogUrl":null,
"sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"
},
"log":[]
}
State
- Returns the state of the session. See the possible return values: cloudera/livy
GET /sessions/{sessionId}/state
{
"id":1,
"state":"dead"
}
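Since a session starts asynchronously, a common pattern is to poll this endpoint until the session leaves the starting state ('idle' means it is ready to accept statements). A sketch, where the host and session id are assumptions:
import time
import requests

host = 'http://localhost:8998'  # assumed Livy endpoint
session_id = 1                  # assumed session id

# Poll until the session is no longer starting
while True:
    state = requests.get('{}/sessions/{}/state'.format(host, session_id)).json()['state']
    if state != 'starting':
        break
    time.sleep(2)
print('session state:', state)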
Create Session
- Creates a new interactive Scala, Python, or R shell in the cluster and returns its id. See all session kinds: cloudera/livy
POST /sessions
import json
import requests

host = 'http://localhost:8998'
# Spark (Scala) shell session
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()
where:
- Request parameters may be:
name | description | type |
---|---|---|
kind | The session kind (required) | session kind |
proxyUser | User to impersonate when starting the session | string |
jars | jars to be used in this session | List of string |
pyFiles | Python files to be used in this session | List of string |
files | files to be used in this session | List of string |
driverMemory | Amount of memory to use for the driver process | string |
driverCores | Number of cores to use for the driver process | int |
executorMemory | Amount of memory to use per executor process | string |
executorCores | Number of cores to use for each executor | int |
numExecutors | Number of executors to launch for this session | int |
archives | Archives to be used in this session | List of string |
queue | The name of the YARN queue to which the session is submitted | string |
name | The name of this session | string |
conf | Spark configuration properties | Map of key=val |
heartbeatTimeoutInSecond | Timeout in seconds after which the session is orphaned | int |
Example of output:
{u'state': u'starting', u'id': 0, u'kind': u'spark'}
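Several of the request parameters from the table above can be combined. A minimal sketch of a more fully specified session (the memory, core, and conf values are illustrative assumptions, not recommendations):
import json
import requests

host = 'http://localhost:8998'  # assumed Livy endpoint
headers = {'Content-Type': 'application/json'}

# A PySpark session with illustrative resource settings
data = {
    'kind': 'pyspark',
    'driverMemory': '1g',
    'executorMemory': '2g',
    'executorCores': 2,
    'numExecutors': 2,
    'conf': {'spark.sql.shuffle.partitions': '64'},
}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
print(r.json())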
Kill
DELETE /sessions/{sessionId}
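With the Requests library (host and session id are assumptions):
import requests

host = 'http://localhost:8998'           # assumed Livy endpoint
requests.delete(host + '/sessions/1')    # assumed session id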
Log
Gets the log lines from this session.
GET /sessions/{sessionId}/log
{"id":1,"from":0,"total":0,"log":[]}
Statement
List
- All: Returns all the statements in a session.
GET /sessions/{sessionId}/statements
- One
GET /sessions/{sessionId}/statements/{statementId}
Submit
Runs a statement in a session.
POST /sessions/{sessionId}/statements
# Build the statements URL from the session created above (session id 0 here)
statements_url = host + '/sessions/0/statements'
data = {'code': 'sc.parallelize(1 to 10).count()'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()
{u'output': None, u'state': u'running', u'id': 0}
Get the result
GET /sessions/{sessionId}/statements/{statementId}
import pprint

statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())
{u'id': 0,
 u'output': {u'data': {u'text/plain': u'res0: Long = 10'},
  u'execution_count': 0,
  u'status': u'ok'},
 u'state': u'available'}
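A statement also runs asynchronously: the state goes from 'running' to 'available' once the result is ready, so the result is usually polled. A sketch under the same assumptions as above:
import time
import requests

host = 'http://localhost:8998'                      # assumed Livy endpoint
statement_url = host + '/sessions/0/statements/0'   # assumed session/statement ids

# Poll until the statement has finished executing
while True:
    statement = requests.get(statement_url).json()
    if statement['state'] == 'available':
        break
    time.sleep(1)
print(statement['output'])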
Cancel
POST /sessions/{sessionId}/statements/{statementId}/cancel
Batch
Batches of statements are also supported. See: https://github.com/cloudera/livy#get-batches
Get
- All
curl -k \
--user "<user>:<user password>" \
-v \
-X GET \
"https://hostname/livy/batches"
- One
curl -k \
--user "<user>:<user password>" \
-v \
-X GET \
"https://hostname/livy/batches/{batchId}"
Delete
curl -k \
--user "<user>:<user password>" \
-v \
-X DELETE \
"https://hostname/livy/batches/{batchId}"
Submit application
The master and deploy mode are not part of the request data because they are set in the server configuration (see livy.spark.master above).
Example (for all data fields, see cloudera/livy):
POST /batches
{
  "className": "org.apache.spark.examples.SparkPi",
  "executorMemory": "20g",
  "args": [2000],
  "file": "/path/to/examples.jar"
}
With curl:
# If the data (file, className) is stored in a file,
# replace the -d option with: --data @C:\Temp\input.txt
curl -k \
--user "<user>:<user password>" \
-v \
-H "Content-Type: application/json" \
-H "X-Requested-By: admin" \
-X POST \
-d '{ "file":"<path to application jar>", "className":"<classname in jar>" }' \
'https://hostname/livy/batches'
where:
- file example for Azure wasb: wasb://<container>@<storage_account>.blob.core.windows.net/data/SparkSimpleTest.jar
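The same submission with the Requests library (the file path, class name, and host are placeholders):
import json
import requests

host = 'http://localhost:8998'  # assumed Livy endpoint
headers = {'Content-Type': 'application/json',
           'X-Requested-By': 'admin'}

# Placeholders: point 'file' at a jar reachable from the cluster
data = {
    'file': '/path/to/examples.jar',
    'className': 'org.apache.spark.examples.SparkPi',
    'args': ['2000'],
}
r = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
print(r.json())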