In this post, we will use a local Flink setup with savepoints configured, consuming from a local Kafka instance. We will also have a very simple Kafka producer feeding sequential numbers into Kafka.
To check whether savepointing actually works, we will stop the Flink program, restore it from the last savepoint, and then check that the consumed events are consistent with what the producer sent.
Set up local Kafka
Setting up a local Kafka is pretty easy. Just follow the official quickstart guide, and you will have a ZooKeeper and a Kafka instance running on your local machine.
Start ZooKeeper:
Start Kafka:
Create our testing topic:
Create a script to feed events
As mentioned above, the producer is very simple. I’m going to use Python to write it.
First, install the Kafka client dependency:
Then create a Python script, producer.py, like this:
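The original script isn’t reproduced here, so the following is a minimal sketch rather than the author’s exact code. It assumes the kafka-python client (installed with pip install kafka-python), a broker at localhost:9092, and the flink-test topic; each event is a JSON object such as {"idx": 42}, matching the records that show up in the output files later.

```python
"""producer.py: a minimal sketch that feeds sequential numbers into Kafka.

Assumes the kafka-python client and a broker on localhost:9092.
"""
import json
import time

TOPIC = "flink-test"                 # the testing topic used in this post
BOOTSTRAP_SERVERS = "localhost:9092"


def make_event(idx):
    """Encode one event as JSON bytes, e.g. b'{"idx": 42}'."""
    return json.dumps({"idx": idx}).encode("utf-8")


def main(interval_seconds=0.1):
    # Imported here so make_event() can be reused without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    idx = 0
    try:
        while True:
            producer.send(TOPIC, make_event(idx))
            idx += 1
            time.sleep(interval_seconds)
    finally:
        # Deliver anything still buffered before exiting (e.g. on Ctrl-C).
        producer.flush()
        producer.close()


if __name__ == "__main__":
    main()
```

Stop it with Ctrl-C; the finally block flushes any buffered messages before the producer closes.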
Then run this script to start feeding events:
I also recommend creating a simple consumer to check whether the current setup is working. Logstash works well for this; run it with a config file like:
input {
  kafka {
    group_id => "flink-test"
    topics => ["flink-test"]
    bootstrap_servers => "localhost:9092"
  }
}
output {
  stdout {}
}
Then start consuming with:
That said, any other kind of consumer will do the job just as well.
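For instance, a small Python consumer can print the events and also flag gaps in the sequence as they arrive. This is a sketch assuming the kafka-python client, a single-partition flink-test topic (so events arrive in order), and JSON values of the form {"idx": n}; the group id flink-test-checker is a hypothetical name chosen so this checker doesn’t share offsets with other consumers.

```python
import json


def parse_idx(raw):
    """Extract the sequence number from a raw Kafka message value."""
    return json.loads(raw.decode("utf-8"))["idx"]


class GapTracker:
    """Remembers the last index seen and reports any forward jump."""

    def __init__(self):
        self.last = None

    def observe(self, idx):
        """Return the skipped indices (empty list if the sequence is contiguous)."""
        missing = []
        if self.last is not None and idx > self.last + 1:
            missing = list(range(self.last + 1, idx))
        self.last = idx
        return missing


def main():
    from kafka import KafkaConsumer  # kafka-python client (assumed)

    consumer = KafkaConsumer(
        "flink-test",
        bootstrap_servers="localhost:9092",
        group_id="flink-test-checker",  # hypothetical group id
        auto_offset_reset="earliest",   # read from the start on first run
    )
    tracker = GapTracker()
    for message in consumer:
        idx = parse_idx(message.value)
        print(idx)
        missing = tracker.observe(idx)
        if missing:
            print(f"gap: {len(missing)} events missing ({missing[0]}..{missing[-1]})")


if __name__ == "__main__":
    main()
```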
Set up Flink
First, download the Flink version you want or need and unpack the package. That gives you everything you need to run Flink on your machine.
We assume that Java and mvn are already installed.
Set up a local Flink cluster
This will be the tricky part.
First, we need to change the config file ./conf/flink-conf.yaml.
In this file, you can specify basically all of the configuration for the Flink cluster, such as the JobManager and TaskManager heap sizes. In this post, however, we will focus on the savepoint configuration.
Add the following line to this file:
With this field set, Flink will use this folder to store savepoint files.
In the real world, I believe people usually store their savepoints on HDFS, with an hdfs:///... path, or on S3.
In this experiment, we will run a local Flink cluster with ./bin/start-local.sh.
This script reads the configuration from ./conf/flink-conf.yaml, which we just modified, and then starts a JobManager along with a TaskManager on localhost.
You could also change the number of slots in that TaskManager by modifying ./conf/flink-conf.yaml, but we don’t need that for the current topic. For now, we can just run:
Prepare the job
Think of a Flink cluster as a distributed worker system empowered with streaming abilities: just a bunch of starving workers (and their supervisors) waiting for a job assignment.
Although we have set up the cluster, we still need to give it a job.
Here is a simple Flink job we will use to try out the savepoint functionality:
It consumes the events from our Kafka topic and writes them to local files, bucketed by minute.
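The job’s source isn’t reproduced in this version of the post. To make "divided by minute" concrete, here is a small sketch of how a minute-granularity bucketer could map processing time to a directory under basePath. The pattern is an assumption modeled on Flink’s DateTimeBucketer, whose default format yyyy-MM-dd--HH buckets by hour; adding minutes gives one directory per minute. The base path /tmp/flink-output is hypothetical.

```python
import os
from datetime import datetime

# Assumed minute-level variant of DateTimeBucketer's "yyyy-MM-dd--HH" pattern.
MINUTE_FORMAT = "%Y-%m-%d--%H%M"


def bucket_dir(base_path, when):
    """Return the bucket directory an event processed at `when` would land in."""
    return os.path.join(base_path, when.strftime(MINUTE_FORMAT))


if __name__ == "__main__":
    base = "/tmp/flink-output"  # hypothetical basePath
    # Events processed within the same minute share one bucket directory.
    print(bucket_dir(base, datetime(2017, 6, 1, 13, 7, 5)))
    print(bucket_dir(base, datetime(2017, 6, 1, 13, 7, 59)))
    print(bucket_dir(base, datetime(2017, 6, 1, 13, 8, 0)))
```

With these inputs, the first two timestamps fall into the same directory, /tmp/flink-output/2017-06-01--1307, and the third starts a new one ending in --1308.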
Start experiment
To submit our job, first build the project above with:
This produces a fat jar in your target folder: <project-name>-<version>.jar
To submit this jar as our job, run:
Once it starts running, you will see files being generated under the basePath.
Restart the job without a savepoint
After the job has run for a while, cancel it in the Flink web UI.
Check the latest finished file before the cancellation and find its last line. In my experiment, it was {"idx": 2056}.
Then start the job again:
After a few minutes, check the first finished file after the restart and find its first line. In my experiment, it was {"idx": 2215}.
This means that events go missing when we restart the job without a savepoint: in my run, the 158 events from 2057 through 2214 never made it into a finished file.
A finished file is one that has been committed by a Flink checkpoint, that is, a file without the in-progress prefix or the pending suffix. Initially, a file that is being written is in the in-progress state. When the file has not been written to for a while (this is configurable), it becomes pending. A checkpoint then turns pending files into finished files. Only finished files should be treated as Flink’s consistent output; otherwise you will get duplicates. For more about checkpointing, see the official documentation.
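Comparing boundary lines by hand gets tedious, so here is a sketch of a checker that walks basePath, reads only finished files, and reports gaps and duplicates in the idx sequence. It assumes every output line is a JSON object like {"idx": 2056}, and that unfinished files are recognizable by name: Flink’s BucketingSink by default marks them with an underscore prefix plus an .in-progress or .pending suffix, but check your sink’s settings before relying on this rule.

```python
import json
import os


def is_finished(filename):
    """True for committed part files; False for in-progress, pending or hidden files.

    The naming rule is an assumption based on BucketingSink's default markers.
    """
    if filename.startswith(("_", ".")):
        return False
    return not filename.endswith((".in-progress", ".pending"))


def read_indices(base_path):
    """Collect every idx value found in finished files under base_path."""
    indices = []
    for root, _dirs, files in os.walk(base_path):
        for name in files:
            if not is_finished(name):
                continue
            with open(os.path.join(root, name), encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        indices.append(json.loads(line)["idx"])
    return indices


def find_gaps(indices):
    """Return (first_missing, last_missing) ranges absent from the sequence."""
    ordered = sorted(set(indices))
    return [(a + 1, b - 1) for a, b in zip(ordered, ordered[1:]) if b - a > 1]


if __name__ == "__main__":
    import sys

    indices = read_indices(sys.argv[1])  # pass the job's basePath
    print("events:", len(indices), "duplicates:", len(indices) - len(set(indices)))
    for start, end in find_gaps(indices):
        print(f"missing {start}..{end} ({end - start + 1} events)")
```

Collecting every index into one sorted set sidesteps the question of which file is the "latest", since bucket and part-file order no longer matter.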
Restart with a savepoint
Now let’s try savepointing. This time, we will create a savepoint before cancelling the job.
Flink lets you trigger a savepoint by executing:
The <job-id> can be found in the header of the job page in the Flink web UI.
After you run this command, Flink prints the path to your savepoint. Make sure to record this path.
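Because the path only appears in the command’s output, it helps to capture it automatically, especially once savepoints are triggered from crontab. Below is a sketch under two assumptions to verify against your Flink version: the CLI is invoked as flink savepoint <job-id>, and its output contains a line like Savepoint completed. Path: <path>. The log file name savepoints.log and its one "<timestamp> <path>" entry per line format are hypothetical choices.

```python
import re
import subprocess
from datetime import datetime

# Assumed CLI output format; verify against your Flink version.
SAVEPOINT_LINE = re.compile(r"Savepoint completed\. Path: (\S+)")


def parse_savepoint_path(cli_output):
    """Extract the savepoint path from the CLI output, or None if not found."""
    match = SAVEPOINT_LINE.search(cli_output)
    return match.group(1) if match else None


def trigger_and_record(flink_bin, job_id, log_file="savepoints.log"):
    """Trigger a savepoint and append '<timestamp> <path>' to log_file."""
    result = subprocess.run(
        [flink_bin, "savepoint", job_id],
        capture_output=True, text=True, check=True,
    )
    path = parse_savepoint_path(result.stdout)
    if path is None:
        raise RuntimeError("no savepoint path in output:\n" + result.stdout)
    with open(log_file, "a", encoding="utf-8") as log:
        log.write(f"{datetime.now().isoformat()} {path}\n")
    return path


if __name__ == "__main__":
    import sys

    # Usage (hypothetical script name): python record_savepoint.py ./bin/flink <job-id>
    print(trigger_and_record(sys.argv[1], sys.argv[2]))
```

A crontab entry can call the same command; every run appends a line, so the log doubles as a history of savepoints.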
Then we cancel the job, check the latest finished file before the cancellation, and find its last line. In my experiment, it was {"idx": 4993}.
Then we restart the job.
Because we want to restart from a savepoint, we need to specify it when we start the job:
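If savepoint paths are kept in a log, the restore command can be assembled from the most recent entry. A hedged sketch: -s (long form --fromSavepoint) is the flink run option for resuming from a savepoint, while the log file savepoints.log, its "<timestamp> <path>" line format, and the jar name are hypothetical.

```python
import shlex


def latest_savepoint(log_lines):
    """Return the path from the last non-empty '<timestamp> <path>' line."""
    entries = [line.split()[-1] for line in log_lines if line.strip()]
    if not entries:
        raise ValueError("no savepoint recorded yet")
    return entries[-1]


def restore_argv(flink_bin, savepoint_path, jar_path):
    """Build the argument list for resuming a job from a savepoint."""
    return [flink_bin, "run", "-s", savepoint_path, jar_path]


if __name__ == "__main__":
    with open("savepoints.log", encoding="utf-8") as log:  # hypothetical log file
        path = latest_savepoint(log)
    argv = restore_argv("./bin/flink", path, "target/my-job-1.0.jar")  # hypothetical jar
    # Print rather than execute, so the savepoint path can be checked first.
    print(shlex.join(argv))
```

The script prints the command instead of running it, which leaves a chance to double-check the savepoint before resubmitting; shlex.join needs Python 3.8 or later.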
After a few minutes, check the first finished file after the restart and find its first line. In my experiment, it was {"idx": 4994}, which continues exactly from the last number before the cancellation.
General thoughts
Flink’s savepointing is much easier than I expected. The one thing to keep in mind is that we need to record the savepoint path carefully when we use crontab to create savepoints. Other than that, Flink seems to handle all of the data consistency.
As a further experiment, I think it would be very useful to try Flink’s savepoints in a cluster setup with multiple TaskManagers.