I was playing with Apache Flink several weeks ago. For exercise, I made a simple rule to detect malicious login pattern with Complex Event Processing (CEP). The rule is very simple, 3x login activities for the same username and IP in a minute. Then for the data source, I use Kafka.


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">






public class MaliciousLoginEvent {
    private String username;
    private String ip;
    private String time;


The data will be saved in JSON format in the Kafka topic. So I created a deserializer for JSON to MaliciousLoginEvent POJO.


public class MaliciousLoginDeserializationSchema 
    implements DeserializationSchema<MaliciousLoginEvent> {
    public MaliciousLoginEvent deserialize(byte[] bytes) 
        throws IOException {
        MaliciousLoginEvent maliciousLoginEvent = null;
        try {
            JSONObject jsonData = new JSONObject(new String(bytes));
            maliciousLoginEvent = new MaliciousLoginEvent();
        } catch (JSONException e) {
            System.out.println("Unable to deserialize Malicious Login Kafka Message");
        } finally {
            return maliciousLoginEvent;

    public boolean isEndOfStream(MaliciousLoginEvent maliciousLoginEvent) {
        return false;

    public TypeInformation<MaliciousLoginEvent> getProducedType() {
        return TypeExtractor.getForClass(MaliciousLoginEvent.class);

The main code does:

  1. Add streaming data source from Kafka topic (FailLogin)
  2. Create a malicious login pattern (3x failed with the same username & IP in a minute)
  3. Match the pattern with coming data
  4. Add sink to print out the malicious pattern


public class App {
    public static void main(String[] args) throws Exception {
        //set kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        //set stream execution environtment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //stream kafka
        FlinkKafkaConsumer010<MaliciousLoginEvent> flinkKafkaConsumer = new FlinkKafkaConsumer010<MaliciousLoginEvent>("FailLogin", new MaliciousLoginDeserializationSchema(), properties);
        DataStream<MaliciousLoginEvent> kafkaInputStream = env.addSource(flinkKafkaConsumer).keyBy("username", "ip");

        //create pattern
        Pattern<MaliciousLoginEvent, ?> pattern = Pattern.<MaliciousLoginEvent>begin("firstAttempt")
        PatternStream<MaliciousLoginEvent> patternStream = CEP.pattern(kafkaInputStream, pattern);

        //add simple sink alarm
        DataStream<String> result = patternStream.select(new PatternSelectFunction<MaliciousLoginEvent, String>() {
            public String select(Map<String, List<MaliciousLoginEvent>> suspects) throws Exception {
                MaliciousLoginEvent firstAttempt = suspects.get("firstAttempt").get(0);
                MaliciousLoginEvent secondAttempt = suspects.get("secondAttempt").get(0);
                MaliciousLoginEvent thirdAttempt = suspects.get("thirdAttempt").get(0);
                String message = String.format("Suspected Login Activity! \n" +
                        "First event: %s \n" +
                        "Second event: %s \n" +
                        "Third event: %s \n", firstAttempt, secondAttempt, thirdAttempt);


                return message;

        env.execute("Mallicious Login Detector");

Then for testing purpose, I made a simple ruby code in Sinatra to simulate login request. When login failed, an event will be sent to a Kafka topic in JSON format.


require "sinatra"
require "kafka"
require "json"

user = { username: "wilianto", password: "12345678" }

post "/login" do
  username = params["username"]
  password = params["password"]
  ip       = params["ip"] # should be request.ip, just for testing purpose

  if user[:username] == username && user[:password] == password
    "Login success"
    event = {
      username: username,
      ip: ip,
      time: Time.now

    # push to kafka topic FailLogin
    kafka = Kafka.new(
      seed_brokers: ["localhost:9092"]
    kafka.deliver_message(JSON.dump(event), topic: "FailLogin")

    "Login failed"

Compile the CEP Flink apps and run the JAR file on Flink. Run the ruby login code and simulate some requests.

CEP Running on Flink>

Example Request:

curl -X POST \
  http://localhost:4567/login \
  -H 'cache-control: no-cache' \
  -H 'content-type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW' \
  -H 'postman-token: cb6cf5f1-2361-ad2d-4d9b-addd7b71b474' \
  -F username=wilianto \
  -F password=wrongpassword \
  -F ip=

Example Output:

Suspected Login Activity!
First event: MaliciousLogin[username: wilianto, ip:, time: 2017-11-04 18:55:06 +0700]
Second event: MaliciousLogin[username: wilianto, ip:, time: 2017-11-04 18:55:12 +0700]
Third event: MaliciousLogin[username: wilianto, ip:, time: 2017-11-04 18:55:13 +0700]

Thanks for reading. Feel free to share what’s on your mind in the comment form below.