Real-time Malicious Login Detector with Apache Flink CEP
I was playing with Apache Flink several weeks ago. For exercise, I made a simple rule to detect malicious login pattern with Complex Event Processing (CEP). The rule is very simple, 3x login activities for the same username and IP in a minute. Then for the data source, I use Kafka.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wilianto.blog.login</groupId>
    <artifactId>cep-malicious-login</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.wilianto.blog.login.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
MaliciousLoginEvent.java
public class MaliciousLoginEvent {
    private String username;
    private String ip;
    private String time;
    //setter
    //getter
} 
The data will be saved in JSON format in the Kafka topic. So I created a deserializer for JSON to MaliciousLoginEvent POJO.
MaliciousLoginDeserializationSchema.java
public class MaliciousLoginDeserializationSchema 
    implements DeserializationSchema<MaliciousLoginEvent> {
    public MaliciousLoginEvent deserialize(byte[] bytes) 
        throws IOException {
        MaliciousLoginEvent maliciousLoginEvent = null;
        try {
            JSONObject jsonData = new JSONObject(new String(bytes));
            maliciousLoginEvent = new MaliciousLoginEvent();
            maliciousLoginEvent.setUsername(jsonData.getString("username"));
            maliciousLoginEvent.setIp(jsonData.getString("ip"));
            maliciousLoginEvent.setTime(jsonData.getString("time"));
        } catch (JSONException e) {
            System.out.println("Unable to deserialize Malicious Login Kafka Message");
        } finally {
            return maliciousLoginEvent;
        }
    }
    public boolean isEndOfStream(MaliciousLoginEvent maliciousLoginEvent) {
        return false;
    }
    public TypeInformation<MaliciousLoginEvent> getProducedType() {
        return TypeExtractor.getForClass(MaliciousLoginEvent.class);
    }
}
The main code does:
- Add streaming data source from Kafka topic (FailLogin)
- Create a malicious login pattern (3x failed with the same username & IP in a minute)
- Match the pattern with coming data
- Add sink to print out the malicious pattern
App.java
public class App {
    public static void main(String[] args) throws Exception {
        //set kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //set stream execution environtment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //stream kafka
        FlinkKafkaConsumer010<MaliciousLoginEvent> flinkKafkaConsumer = new FlinkKafkaConsumer010<MaliciousLoginEvent>("FailLogin", new MaliciousLoginDeserializationSchema(), properties);
        flinkKafkaConsumer.setStartFromLatest();
        DataStream<MaliciousLoginEvent> kafkaInputStream = env.addSource(flinkKafkaConsumer).keyBy("username", "ip");
        //create pattern
        Pattern<MaliciousLoginEvent, ?> pattern = Pattern.<MaliciousLoginEvent>begin("firstAttempt")
                .next("secondAttempt")
                .next("thirdAttempt")
                .within(Time.minutes(1));
        PatternStream<MaliciousLoginEvent> patternStream = CEP.pattern(kafkaInputStream, pattern);
        //add simple sink alarm
        DataStream<String> result = patternStream.select(new PatternSelectFunction<MaliciousLoginEvent, String>() {
            public String select(Map<String, List<MaliciousLoginEvent>> suspects) throws Exception {
                MaliciousLoginEvent firstAttempt = suspects.get("firstAttempt").get(0);
                MaliciousLoginEvent secondAttempt = suspects.get("secondAttempt").get(0);
                MaliciousLoginEvent thirdAttempt = suspects.get("thirdAttempt").get(0);
                String message = String.format("Suspected Login Activity! \n" +
                        "First event: %s \n" +
                        "Second event: %s \n" +
                        "Third event: %s \n", firstAttempt, secondAttempt, thirdAttempt);
                System.out.println(message);
                return message;
            }
        });
        //execute
        env.execute("Mallicious Login Detector");
    }
}
Then for testing purpose, I made a simple ruby code in Sinatra to simulate login request. When login failed, an event will be sent to a Kafka topic in JSON format.
login.rb
require "sinatra"
require "kafka"
require "json"
user = { username: "wilianto", password: "12345678" }
post "/login" do
  username = params["username"]
  password = params["password"]
  ip       = params["ip"] # should be request.ip, just for testing purpose
  if user[:username] == username && user[:password] == password
    "Login success"
  else
    event = {
      username: username,
      ip: ip,
      time: Time.now
    }
    # push to kafka topic FailLogin
    kafka = Kafka.new(
      seed_brokers: ["localhost:9092"]
    )
    kafka.deliver_message(JSON.dump(event), topic: "FailLogin")
    "Login failed"
  end
end
Compile the CEP Flink apps and run the JAR file on Flink. Run the ruby login code and simulate some requests.
 >
>
Example Request:
curl -X POST \
  http://localhost:4567/login \
  -H 'cache-control: no-cache' \
  -H 'content-type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW' \
  -H 'postman-token: cb6cf5f1-2361-ad2d-4d9b-addd7b71b474' \
  -F username=wilianto \
  -F password=wrongpassword \
  -F ip=127.0.0.1
Example Output:
Suspected Login Activity!
First event: MaliciousLogin[username: wilianto, ip: 127.0.0.1, time: 2017-11-04 18:55:06 +0700]
Second event: MaliciousLogin[username: wilianto, ip: 127.0.0.1, time: 2017-11-04 18:55:12 +0700]
Third event: MaliciousLogin[username: wilianto, ip: 127.0.0.1, time: 2017-11-04 18:55:13 +0700]
Thanks for reading. Feel free to share what’s on your mind in the comment form below.