
Apache Flink in 10 Minutes


What is Flink?

Apache Flink is an open-source stream processing framework used by companies such as Uber, ResearchGate, and Zalando. At its core, it processes streams of data arriving from external sources. It works with messaging systems such as Apache Kafka, Apache NiFi, Amazon Kinesis Streams, and RabbitMQ.


Let’s explore a simple Scala example of stream processing with Apache Flink. We’ll ingest sensor data from Apache Kafka in JSON format, parse it, filter it, calculate the distance each sensor has traveled over the last 10 seconds, and send the processed data back to Kafka on a different topic.

We’ll need to get data from Kafka, so we’ll create a simple Python-based Kafka producer. The code is in the appendix.


Versions used: Apache Kafka 1.1.0, Apache Flink 1.4.2, Python 3.6, kafka-python 1.4.2, SBT 1.1.0.

Dive into code 

Now, let’s start with the skeleton of our Flink program. Every Apache Flink program needs an execution environment. In our case, it’s a StreamExecutionEnvironment.

val env = StreamExecutionEnvironment.getExecutionEnvironment

Then we need to create a Kafka consumer. We’ll use FlinkKafkaConsumer010; although it targets Kafka 0.10, it is compatible with the Kafka 1.1.0 we’re using. We’ll also need a Properties object to set the bootstrap server parameter.

val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092")

val kafkaConsumer = new FlinkKafkaConsumer010(
  "sensors", // topic the Python producer in the appendix writes to
  new SimpleStringSchema,
  kafkaProperties
)

Now, as we have a consumer, we will create a stream.

val kafkaStream = env.addSource(kafkaConsumer)

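Now we need a way to parse JSON strings. As Scala has no built-in functionality for that, we’ll use the JSON library from Play Framework. First, we need a case class to parse our JSON strings into.

case class JsonMessage(lat: Double, long: Double, status: String, id: Int)
implicit val jsonMessageReads = Json.reads[JsonMessage]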

For simplicity, we will use automatic conversion from JSON strings to JsonMessage. To transform the elements of a stream we use the .map transformation, which takes a single element as input and produces a single output. We’ll also have to drop the elements that failed to parse: the filter keeps only the elements that satisfy a condition, and a second map then extracts their values.

val jsonStream = kafkaStream
  .map(entry => Json.fromJson[JsonMessage](Json.parse(entry)))
  .filter(_.isInstanceOf[JsSuccess[JsonMessage]])
  .map(_.get)
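The parse, filter, and unwrap steps can also be sketched outside Flink. The following is a minimal plain-Python illustration of the same idea, not the Flink or Play API; the helper `parse_message`, the `REQUIRED_FIELDS` tuple, and the sample entries are hypothetical and exist only for this example.

```python
import json

# Fields of the JsonMessage case class used in this post.
REQUIRED_FIELDS = ("lat", "long", "status", "id")

def parse_message(raw):
    """Return the decoded dict, or None when raw is not a valid sensor message."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or any(f not in data for f in REQUIRED_FIELDS):
        return None
    return data

raw_entries = [
    '{"lat": 10.0, "long": 20.0, "status": "ok", "id": 1}',
    'not json at all',           # fails to decode
    '{"lat": 10.0, "id": 2}',    # decodes, but fields are missing
]

parsed = map(parse_message, raw_entries)       # one output per input, like .map
valid = [m for m in parsed if m is not None]   # keep successes, like .filter + .map(_.get)
print(len(valid))  # 1
```

Here `None` plays the role that a failed parse result plays in the Scala version: the map step never drops elements, it only marks them, and the filter step removes the marked ones.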

Now we want to discard all entries with a bad status code, so we filter the stream again.

val filteredJsonStream = jsonStream.filter(_.status == "ok")

To print the messages to the console, call print() on the stream.

filteredJsonStream.print()

Now we can key the messages by their IDs using the keyBy method.

val keyedJson = filteredJsonStream.keyBy(_.id)
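As a rough mental model, keying routes every message with the same ID to the same logical group. The plain-Python sketch below shows that grouping on a small list; the helper `key_by` and the sample messages are hypothetical, and Flink itself partitions a running stream by key rather than building a dictionary in memory.

```python
from collections import defaultdict

messages = [
    {"id": 1, "lat": 10.0, "long": 20.0},
    {"id": 2, "lat": 50.0, "long": 8.0},
    {"id": 1, "lat": 10.001, "long": 20.001},
]

def key_by(items, key_selector):
    """Group items by the value the selector returns, preserving arrival order."""
    groups = defaultdict(list)
    for item in items:
        groups[key_selector(item)].append(item)
    return dict(groups)

by_id = key_by(messages, lambda m: m["id"])
print({k: len(v) for k, v in by_id.items()})  # {1: 2, 2: 1}
```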

Windowing splits the stream into pieces of the specified duration. We’ll split our stream into windows of 10 seconds.

val keyedWindowed = keyedJson.timeWindow(Time.seconds(10))
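One way to picture fixed-size (tumbling) windows is to align each timestamp down to a multiple of the window size. The sketch below does that arithmetic in plain Python; `window_start` and the sample timestamps are hypothetical, and it assumes non-negative millisecond timestamps with no window offset. Which clock Flink actually uses depends on the configured time characteristic.

```python
WINDOW_SIZE_MS = 10_000  # 10-second windows, as in this post

def window_start(timestamp_ms, size_ms=WINDOW_SIZE_MS):
    """Start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)

timestamps_ms = [1_000, 9_999, 10_000, 25_500]
starts = [window_start(t) for t in timestamps_ms]
print(starts)  # [0, 0, 10000, 20000]
```

Elements that map to the same start value fall into the same window, so the first two timestamps above share a window while the third opens a new one.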

Now we can start our calculations. First, we merge the elements for a single key into a List using the fold method with an empty List as the initial value. The fold function simply returns a new List with another element appended.

val folded = keyedWindowed.fold(initialValue = List[JsonMessage]())(_ :+ _).filter(_.nonEmpty)
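The fold described above has a close analogue in Python’s `functools.reduce` with an initial value. This is a minimal sketch on a hypothetical list of window elements, not Flink code.

```python
from functools import reduce

window_elements = ["p1", "p2", "p3"]  # stand-ins for the messages of one key in one window

# Start from an empty list and return a new list with each element appended.
folded = reduce(lambda acc, elem: acc + [elem], window_elements, [])
print(folded)  # ['p1', 'p2', 'p3']

# An empty window folds to the initial value.
print(reduce(lambda acc, elem: acc + [elem], [], []))  # []
```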

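Now we need to calculate the distance traveled by each sensor. We’ll use a haversine formula borrowed from RosettaCode, slightly modified, to calculate the distance between two points. We’ll also need a function that sums the distances between pairs of points. We’ll call it reduceListOfJson.

def reduceListOfJson(data: List[JsonMessage]): Tuple2[Int, Double] = {
  val distance = data.sliding(2)
    .collect { case List(a, b) => Tuple2(Tuple2(a.lat, a.long), Tuple2(b.lat, b.long)) } // pair consecutive points (this step was lost in the listing and is reconstructed)
    .map(elem => haversine(elem._1, elem._2)) // compute distance between two points
    .fold(0D)(_ + _) // sum elements of list
  Tuple2(data(0).id, distance)
}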

val reduced = folded.map(reduceListOfJson(_))
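The haversine function itself is not shown in this post, so here is a minimal plain-Python sketch of the calculation. It assumes points are (lat, long) tuples in degrees and uses an Earth radius of 6372.8 km, the value I believe the RosettaCode version uses; `total_distance` is a hypothetical helper that pairs consecutive points, which may differ from the pairing in the original Scala code.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6372.8  # assumed radius; changes the scale of the result, not the method

def haversine(p1, p2):
    """Great-circle distance in km between two (lat, long) points given in degrees."""
    lat1, lon1 = p1
    lat2, lon2 = p2
    d_lat = radians(lat2 - lat1)
    d_lon = radians(lon2 - lon1)
    a = sin(d_lat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(d_lon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

def total_distance(points):
    """Sum of distances between consecutive points; 0 for fewer than two points."""
    return sum(haversine(a, b) for a, b in zip(points, points[1:]))

a = (36.12, -86.67)
b = (33.94, -118.40)
print(round(haversine(a, b), 2))
print(total_distance([a]))  # 0
```

A single-point list yields a distance of zero because there is no pair to measure, which matches what you would expect for a sensor that reported only once in a window.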


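Now we can write the results to a different Kafka topic, so we need a producer.

val kafkaProducer = new FlinkKafkaProducer010[String](
  "distances", // output topic, matching the consumer records shown later
  new SimpleStringSchema,
  kafkaProperties
)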

Now we need to convert our (id, distance) results to JSON strings.

reduced
  .map(elem => Json.toJson(elem))
  .map(Json.stringify(_))
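For comparison, the same serialization step can be sketched in plain Python. The helper `to_json_string` is hypothetical; compact separators are used so the result has no spaces, like the array-style values the Python consumer receives.

```python
import json

def to_json_string(result):
    """Serialize an (id, distance) pair as a compact JSON array string."""
    return json.dumps(list(result), separators=(",", ":"))

encoded = to_json_string((9, 0.5))
print(encoded)  # [9,0.5]
```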

Now we can add a sink for our data to pour into by chaining .addSink(kafkaProducer) onto the stream of JSON strings.

Now we can finally execute our code by calling execute on the environment.

env.execute()

We’ll start Kafka with no prior setup, as described in the Kafka Quickstart documentation.

Terminal commands (Linux/macOS):

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

Terminal commands (Windows):

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties

We’ll run the example directly in SBT (Scala Build Tool).

Terminal commands:

sbt run

The output will look something like this, as (id, distance_passed) pairs:

[info] (93,0.9212856531822278)
[info] (92,0.491515149785388)
[info] (66,0.1850938727551842)
[info] (50,0.7373954175193448)
[info] (40,0.7423638247950217)
[info] (33,0.6666665938578272)
[info] (30,0.5772569488523053)
[info] (28,0.7432206059930924)
[info] (24,0.3979697551971809)
[info] (14,0.7400059177386296)

The data received by the Python consumer looks like this:

ConsumerRecord(topic='distances', partition=0, offset=30411, timestamp=1524566590000, timestamp_type=0, key=None, value=b'[9,0.5525408534953374]', checksum=None, serialized_key_size=-1, serialized_value_size=22)
ConsumerRecord(topic='distances', partition=0, offset=30412, timestamp=1524566590000, timestamp_type=0, key=None, value=b'[6,0.4918031179984027]', checksum=None, serialized_key_size=-1, serialized_value_size=22)
ConsumerRecord(topic='distances', partition=0, offset=30413, timestamp=1524566590000, timestamp_type=0, key=None, value=b'[266,0.3624183394725044]', checksum=None, serialized_key_size=-1, serialized_value_size=24)
ConsumerRecord(topic='distances', partition=0, offset=30414, timestamp=1524566590000, timestamp_type=0, key=None, value=b'[2,0.7131895333407061]', checksum=None, serialized_key_size=-1, serialized_value_size=22)
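Each record’s value is a UTF-8 encoded JSON array of the form [id, distance]. A minimal sketch of decoding such values in plain Python follows; `decode_distance` is a hypothetical helper, and the byte strings are copied from the records above.

```python
import json

raw_values = [b'[9,0.5525408534953374]', b'[266,0.3624183394725044]']

def decode_distance(value):
    """Turn a raw record value into a (sensor_id, distance) tuple."""
    sensor_id, distance = json.loads(value.decode("utf-8"))
    return sensor_id, distance

decoded = [decode_distance(v) for v in raw_values]
print([sensor_id for sensor_id, _ in decoded])  # [9, 266]
print([len(v) for v in raw_values])             # [22, 24]
```

The byte lengths match the serialized_value_size fields of the corresponding records.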

Complete code 

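def reduceListOfJson(data: List[JsonMessage]): Tuple2[Int, Double] = {
  val distance = data.sliding(2)
    .collect { case List(a, b) => Tuple2(Tuple2(a.lat, a.long), Tuple2(b.lat, b.long)) } // pair consecutive points (this step was lost in the listing and is reconstructed)
    .map(elem => haversine(elem._1, elem._2)) // compute distance between two points
    .fold(0D)(_ + _) // sum elements of list
  Tuple2(data(0).id, distance)
}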

case class JsonMessage(lat: Double, long: Double, status: String, id: Int)
implicit val jsonMessageReads = Json.reads[JsonMessage]

def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val kafkaProperties = new Properties()
  kafkaProperties.setProperty("bootstrap.servers", "localhost:9092")

  val kafkaConsumer = new FlinkKafkaConsumer010(
    "sensors", // topic the Python producer writes to
    new SimpleStringSchema,
    kafkaProperties
  )

  val kafkaStream = env.addSource(kafkaConsumer)

  val jsonStream = kafkaStream
    .map(entry => Json.fromJson[JsonMessage](Json.parse(entry)))
    .filter(_.isInstanceOf[JsSuccess[JsonMessage]]) // keep only entries that parsed
    .map(_.get)

  val filteredJsonStream = jsonStream.filter(_.status == "ok")

  val keyedJson = filteredJsonStream.keyBy(_.id)

  val keyedWindowed = keyedJson.timeWindow(Time.seconds(10))

  val folded = keyedWindowed.fold(initialValue = List[JsonMessage]())(_ :+ _).filter(_.nonEmpty)

  val reduced = folded.map(reduceListOfJson(_))

  val kafkaProducer = new FlinkKafkaProducer010[String](
    "distances", // output topic, matching the consumer records shown earlier
    new SimpleStringSchema,
    kafkaProperties
  )

  reduced.map(elem => Json.toJson(elem)).map(Json.stringify(_)).addSink(kafkaProducer)

  env.execute()
}


Python Kafka producer

import json
import numpy
from kafka import KafkaProducer

class message():
    def __init__(self, identifier):
        # random starting position for this sensor
        self.long = numpy.random.uniform(-180.0, 180.0)
        self.lat = numpy.random.uniform(-90.0, 90.0)
        self.id = identifier

    def next_point(self):
        return {  # new points will be around the start
            "long": self.long + numpy.random.uniform(-0.001, 0.001),
            "lat": self.lat + numpy.random.uniform(-0.001, 0.001),
            "status": numpy.random.choice(['ok', 'failed'], p=(0.9, 0.1)),
            "id": self.id
        }

    def __repr__(self):
        return str({
            "long": self.long,
            "lat": self.lat,
            "id": self.id
        })

messages = [message(i) for i in range(0, 500)]  # create 500 messages

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# send a new point for every sensor, forever
while True:
    for x in messages:
        producer.send('sensors', x.next_point())

Python Kafka consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer('distances')  # topic shown in the consumer records above
for msg in consumer:
    print(msg)


In this post, we’ve given a brief overview of what Apache Flink is. We explored an example pipeline that reads data from Apache Kafka, performs some data cleaning and aggregation, and then sends the processed data to a different Kafka topic.