SELECT pageid FROM pageviews_original LIMIT 3;
Page_59 Page_92 Page_37 Page_30 Page_69 Page_27
==== Persistent Query ====
Unlike the non-persistent query above:
* Results from this query are written to the Kafka topic PAGEVIEWS_FEMALE.
* The query runs continuously as a KSQL application until it is terminated manually. Exiting KSQL does not terminate persistent queries.
The query below enriches the pageviews_original STREAM with a LEFT JOIN against the users_original TABLE on the user ID, and keeps only the rows where gender is FEMALE.
CREATE STREAM pageviews_female AS
SELECT users_original.userid AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original ON pageviews_original.userid = users_original.userid
WHERE gender = 'FEMALE';
Message
Stream created and running
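To make the semantics of the statement above concrete, here is a minimal Python sketch of a stream-table LEFT JOIN followed by the gender filter. It is an illustration only, not how KSQL is implemented; the sample rows and the function name `pageviews_female` are hypothetical.

```python
# Hypothetical sample data; in KSQL these come from the Kafka topics.
users_original = {
    "User_9": {"regionid": "Region_2", "gender": "FEMALE"},
    "User_4": {"regionid": "Region_7", "gender": "MALE"},
}

pageviews_original = [
    {"userid": "User_9", "pageid": "Page_12"},
    {"userid": "User_4", "pageid": "Page_30"},
    {"userid": "User_1", "pageid": "Page_59"},  # no matching row in users_original
]


def pageviews_female(pageviews, users):
    """Yield enriched page views whose joined gender is FEMALE."""
    for view in pageviews:
        user = users.get(view["userid"])  # None when the LEFT JOIN finds no match
        row = {
            "userid": view["userid"] if user else None,
            "pageid": view["pageid"],
            "regionid": user["regionid"] if user else None,
            "gender": user["gender"] if user else None,
        }
        # WHERE gender = 'FEMALE' also drops unmatched rows, whose gender is None.
        if row["gender"] == "FEMALE":
            yield row


result = list(pageviews_female(pageviews_original, users_original))
print(result)
```

Because the filter is on a column from the table side, page views without a matching user never reach the output, even though the join is a LEFT JOIN.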
SHOW STREAMS;
Stream Name | Kafka Topic | Format
PAGEVIEWS_ORIGINAL | pageviews | DELIMITED
PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | DELIMITED
* View the result. Ctrl+C stops the output to the console but does not terminate the query.
SELECT * FROM pageviews_female;
1508919557785 | User_9 | User_9 | Page_12 | Region_2 | FEMALE
1508919560087 | User_7 | User_7 | Page_12 | Region_4 | FEMALE
Exiting KSQL does not terminate persistent queries. They keep running as KSQL applications until they are terminated manually.
* Show the running queries:
SHOW QUERIES;
Query ID | Kafka Topic | Query String
1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM
* Terminate the query. If you want to watch the data flow with a standard consumer in the next section, do not terminate it now; do it at the end instead.
TERMINATE 1;
Message
Query terminated.
==== Create a persistent query with a window ====
CREATE TABLE user_timeview AS
SELECT userid, sum(viewtime) AS numusers
FROM pageviews_original
WINDOW TUMBLING (size 30 second)
GROUP BY userid;
1508933520000 | User_9 : Window{start=1508933520000 end=-} | User_9 | 10562534743056
1508933520000 | User_2 : Window{start=1508933520000 end=-} | User_2 | 9053601210034
1508933520000 | User_5 : Window{start=1508933520000 end=-} | User_5 | 12071468299790
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 7544667667015
1508933520000 | User_7 : Window{start=1508933520000 end=-} | User_7 | 9053601208679
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 9053601212460
1508933520000 | User_3 : Window{start=1508933520000 end=-} | User_3 | 6035734154953
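The grouping done by a 30-second tumbling window can be sketched in Python as follows. This is an illustration under the assumption that windows are aligned to the epoch, which is consistent with the window start 1508933520000 shown above (a multiple of 30000 ms). The event tuples and function names are hypothetical.

```python
from collections import defaultdict

WINDOW_SIZE_MS = 30_000  # 30 seconds, as in the query above


def window_start(timestamp_ms, size_ms=WINDOW_SIZE_MS):
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def sum_viewtime_per_window(events, size_ms=WINDOW_SIZE_MS):
    """Sum viewtime per (userid, window start).

    events is an iterable of (timestamp_ms, userid, viewtime) tuples.
    """
    totals = defaultdict(int)
    for timestamp_ms, userid, viewtime in events:
        totals[(userid, window_start(timestamp_ms, size_ms))] += viewtime
    return dict(totals)


# Hypothetical events, not taken from the output above.
events = [
    (1508933520000, "User_9", 10),
    (1508933549999, "User_9", 5),  # last millisecond of the same window
    (1508933550000, "User_9", 7),  # first millisecond of the next window
    (1508933530000, "User_2", 3),
]
totals = sum_viewtime_per_window(events)
print(totals)
```

Tumbling windows do not overlap, so each event contributes to exactly one (userid, window) pair.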
===== Seeing the topics created =====
* Query 1 has created several topics:
* PAGEVIEWS_FEMALE
* all ''ksql_query_*'' topics
docker-compose exec kafka /bin/bash
kafka-topics --list --zookeeper zookeeper:32181
PAGEVIEWS_FEMALE
__consumer_offsets
_confluent-metrics
_schemas
ksql__commands
ksql_query_1-KSTREAM-MAP-0000000011-repartition
ksql_query_1-USERS_ORIGINAL_statestore-changelog
ksql_query_1-USERS_ORIGINAL_statestore-repartition
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-changelog
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-repartition
pageviews
users
* Describe the PAGEVIEWS_FEMALE topic:
kafka-topics --topic PAGEVIEWS_FEMALE --zookeeper zookeeper:32181 --describe
Topic: PAGEVIEWS_FEMALE Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: PAGEVIEWS_FEMALE Partition: 3 Leader: 1 Replicas: 1 Isr: 1
* Read the data with the Kafka console consumer:
docker-compose exec kafka /bin/bash
kafka-console-consumer --bootstrap-server localhost:29092 --topic PAGEVIEWS_FEMALE --max-messages 20
User_8,Page_15,Region_5,FEMALE
User_5,Page_74,Region_3,FEMALE
User_5,Page_39,Region_3,FEMALE
User_5,Page_56,Region_3,FEMALE
User_5,Page_60,Region_3,FEMALE
User_5,Page_12,Region_3,FEMALE
User_5,Page_73,Region_3,FEMALE
User_8,Page_91,Region_2,FEMALE
User_2,Page_87,Region_8,FEMALE
User_7,Page_21,Region_3,FEMALE
User_2,Page_59,Region_8,FEMALE
User_7,Page_54,Region_9,FEMALE
User_8,Page_62,Region_6,FEMALE
User_7,Page_45,Region_9,FEMALE
User_8,Page_69,Region_6,FEMALE
User_8,Page_19,Region_6,FEMALE
User_8,Page_32,Region_6,FEMALE
User_2,Page_71,Region_8,FEMALE
User_2,Page_83,Region_8,FEMALE
User_8,Page_40,Region_1,FEMALE
Processed a total of 20 messages
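The records in the topic are DELIMITED (comma-separated) in the column order of the SELECT list: userid, pageid, regionid, gender. As a small sketch, lines captured from the console consumer could be parsed in Python like this; the function name `parse_delimited` is hypothetical, and the sample lines are the first three records of the output above.

```python
import csv
from collections import Counter

# Column order of the SELECT list in the CREATE STREAM statement.
COLUMNS = ["userid", "pageid", "regionid", "gender"]


def parse_delimited(lines):
    """Turn comma-separated record lines into dictionaries keyed by column name."""
    return [dict(zip(COLUMNS, fields)) for fields in csv.reader(lines)]


sample = [
    "User_8,Page_15,Region_5,FEMALE",
    "User_5,Page_74,Region_3,FEMALE",
    "User_5,Page_39,Region_3,FEMALE",
]
records = parse_delimited(sample)

# Example use: count page views per user in the captured sample.
views_per_user = Counter(record["userid"] for record in records)
print(views_per_user)
```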
===== Client Server mode =====
From https://github.com/confluentinc/ksql/issues/272
Start one or more servers using the following command:
$ ./bin/ksql-server-start /path/to/ksql_server1.properties
A sample properties file is as follows:
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8080
Note that each server needs its own properties file, and the listeners value must be unique per server.
In a new terminal window, start a CLI and connect to one of the servers by passing its listener address:
$ ./bin/ksql-cli remote http://localhost:8080
Embedded
In the Streams API: https://github.com/confluentinc/ksql/blob/0.1.x/ksql-examples/src/main/java/io/confluent/ksql/embedded/EmbeddedKsql.java
Documentation / Reference