Kafka - KSQL
About
KSQL is the streaming SQL engine for Kafka.
You put your KSQL statements in a file and execute them as an application through KSQL.
With KSQL, you can read data as either:
- a stream, where every update is independent of all others,
- or as a table, where each update may revise a previous record with the same key.
Once you have streaming tables, you can join them together, do aggregations on them, or, at some point in the near future, query these tables. As streams come in, the queries either produce more events or update the tables. Relational databases have a similar concept called materialized views.
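As a minimal sketch of the two readings (the topic name and columns here are hypothetical, not part of the quickstart), the same underlying topic can be declared either way:

```sql
-- Hypothetical topic 'clicks': read as a stream, every record is an independent event
CREATE STREAM clicks_stream (userid varchar, pageid varchar)
  WITH (kafka_topic='clicks', value_format='JSON');

-- Read the same topic as a table: for each key, only the latest value is kept
CREATE TABLE clicks_table (userid varchar, pageid varchar)
  WITH (kafka_topic='clicks', value_format='JSON');
```

Both statements read from the same topic; only the interpretation of updates differs.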
KSQL stores this table state in local RocksDB instances, backed by changelog topics in Kafka itself.
Concept
- A stream is the raw data
- A table is a structure where each result will often get updated later by another message with the same key. If you aggregated raw data into session windows, the results would be emitted as a table.
Joining together pure events (what's happening in the world) and tables (the current state of the world) is at the heart of stream processing.
Getting Started
Prerequisites
- Docker Compose version above 1.9, to avoid ERROR: Interactive mode is not yet supported on Windows.
$ docker-compose version
docker-compose version 1.16.1, build 6d1ac219
docker-py version: 2.5.1
CPython version: 2.7.13
OpenSSL version: OpenSSL 1.0.2j 26 Sep 2016
Clone and start
git clone https://github.com/confluentinc/ksql.git
cd ksql/docs/quickstart/
- The Docker Compose file starts several services and uses moby as the name of the Docker host machine in its configuration.
docker-machine ls
NAME ACTIVE DRIVER STATE URL SWARM DOCKER ERRORS
default virtualbox Timeout
- If your machine name is not moby, edit the docker-compose.yml file to add or replace moby with your machine name. Below is an example with default:
extra_hosts:
- "moby:127.0.0.1"
- "default:127.0.0.1"
- Start the services (containers).
docker-compose up -d # d to start it as a daemon
- Listing the services
docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1 /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
quickstart_ksql-cli_1 perl -e while(1){ sleep 99 ... Up
quickstart_ksql-datagen-pageviews_1 bash -c echo Waiting for K ... Up
quickstart_ksql-datagen-users_1 bash -c echo Waiting for K ... Up
quickstart_schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
quickstart_zookeeper_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
- All the Confluent commands except ksql are run inside the kafka service with bash:
docker-compose exec kafka //bin//bash
[email protected]:/#
Data Model and Data Generation
The Compose file automatically runs a data generator that continuously produces data to two Kafka topics:
- pageviews
- and users
kafka-topics --zookeeper zookeeper:32181 --list | grep -iE 'users|pageviews'
pageviews
users
The topics:
- pageviews has a key that is a mock timestamp and a value in DELIMITED format.
- users has a key that is the user ID and a value in JSON format.
Pageviews with the standard consumer:
kafka-console-consumer \
--topic pageviews \
--bootstrap-server kafka:29092 \
--from-beginning \
--max-messages 3 \
--property print.key=true
1 1508932966835,User_6,Page_17
11 1508932967385,User_9,Page_15
21 1508932967416,User_3,Page_30
Processed a total of 3 messages
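The users topic can be inspected the same way; the exact records vary because the data generator produces random values:

```shell
# Run inside the kafka service; values are JSON, keyed by user ID
kafka-console-consumer \
  --topic users \
  --bootstrap-server kafka:29092 \
  --from-beginning \
  --max-messages 2 \
  --property print.key=true
```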
Starting the Ksql console
- In another shell:
docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
where:
- the first ksql-cli is the host name of the ksql-cli service as seen in the docker-compose.yml file
- the second ksql-cli is the console command
======================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Kafka =
Copyright 2017 Confluent Inc.
CLI v0.1, Server v0.1 located at http://localhost:9098
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
KSQL
Create a stream
- Create a stream
CREATE STREAM pageviews_original (
viewtime bigint,
userid varchar,
pageid varchar)
WITH (
kafka_topic='pageviews',
value_format='DELIMITED');
Message
----------------
Stream created
- describe it
DESCRIBE pageviews_original;
Field | Type
---------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
- Show all streams
SHOW STREAMS;
Stream Name | Kafka Topic | Format
----------------------------------------------
PAGEVIEWS_ORIGINAL | pageviews | DELIMITED
Create a table
- Create a table
CREATE TABLE users_original (
registertime bigint,
gender varchar,
regionid varchar,
userid varchar)
WITH (
kafka_topic='users',
value_format='JSON');
Message
---------------
Table created
- Describe it
DESCRIBE users_original;
Field | Type
--------------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
REGISTERTIME | BIGINT
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
USERID | VARCHAR(STRING)
- Show all tables
SHOW TABLES;
Table Name | Kafka Topic | Format | Windowed
--------------------------------------------------
USERS_ORIGINAL | users | JSON | false
Query
By default KSQL reads the topics for streams and tables from the latest offset.
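To read from the earliest offset instead, the console typically lets you change the underlying Kafka consumer property with SET before issuing a query (a standard Kafka consumer property, exposed through the KSQL console):

```sql
SET 'auto.offset.reset' = 'earliest';
```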
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_49
Page_93
Page_20
- Without the LIMIT keyword, the SELECT query would run indefinitely until you stop it by pressing <ctrl-c>
SELECT pageid FROM pageviews_original LIMIT 3;
Page_59
Page_92
Page_37
Page_30
Page_69
Page_27
^CQuery terminated
Persistent Query
Unlike the non-persistent queries above:
- Results from this query are written to the Kafka topic PAGEVIEWS_FEMALE.
- Queries will continuously run as KSQL applications until they are manually terminated. Exiting KSQL does not terminate persistent queries.
The query below enriches the pageviews STREAM by doing a LEFT JOIN with the users_original TABLE on the user ID, keeping only the rows where gender = 'FEMALE'.
CREATE STREAM pageviews_female
AS
SELECT
users_original.userid AS userid
, pageid
, regionid
, gender
FROM
pageviews_original
LEFT JOIN users_original
ON
pageviews_original.userid = users_original.userid
WHERE
gender = 'FEMALE';
Message
----------------------------
Stream created and running
SHOW STREAMS;
Stream Name | Kafka Topic | Format
---------------------------------------------------
PAGEVIEWS_ORIGINAL | pageviews | DELIMITED
PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | DELIMITED
- See the result. Ctrl+c will terminate the output to the console but not the query.
SELECT * FROM pageviews_female;
1508919557785 | User_9 | User_9 | Page_12 | Region_2 | FEMALE
1508919560087 | User_7 | User_7 | Page_12 | Region_4 | FEMALE
^CQuery terminated
Exiting KSQL does not terminate persistent queries. They will continuously run as KSQL applications until they are manually terminated.
- Show
SHOW QUERIES;
Query ID | Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM
- Terminate the query. <note tip>If you want to see the data flowing in the next section with a standard consumer, don't do that now but at the end.</note>
TERMINATE 1;
Message
-------------------
Query terminated.
Create a persistent query with a window
CREATE TABLE user_timeview AS
SELECT
userid
, sum( viewtime ) AS total_viewtime
FROM
pageviews_original
WINDOW TUMBLING( size 30 second )
GROUP BY
userid;
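The windowed rows below come from selecting the new table, for example:

```sql
SELECT * FROM user_timeview;
```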
1508933520000 | User_9 : Window{start=1508933520000 end=-} | User_9 | 10562534743056
1508933520000 | User_2 : Window{start=1508933520000 end=-} | User_2 | 9053601210034
1508933520000 | User_5 : Window{start=1508933520000 end=-} | User_5 | 12071468299790
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 7544667667015
1508933520000 | User_7 : Window{start=1508933520000 end=-} | User_7 | 9053601208679
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 9053601212460
1508933520000 | User_3 : Window{start=1508933520000 end=-} | User_3 | 6035734154953
^CQuery terminated
Seeing the topics created
- Query 1 has created several topics:
- PAGEVIEWS_FEMALE
- and all ksql_query_* topics.
docker-compose exec kafka //bin//bash
kafka-topics --list --zookeeper zookeeper:32181
PAGEVIEWS_FEMALE
__consumer_offsets
_confluent-metrics
_schemas
ksql__commands
ksql_query_1-KSTREAM-MAP-0000000011-repartition
ksql_query_1-USERS_ORIGINAL_statestore-changelog
ksql_query_1-USERS_ORIGINAL_statestore-repartition
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-changelog
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-repartition
pageviews
users
- The PAGEVIEWS_FEMALE topic
kafka-topics --topic PAGEVIEWS_FEMALE --zookeeper zookeeper:32181 --describe
Topic: PAGEVIEWS_FEMALE Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 3 Leader: 1 Replicas: 1 Isr: 1
- Get the data with Kafka - Consumer
docker-compose exec kafka //bin//bash
kafka-console-consumer --bootstrap-server localhost:29092 --topic PAGEVIEWS_FEMALE --max-messages 20
User_8,Page_15,Region_5,FEMALE
User_5,Page_74,Region_3,FEMALE
User_5,Page_39,Region_3,FEMALE
User_5,Page_56,Region_3,FEMALE
User_5,Page_60,Region_3,FEMALE
User_5,Page_12,Region_3,FEMALE
User_5,Page_73,Region_3,FEMALE
User_8,Page_91,Region_2,FEMALE
User_2,Page_87,Region_8,FEMALE
User_7,Page_21,Region_3,FEMALE
User_2,Page_59,Region_8,FEMALE
User_7,Page_54,Region_9,FEMALE
User_8,Page_62,Region_6,FEMALE
User_7,Page_45,Region_9,FEMALE
User_8,Page_69,Region_6,FEMALE
User_8,Page_19,Region_6,FEMALE
User_8,Page_32,Region_6,FEMALE
User_2,Page_71,Region_8,FEMALE
User_2,Page_83,Region_8,FEMALE
User_8,Page_40,Region_1,FEMALE
Processed a total of 20 messages
Client Server mode
From https://github.com/confluentinc/ksql/issues/272
Start one or more servers using the following command:
$ ./bin/ksql-server-start /path/to/ksql_server1.properties
A sample properties file is as follows:
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8080
Note that each server needs its own properties file, and the listeners value must be unique per server.
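For example, a second server on the same host could keep the same cluster id but listen on a different port (the file name and port here are arbitrary choices, not from the source):

```ini
# ksql_server2.properties - only listeners differs from the first server
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8082
```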
In a new terminal window, start a CLI and connect to one of the servers by passing its listener address:
$ ./bin/ksql-cli remote http://localhost:8080
Embedded
in Stream API ..
Documentation / Reference
- See also a precursor: http://www.landoop.com/kafka/kcql/