Livy is an open source REST interface for interacting with Spark from anywhere.
It supports executing snippets of code or programs in a Spark context that runs locally or in YARN, and it is used to submit remote jobs.
This page walks through a step-by-step example of interacting with Livy in Python with the Requests library.
A quick check of the REST endpoint with curl:
curl -u admin -G "http://10.0.0.11:8998/"
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
# Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable
export SPARK_CONF_DIR=$SPARK_HOME/conf
./bin/livy-server
# Hortonworks
# /usr/hdp/<hdp_version>/livy2/bin/livy-server start
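Once the server is started, a quick sanity check is to query the REST endpoint. A minimal sketch, assuming Livy listens on localhost with its default port 8998 (adjust to your host/port):

# Minimal check that the Livy server answers on its REST port
# (localhost:8998 is an assumption; adjust to your setup).
import requests

r = requests.get('http://localhost:8998/sessions')
print(r.status_code)   # 200 when the server is up
print(r.json())        # e.g. {"from": 0, "total": 0, "sessions": []} on a fresh server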
LIVY_CONF_DIR defaults to the conf directory under the Livy installation (for example /usr/bin/livy2/conf).
Livy is configured through several files. Example livy.conf:
livy.repl.enableHiveContext=true
livy.server.port=8998
livy.server.recovery.mode=recover
livy.server.recovery.state-store=zookeeper
livy.server.recovery.state-store.url=zk2:2181,zk3:2181,zk5:2181
livy.server.session.timeout=2073600000
livy.server.yarn.app-lookup-timeout=2m
livy.spark.master=yarn-cluster
where:

* livy.repl.enableHiveContext enables the Hive context in interactive sessions
* livy.server.port is the port the server listens on (8998 by default)
* livy.server.recovery.* enables session recovery, here backed by the ZooKeeper quorum given in state-store.url
* livy.server.session.timeout is the session timeout in milliseconds
* livy.server.yarn.app-lookup-timeout is how long Livy waits for the corresponding YARN application to appear
* livy.spark.master is the Spark master used for the sessions (here YARN in cluster mode)

The full list of possible configuration keys (for example livy.rsc.rpc.max.size) can be found in the livy.conf.template file shipped with the distribution.

Example of livy-env.sh:
#!/usr/bin/env bash
export SPARK_HOME=/usr/hdp/current/spark2-client
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_MAX_LOG_FILES=30
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
YARN_CLASSPATH=`yarn classpath`
export CLASSPATH="$YARN_CLASSPATH:/usr/lib/hdinsight-logging/*"
export LIVY_IDENT_STRING=livy
The log directory is set with the environment variable LIVY_LOG_DIR:
export LIVY_LOG_DIR=/var/log/livy
The PID directory is set with the environment variable LIVY_PID_DIR. The server writes its PID there, which can be used to check the process:
export LIVY_PID_DIR=/var/run/livy
ps -ef -q $(cat $LIVY_PID_DIR/livy-livy-server.pid)
ps -q $(cat $LIVY_PID_DIR/livy-livy-server.pid) -o comm=
Example of the server command line:
/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx2g -cp /usr/bin/livy2/jars/*:/usr/bin/livy2/conf:/etc/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/lib/*:/usr/hdp/2.6.2.25-1/hadoop/.//*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/./:/usr/hdp/2.6.2.25-1/hadoop-hdfs/lib/*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/.//*:/usr/hdp/2.6.2.25-1/hadoop-yarn/lib/*:/usr/hdp/2.6.2.25-1/hadoop-yarn/.//*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/lib/*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/.//*::/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/2.6.2.25-1/tez/*:/usr/hdp/2.6.2.25-1/tez/lib/*:/usr/hdp/2.6.2.25-1/tez/conf:/usr/hdp/current/hadoop-yarn-resourcemanager/.//*:/usr/hdp/current/hadoop-yarn-resourcemanager/lib/*:/usr/lib/hdinsight-logging/* com.cloudera.livy.server.LivyServer
See the Livy REST API documentation for the full reference.
GET /sessions
{
"from":0,
"total":2,
"sessions":
[
{"id":0,"appId":"application_1525336261063_0027","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/cluster/app/application_1525336261063_0027"},"log":["YARN Diagnostics:","Application killed by user."]},
{"id":1,"appId":"application_1525370262995_5114","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"},"log":[]}
]
}
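The list endpoint also accepts from and size query parameters for paging. A small sketch (the host is an assumption):

import requests

host = 'http://localhost:8998'   # assumption: adjust to your Livy server
r = requests.get(host + '/sessions', params={'from': 0, 'size': 10})
for session in r.json()['sessions']:
    print(session['id'], session['state'], session.get('appId'))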
GET /sessions/{sessionId}
{
"id":1,
"appId":"application_1525370262995_5114",
"owner":null,
"proxyUser":null,
"state":"dead",
"kind":"spark",
"appInfo": {
"driverLogUrl":null,
"sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"
},
"log":[]
}
GET /sessions/{sessionId}/state
{
"id":1,
"state":"dead"
}
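A quick way to read just the state of one session from Python (host and session id are placeholders); a 404 response means the session does not exist:

import requests

host = 'http://localhost:8998'   # placeholder
session_id = 1                   # placeholder
r = requests.get('{}/sessions/{}/state'.format(host, session_id))
if r.status_code == 404:
    print('session {} not found'.format(session_id))
else:
    print(r.json()['state'])     # e.g. "starting", "idle", "busy" or "dead"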
POST /sessions
import json
import requests

host = 'http://localhost:8998'
# 'spark' starts a Scala shell; 'pyspark' and 'sparkr' are the other interactive kinds
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()
where:
name | description | type |
---|---|---|
kind | The session kind (required) | session kind |
proxyUser | User to impersonate when starting the session | string |
jars | jars to be used in this session | List of string |
pyFiles | Python files to be used in this session | List of string |
files | files to be used in this session | List of string |
driverMemory | Amount of memory to use for the driver process | string |
driverCores | Number of cores to use for the driver process | int |
executorMemory | Amount of memory to use per executor process | string |
executorCores | Number of cores to use for each executor | int |
numExecutors | Number of executors to launch for this session | int |
archives | Archives to be used in this session | List of string |
queue | The name of the YARN queue the session is submitted to | string
name | The name of this session | string |
conf | Spark configuration properties | Map of key=val |
heartbeatTimeoutInSecond | Timeout in seconds after which the session is orphaned | int
Example of output:
{u'state': u'starting', u'id': 0, u'kind': u'spark'}
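A newly created session starts in the "starting" state and cannot run statements until it becomes "idle". The sketch below creates a PySpark session with a few of the optional parameters from the table above and polls its state; the host, memory/core values and session name are assumptions:

import json
import time
import requests

host = 'http://localhost:8998'                    # assumption
headers = {'Content-Type': 'application/json'}
data = {
    'kind': 'pyspark',
    'name': 'example-session',                    # arbitrary example name
    'executorMemory': '2g',
    'executorCores': 2,
    'conf': {'spark.sql.shuffle.partitions': '64'},
}
resp = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_url = host + resp.headers['location']

# wait until the session leaves the "starting" state
while requests.get(session_url + '/state').json()['state'] == 'starting':
    time.sleep(5)
print(requests.get(session_url + '/state').json()['state'])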
DELETE /sessions/{sessionId}
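DELETE kills the session. A minimal sketch (host and session id are placeholders):

import requests

host = 'http://localhost:8998'      # placeholder
r = requests.delete(host + '/sessions/0')
print(r.status_code, r.json())      # the body contains a confirmation message when the session existed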
Gets the log lines from this session.
GET /sessions/{sessionId}/log
{"id":1,"from":0,"total":0,"log":[]}
GET /sessions/{sessionId}/statements
GET /sessions/{sessionId}/statements/{statementId}
Runs a statement in a session.
POST /sessions/{sessionId}/statements
statements_url = host + r.headers['location'] + '/statements'
data = {'code': 'sc.parallelize(1 to 10).count()'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()
{u'output': None, u'state': u'running', u'id': 0}
GET /sessions/{sessionId}/statements/{statementId}
import pprint

statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())
{u'id': 0,
 u'output': {u'data': {u'text/plain': u'res0: Long = 10'},
             u'execution_count': 0,
             u'status': u'ok'},
 u'state': u'available'}
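A statement can stay in the "waiting" or "running" state for a while, so in practice the GET above is repeated until the statement finishes. The sketch below continues from the snippet above (it reuses statement_url and headers); the polling interval is arbitrary:

import time
import requests

# poll until the statement is no longer queued or running
while True:
    statement = requests.get(statement_url, headers=headers).json()
    if statement['state'] not in ('waiting', 'running'):
        break
    time.sleep(2)

if statement['output']['status'] == 'ok':
    print(statement['output']['data']['text/plain'])   # e.g. "res0: Long = 10"
else:
    print(statement['output'])                          # error details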
POST /sessions/{sessionId}/statements/{statementId}/cancel
There is also a batch API for submitting complete applications. See: https://github.com/cloudera/livy#get-batches
List the batches:
curl -k \
--user "<user>:<user password>" \
-v \
-X GET \
"https://hostname/livy/batches"

Get one batch:
curl -k \
--user "<user>:<user password>" \
-v \
-X GET \
"https://hostname/livy/batches/{batchId}"

Delete (kill) a batch:
curl -k \
--user "<user>:<user password>" \
-v \
-X DELETE \
"https://hostname/livy/batches/{batchId}"
The master and deploy mode are not part of the request data because they are set in the Livy configuration (livy.spark.master).
Example of request data (for the full list of fields, see cloudera/livy):
POST /batches
{
"className": "org.apache.spark.examples.SparkPi",
"executorMemory": "20g",
"args": ["2000"],
"file": "/path/to/examples.jar"
}
With curl:
# If the request body (file, className) is stored in a file, replace -d '...' with --data @C:\Temp\input.txt
curl -k \
--user "<user>:<user password>" \
-v \
-H "Content-Type: application/json" \
-H "X-Requested-By: admin" \
-X POST \
-d '{ "file":"<path to application jar>", "className":"<classname in jar>" }' \
'https://hostname/livy/batches'
where:

* file is the path to the application to execute (required); it must be reachable by the cluster (for example on HDFS or WASB)
* className is the application's main class (for Java/Scala jars)
Azure: on HDInsight, the Livy endpoint is exposed at https://<clustername>.azurehdinsight.net/livy and is used with basic authentication, as in the curl examples above.
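The same batch flow as a Python sketch: submit the application, then poll GET /batches/{batchId}/state until it finishes. The host, jar path, class name and terminal states checked here are assumptions to adapt to your setup:

import json
import time
import requests

host = 'http://localhost:8998'                          # placeholder
headers = {'Content-Type': 'application/json'}
data = {
    'file': '/path/to/examples.jar',                    # must be reachable by the cluster (e.g. HDFS)
    'className': 'org.apache.spark.examples.SparkPi',
    'args': ['2000'],
}
r = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
batch_id = r.json()['id']

while True:
    state = requests.get('{}/batches/{}/state'.format(host, batch_id)).json()['state']
    print(state)
    if state in ('success', 'dead', 'killed'):          # assumed terminal states
        break
    time.sleep(10)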