Kafka - Ksql


Without the LIMIT keyword, the SELECT query would run indefinitely until you stop it by pressing Ctrl+C.

SELECT pageid FROM pageviews_original LIMIT 3;
Page_59
Page_92
Page_37
Page_30
Page_69
Page_27
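
A non-persistent query can also filter and project several columns. As a quick sketch (not part of the original quickstart; it only uses the userid and pageid columns already referenced on this page):

SELECT userid, pageid FROM pageviews_original WHERE userid = 'User_1' LIMIT 3;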
==== Persistent Query ====

Unlike the non-persistent query above:
  * Results from this query are written to the Kafka topic PAGEVIEWS_FEMALE.
  * Queries will run continuously as KSQL applications until they are manually terminated. Exiting KSQL does not terminate persistent queries.

The query below enriches the pageviews STREAM by doing a LEFT JOIN with the users_original TABLE on the user ID, where a condition is met.

CREATE STREAM pageviews_female AS
  SELECT users_original.userid AS userid, pageid, regionid, gender
  FROM pageviews_original
  LEFT JOIN users_original ON pageviews_original.userid = users_original.userid
  WHERE gender = 'FEMALE';
Message
Stream created and running
SHOW STREAMS;
Stream Name        | Kafka Topic      | Format
PAGEVIEWS_ORIGINAL | pageviews        | DELIMITED
PAGEVIEWS_FEMALE   | PAGEVIEWS_FEMALE | DELIMITED
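
To double-check the schema that the persistent query produced, KSQL's DESCRIBE statement can be run against the new stream (a quick sketch, not part of the original walkthrough; output omitted):

DESCRIBE PAGEVIEWS_FEMALE;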
* See the result. Ctrl+c will terminate the output to the console but not the query.

SELECT * FROM pageviews_female;
1508919557785 | User_9 | User_9 | Page_12 | Region_2 | FEMALE
1508919560087 | User_7 | User_7 | Page_12 | Region_4 | FEMALE
Exiting KSQL does not terminate persistent queries. They will run continuously as KSQL applications until they are manually terminated.

* Show the running queries:

SHOW QUERIES;
Query ID | Kafka Topic      | Query String
1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM
* Terminate the query. If you want to see the data flowing with a standard consumer in the next section, don't do this now but at the end.

TERMINATE 1;
Message
Query terminated.
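
Terminating only stops the query; the PAGEVIEWS_FEMALE stream definition and its topic remain. If you also want to remove the stream itself, KSQL provides a DROP STREAM statement (a sketch, not part of the original walkthrough; run it only once the query writing to the stream has been terminated):

DROP STREAM pageviews_female;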
==== Create a persistent query with a window ====

CREATE TABLE user_timeview AS
  SELECT userid, sum( viewtime ) AS numusers
  FROM pageviews_original
  WINDOW TUMBLING( size 30 second )
  GROUP BY userid;
1508933520000 | User_9 : Window{start=1508933520000 end=-} | User_9 | 10562534743056
1508933520000 | User_2 : Window{start=1508933520000 end=-} | User_2 | 9053601210034
1508933520000 | User_5 : Window{start=1508933520000 end=-} | User_5 | 12071468299790
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 7544667667015
1508933520000 | User_7 : Window{start=1508933520000 end=-} | User_7 | 9053601208679
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 9053601212460
1508933520000 | User_3 : Window{start=1508933520000 end=-} | User_3 | 6035734154953
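
The windowed output above comes from selecting from the new table; it was presumably produced with a query along these lines (a sketch):

SELECT * FROM user_timeview;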
===== Seeing the topics created =====

* The query 1 has created several topics:
    * PAGEVIEWS_FEMALE
    * and all ''ksql_query_*'' topics.
docker-compose exec kafka /bin/bash
kafka-topics --list --zookeeper zookeeper:32181
PAGEVIEWS_FEMALE
__consumer_offsets
_confluent-metrics
_schemas
ksql__commands
ksql_query_1-KSTREAM-MAP-0000000011-repartition
ksql_query_1-USERS_ORIGINAL_statestore-changelog
ksql_query_1-USERS_ORIGINAL_statestore-repartition
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-changelog
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-repartition
pageviews
users
 * The PAGEVIEWS_FEMALE topic
kafka-topics --topic PAGEVIEWS_FEMALE --zookeeper zookeeper:32181 --describe
Topic: PAGEVIEWS_FEMALE Partition: 0    Leader: 1       Replicas: 1     Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 1    Leader: 1       Replicas: 1     Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 2    Leader: 1       Replicas: 1     Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 3    Leader: 1       Replicas: 1     Isr: 1
* Get the data with Kafka - Consumer

docker-compose exec kafka /bin/bash
kafka-console-consumer --bootstrap-server localhost:29092 --topic PAGEVIEWS_FEMALE --max-messages 20
User_8,Page_15,Region_5,FEMALE
User_5,Page_74,Region_3,FEMALE
User_5,Page_39,Region_3,FEMALE
User_5,Page_56,Region_3,FEMALE
User_5,Page_60,Region_3,FEMALE
User_5,Page_12,Region_3,FEMALE
User_5,Page_73,Region_3,FEMALE
User_8,Page_91,Region_2,FEMALE
User_2,Page_87,Region_8,FEMALE
User_7,Page_21,Region_3,FEMALE
User_2,Page_59,Region_8,FEMALE
User_7,Page_54,Region_9,FEMALE
User_8,Page_62,Region_6,FEMALE
User_7,Page_45,Region_9,FEMALE
User_8,Page_69,Region_6,FEMALE
User_8,Page_19,Region_6,FEMALE
User_8,Page_32,Region_6,FEMALE
User_2,Page_71,Region_8,FEMALE
User_2,Page_83,Region_8,FEMALE
User_8,Page_40,Region_1,FEMALE
Processed a total of 20 messages
===== Client Server mode =====

From https://github.com/confluentinc/ksql/issues/272

Start one or more servers using the following command:

$ ./bin/ksql-server-start /path/to/ksql_server1.properties

A sample properties file is as follows:
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8080
Note that for each server you need a properties file, and the listeners value should be unique. In a new terminal window, start a CLI and connect to one of the servers by passing its listener address:
$ ./bin/ksql-cli remote http://localhost:8080
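
Once connected, the remote CLI sends its statements to the server, so the same statements used earlier apply; for example, listing the persistent queries running on the cluster:

SHOW QUERIES;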
Embedded in the Streams API: see https://github.com/confluentinc/ksql/blob/0.1.x/ksql-examples/src/main/java/io/confluent/ksql/embedded/EmbeddedKsql.java




