Note: This post originally appeared on the Decodable blog.
The SQL Gateway in Apache Flink provides a way to run SQL in Flink from places other than the SQL Client. This includes using a JDBC Driver (which opens up a multitude of clients), a Hive client via the HiveServer2 endpoint, and directly against the REST endpoint.
As I continue my journey with learning Flink SQL, one thing I wondered was how one would go about submitting a Flink SQL job in a production scenario. It seems to me that the SQL Gateway’s REST API would be a good candidate for this. You’d put the SQL code in a file under source control, and then use a deployment pipeline to submit that SQL against the endpoint. It’s worth noting that it’s recommended to use application mode when deploying production jobs to Flink, and the SQL Client and Gateway don’t support this yet. There is a FLIP under discussion, but in the meantime if you want to use application mode you’d need to wrap your SQL in a JAR to deploy it. Either that, or continue to use the SQL Client or Gateway and be aware of the limitations of running the job in session mode (basically, no resource isolation).
In this article I’ll show you how to use the endpoint, including exploring it with the Postman tool, calling it from the shell with HTTPie, and finishing off with a viable proof-of-concept shell script that executes statements from a file.
The API is documented and has two versions, each with its own OpenAPI YAML spec.
I’m going to look at v2 here.
Note that the docs say that the spec is still experimental.
To get started, let’s bring up the Flink cluster and the SQL Gateway locally:
./bin/start-cluster.sh
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
Here Comes the Postman 🔗
Postman is a handy tool for exploring and testing APIs. Here I’m just using it to create sample calls against the endpoints and to quickly understand how they relate to each other. You can use it on the web, but assuming you’re using a local instance of Flink (or at least one that is not publicly accessible), you’ll want the desktop version. Note that you’ll still need to sign up for a free account to access the Import feature that we’re using here.
Under your Workspace click Import and paste in the URL of the SQL Gateway’s OpenAPI YAML file (https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v2_sql_gateway.yml), which is linked from the docs page.
Import it as a Postman Collection.
You’ll now see under the Postman collection a list of all the API endpoints, with pre-created calls for each.
Go to Environments → Globals and define baseUrl with the value for your SQL Gateway.
If you’re running it locally then this is going to be http://localhost:8083
Now go back to Collections and under the Flink SQL Gateway REST API folder find the get Info call.
Open this up and hit Send.
You should see a successful response like this:
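{
    "productName": "Apache Flink",
    "version": "1.18.1"
}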
You can also click the Code icon (</>) to see the call in various different languages and tools including cURL and HTTPie.
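For example, the generated cURL for the get Info call looks something like this (the exact snippet will vary with your Postman version and settings):
curl --location 'http://localhost:8083/info' \
  --header 'Accept: application/json'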
For now this is not ground-breaking, but once you get onto payloads it’s really handy.
Just as we manually populated the global variable baseUrl above, we can also take the response from one call and use it in making another.
This is really useful, because there are two variables that the REST API returns that we need to use (sessionHandle and operationHandle).
To do this in Postman add the following to the Tests tab of the request pane:
// Parse the response body and save the returned session handle as an environment variable
var jsonData = JSON.parse(responseBody);
postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);
That assumes that the variable to populate is called sessionHandle and that it’s returned in a root key of the response called sessionHandle. Which it is:
$ http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}
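The statement-submission call that we’ll use shortly returns operationHandle in the same way, so the equivalent script on that request’s Tests tab is:
// Save the returned operation handle for use in the status and result calls
var jsonData = JSON.parse(responseBody);
postman.setEnvironmentVariable("operationHandle", jsonData.operationHandle);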
Once you’ve set a variable, you can use it in other calls by referencing it in double curly braces, like this:
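{{baseUrl}}/sessions/{{sessionHandle}}/statements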
I’ve shared a copy of my Postman collection, with the above variable configuration already done for you, here.
Let’s now go through the workflow of how we’d actually submit a SQL statement from scratch to the gateway.
Running a SQL Statement with the Flink SQL Gateway 🔗
In essence, the minimal steps are as follows. You can see the docs for more info.
- Establish a Session (with optional configuration parameters set).
- Submit a SQL Statement, which generates an Operation.
- Check the status of the Operation until it’s complete.
- Fetch the results of the Operation.
Here’s how to do each one, using HTTPie as an example client and showing the response. I’m using bash variables to hold the values of session and operation handles.
0. Check the connection and Flink version 🔗
$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}
1. Create a session 🔗
$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}
$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
[Optional] Validate session and read session config 🔗
Note here that execution.runtime-mode has been set from the properties that were passed above when the session was created.
$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}
2. Submit a SQL statement 🔗
$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8
{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}
$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"
3. Get Statement Execution Status 🔗
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8
{
"status": "FINISHED"
}
4. Get Results 🔗
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}
Because resultType is not EOS and there’s a value for nextResultUri, we know there’s more to fetch, at the location specified in nextResultUri.
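Appending that URI to the host gives us the next fetch, which returns the final chunk with a resultType of EOS:
$ http --body --follow --timeout 3600 GET 'localhost:8083/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON' \
    Accept:'application/json'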
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
5. Tidy up 🔗
Good practice is to close the session once you’ve finished with it:
$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
Using a Shell script to execute Flink SQL 🔗
We can use all of the above and a bit of bash to script this:
host='localhost:8083'
# Create a session (in batch mode) and capture the returned session handle
SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')
echo "Got session handle: "$SESSIONHANDLE
# Flatten the SQL file onto one line so it can be embedded in the JSON payload
SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)
# Submit the SQL statement and capture the returned operation handle
OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')
echo "Got operation handle: "$OPERATIONHANDLE
# Poll the operation status until the statement is no longer running
while true
do
STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json' | jq -r '.status')
echo $STATUS
if [ "$STATUS" != "RUNNING" ]; then
break
fi
sleep 2
done
echo "\n\n----- 📃 RESULTS 📃 -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
# Fetch result chunks, following nextResultUri until there are no more
while true
do
RESULT=$(http --follow --timeout 3600 GET $host$URL \
Accept:'application/json')
echo "$RESULT" | jq '.'
URL=$(echo "$RESULT" | jq -r '.nextResultUri // ""')
if [ -z "$URL" ]; then
break
fi
echo "(next result chunk 👇)"
done
echo "Closing session 🗑️"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE
We’ll put the actual SQL into a file called rmoff.sql:
CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;
Now when we run the shell script, we get this:
Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED
----- 📃 RESULTS 📃 -----
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk 👇)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session 🗑️
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
The actual SQL we ran wrote a CSV file to /tmp/flink-test, so let’s go and check that it worked:
$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1
Nice - it is exactly as expected.
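One caveat on the proof-of-concept script: building the JSON payload with printf will break if the SQL itself contains double quotes. A more robust sketch (assuming jq 1.6+ for the --rawfile option) lets jq do the escaping instead:
# Build the statement payload with jq so that quotes and newlines in the SQL are escaped safely
OPERATIONHANDLE=$(jq -n --rawfile sql rmoff.sql '{statement: ($sql | gsub("\n"; " "))}' | \
  http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
  Content-Type:'application/json' \
  Accept:'application/json' | jq -r '.operationHandle')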
Where Next? 🔗
If you want to learn more about Flink SQL you might be interested in understanding more about the role of the Catalog, hands-on examples of using the Catalog, or a deep dive into using JARs with Flink SQL.
