
Spark - Livy (Rest API)

About

Livy is an open source REST interface for interacting with Spark from anywhere.

It supports executing snippets of code or programs in a Spark context that runs locally or in YARN.

It is used to submit remote jobs.

Code example

A step-by-step example of interacting with Livy in Python with the Requests library is given in the Session and Statement sections below.

Endpoint

The Livy server listens by default on port 8998 (for example, http://localhost:8998); the port is set with the livy.server.port property (see Configuration below).

Client

Livy being a plain REST interface, any HTTP client works: curl on the command line, or the Requests library in Python, as in the examples of this page.
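The Python snippets of this page assume the following common setup; the host value is an example to adapt to your server.

import json
import pprint
import requests

# Base URL of the Livy server (default port 8998)
host = 'http://localhost:8998'
# Livy exchanges JSON request and response bodies
headers = {'Content-Type': 'application/json'}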

Server Management

Installation

Livy is distributed as a zip archive on the Apache Livy website; distributions such as Hortonworks HDP also ship it (for example under /usr/hdp/<hdp_version>/livy2, as in the start command below).

Alive

To check that the server is up, query the root endpoint:

curl -u admin -G "http://10.0.0.11:8998/"
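The equivalent check in Python, assuming the setup above and no authentication:

# GET /sessions is a cheap way to verify that Livy answers
r = requests.get(host + '/sessions', headers=headers)
print(r.status_code)  # 200 when the server is alive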

Start and stop

export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
#  Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable 
export SPARK_CONF_DIR=$SPARK_HOME/conf

./bin/livy-server            # run in the foreground
# or as a daemon:
# ./bin/livy-server start
# ./bin/livy-server stop

# Hortonworks
# /usr/hdp/<hdp_version>/livy2/bin/livy-server start

Configuration

LIVY_CONF_DIR is by default the conf directory under the Livy installation. Example: /usr/bin/livy2/conf

The configuration of Livy goes through different files:

livy.conf

livy.repl.enableHiveContext=true
livy.server.port=8998
livy.server.recovery.mode=recovery
livy.server.recovery.state-store=zookeeper
livy.server.recovery.state-store.url=zk2:2181,zk3:2181,zk5:2181
livy.server.session.timeout=2073600000
livy.server.yarn.app-lookup-timeout=2m
livy.spark.master=yarn-cluster

where:

  * livy.server.session.timeout is in milliseconds (2073600000 ms = 24 days)
  * livy.server.recovery.* enables session recovery, here with ZooKeeper as the state store
  * livy.spark.master sets the Spark master and deploy mode server-side, which is why clients do not pass them in their requests

For the full list of possible configuration properties (livy.rsc.rpc.max.size, ...), see the livy.conf.template file shipped with Livy.

livy-env.sh

#!/usr/bin/env bash

export SPARK_HOME=/usr/hdp/current/spark2-client
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_MAX_LOG_FILES=30
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
YARN_CLASSPATH=$(yarn classpath)
export CLASSPATH="$YARN_CLASSPATH:/usr/lib/hdinsight-logging/*"
export LIVY_IDENT_STRING=livy

Log

The log directory is set with the environment variable LIVY_LOG_DIR:

export LIVY_LOG_DIR=/var/log/livy

Process

The PID directory is set with the environment variable LIVY_PID_DIR:

export LIVY_PID_DIR=/var/run/livy
ps -ef -q $(cat $LIVY_PID_DIR/livy-livy-server.pid)

ps -q $(cat $LIVY_PID_DIR/livy-livy-server.pid) -o comm=

Example of the resulting command line:

/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx2g -cp /usr/bin/livy2/jars/*:/usr/bin/livy2/conf:/etc/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/conf:/usr/hdp/2.6.2.25-1/hadoop/lib/*:/usr/hdp/2.6.2.25-1/hadoop/.//*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/./:/usr/hdp/2.6.2.25-1/hadoop-hdfs/lib/*:/usr/hdp/2.6.2.25-1/hadoop-hdfs/.//*:/usr/hdp/2.6.2.25-1/hadoop-yarn/lib/*:/usr/hdp/2.6.2.25-1/hadoop-yarn/.//*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/lib/*:/usr/hdp/2.6.2.25-1/hadoop-mapreduce/.//*::/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/2.6.2.25-1/tez/*:/usr/hdp/2.6.2.25-1/tez/lib/*:/usr/hdp/2.6.2.25-1/tez/conf:/usr/hdp/current/hadoop-yarn-resourcemanager/.//*:/usr/hdp/current/hadoop-yarn-resourcemanager/lib/*:/usr/lib/hdinsight-logging/* com.cloudera.livy.server.LivyServer

API

See Rest API

Session

List

GET /sessions
{
	"from":0,
	"total":2,
	"sessions":
		[
			{"id":0,"appId":"application_1525336261063_0027","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/cluster/app/application_1525336261063_0027"},"log":["YARN Diagnostics:","Application killed by user."]},
			{"id":1,"appId":"application_1525370262995_5114","owner":null,"proxyUser":null,"state":"dead","kind":"spark","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"},"log":[]}
		]
}
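The same call in Python, assuming the setup above:

# List the sessions known to the server
r = requests.get(host + '/sessions', headers=headers)
for session in r.json()['sessions']:
    print(session['id'], session['state'])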

GET /sessions/{sessionId}
{
  "id":1,
  "appId":"application_1525370262995_5114",
  "owner":null,
  "proxyUser":null,
  "state":"dead",
  "kind":"spark",
  "appInfo": {
    "driverLogUrl":null,
    "sparkUiUrl":"http://hn1:8088/proxy/application_1525370262995_5114/"
    },
  "log":[]
}

State

GET /sessions/{sessionId}/state
{
  "id":1,
  "state":"dead"
}
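A common pattern is to poll this endpoint until the session is ready. A minimal sketch, assuming the setup above and a session_id variable (see Create Session below):

import time

# Wait for the session to leave its transient startup states
while True:
    state = requests.get(host + '/sessions/%d/state' % session_id,
                         headers=headers).json()['state']
    if state not in ('not_started', 'starting'):
        break
    time.sleep(1)
print(state)  # 'idle' once the session accepts statements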

Create Session

POST /sessions
host = 'http://localhost:8998'
# SparkShell
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

where:

name                     | description                                            | type
kind                     | The session kind (required)                            | session kind
proxyUser                | User to impersonate when starting the session          | string
jars                     | Jars to be used in this session                        | list of string
pyFiles                  | Python files to be used in this session                | list of string
files                    | Files to be used in this session                       | list of string
driverMemory             | Amount of memory to use for the driver process         | string
driverCores              | Number of cores to use for the driver process          | int
executorMemory           | Amount of memory to use per executor process           | string
executorCores            | Number of cores to use for each executor               | int
numExecutors             | Number of executors to launch for this session         | int
archives                 | Archives to be used in this session                    | list of string
queue                    | The name of the YARN queue the session is submitted to | string
name                     | The name of this session                               | string
conf                     | Spark configuration properties                         | map of key=val
heartbeatTimeoutInSecond | Timeout in seconds after which the session is considered orphaned and closed | int

Example of output:

{u'state': u'starting', u'id': 0, u'kind': u'spark'}
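The optional parameters of the table above go in the same JSON body. A sketch with arbitrary example values:

# PySpark session with explicit resources
data = {
    'kind': 'pyspark',
    'driverMemory': '1g',       # example value
    'executorMemory': '2g',     # example value
    'executorCores': 2,         # example value
    'conf': {'spark.ui.enabled': 'true'},
}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_id = r.json()['id']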

Kill

DELETE /sessions/{sessionId}
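In Python, assuming the setup above:

# Kill the session and free its resources on the cluster
requests.delete(host + '/sessions/%d' % session_id, headers=headers)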

Log

Gets the log lines from this session.

GET /sessions/{sessionId}/log
{"id":1,"from":0,"total":0,"log":[]}

Statement

List

GET /sessions/{sessionId}/statements
GET /sessions/{sessionId}/statements/{statementId}

Submit

Runs a statement in a session.

POST /sessions/{sessionId}/statements
statements_url = host + '/sessions/%d/statements' % session_id
data = {'code': 'sc.parallelize(1 to 10).count()'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()
{u'output': None, u'state': u'running', u'id': 0}

Get the result

GET /sessions/{sessionId}/statements/{statementId}
statement_url = host + r.headers['location']

r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())
{u'id': 0,
 u'output': {u'data': {u'text/plain': u'res0: Long = 10'},
             u'execution_count': 0,
             u'status': u'ok'},
 u'state': u'available'}
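The statement runs asynchronously, so the result must be polled for. A minimal sketch, assuming the setup above:

import time

# Poll the statement until Spark has produced its result
while True:
    statement = requests.get(statement_url, headers=headers).json()
    if statement['state'] == 'available':
        break
    time.sleep(1)
print(statement['output']['data']['text/plain'])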

Cancel

POST /sessions/{sessionId}/statements/{statementId}/cancel
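In Python, where statement_id is the id returned at submission:

# Cancel a running statement
requests.post(statements_url + '/%d/cancel' % statement_id, headers=headers)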

Batch

Livy can also run applications as batch jobs (the REST counterpart of spark-submit). See: https://github.com/cloudera/livy#get-batches

Get

curl -k \
  --user "<user>:<user password>" \
  -v \
  -X GET \
  "https://hostname/livy/batches"
curl -k \
  --user "<user>:<user password>" \
  -v \
  -X GET \
  "https://hostname/livy/batches/{batchId}"

Delete

curl -k \
  --user "<user>:<user password>" \
  -v \
  -X DELETE \
  "https://hostname/livy/batches/{batchId}"

Submit application

Spark - Spark-submit

The master and the deploy mode are not part of the request body because they are configured server-side in livy.conf (livy.spark.master, see Configuration above).

For the full description of the request body, see cloudera/livy. Example:

POST /batches
{
  "className": "org.apache.spark.examples.SparkPi",
  "executorMemory": "20g",
  "args": [2000],
  "file": "/path/to/examples.jar"
}

With curl:

# If the data (file, className) are in a file, replace -d '...' below with:
#   --data @C:\Temp\input.txt
curl -k \
  --user "<user>:<user password>" \
  -v \
  -H "Content-Type: application/json" \
  -X POST \
  -d '{ "file":"<path to application jar>", "className":"<classname in jar>" }' \
  -H "X-Requested-By: admin" \
  'https://hostname/livy/batches'

where:

  * -k lets curl proceed with an insecure TLS connection
  * --user passes the basic-authentication credentials
  * -d carries the JSON request body
  * the X-Requested-By header is required by the server when CSRF protection is enabled

A Python version of the same submission is sketched below.
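The same submission in Python; the jar path and class name are the placeholder values of the curl example:

# Submit a batch (spark-submit over REST)
data = {
    'file': '/path/to/examples.jar',                  # jar visible to the cluster
    'className': 'org.apache.spark.examples.SparkPi',
    'args': ['2000'],
}
r = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
batch_id = r.json()['id']
# Poll the batch until it reaches a terminal state (success, dead, ...)
print(requests.get(host + '/batches/%d' % batch_id, headers=headers).json()['state'])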

Documentation / Reference

Azure: