This post is for those who already understand the concept of stream processing and the basic functionality of Kafka and Spark, and for those who are just dying to try Kafka and Spark right away. :)

Purpose

  • Set up a simple pipeline for stream processing on your local machine.
  • Integrate a Spark consumer with Kafka.
  • Implement a word-frequency processing pipeline.

The versions of components in this post will be:

  • Kafka: 0.10.1.0 with Scala 2.11
  • Spark Streaming: 2.1.0
  • Spark Streaming Kafka (0-10 connector): 2.1.0

Setup

Step 1 Install Kafka:

wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0

Step 2 Start a ZooKeeper server for Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties

Step 3 Start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

Step 4 Create a topic named test on Kafka:

bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
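
If you want to double-check that the topic exists, you can describe it:

bin/kafka-topics.sh \
--describe \
--zookeeper localhost:2181 \
--topic test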

Step 5 Write a Spark consumer for word frequency in Scala:

The build.sbt file:

name := "spark-playground"

version := "1.0"

scalaVersion := "2.11.8"


libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"

The main file:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


object AQuickExample extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("AQuickExample")

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "spark-playground-group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val ssc = new StreamingContext(conf, Seconds(1))


  val inputStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](Array("test"), kafkaParams))
  val processedStream = inputStream
    .flatMap(record => record.value.split(" "))
    .map(x => (x, 1))
    .reduceByKey((x, y) => x + y)

  processedStream.print()
  ssc.start()
  ssc.awaitTermination()

}
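
Note that reduceByKey above counts words only within each 1-second micro-batch. If you would rather see counts over a longer sliding window, one possible variation (the window and slide durations here are arbitrary) is to swap the processedStream definition for something like this:

  // Count words over a sliding 30-second window, refreshed every 5 seconds.
  // Window and slide durations must be multiples of the 1-second batch interval.
  val windowedCounts = inputStream
    .flatMap(record => record.value.split(" "))
    .map(word => (word, 1))
    .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))

  windowedCounts.print()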

Step 6 Start the Spark consumer you wrote in step 5.

sbt run

Step 7 Start a console Kafka producer and fire some messages to Kafka on the topic test.

Start the console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Send a message to Kafka:

hello world

Step 8 Once you send the message, the word frequencies will show up in your Spark consumer terminal:

-------------------------------------------
Time: 1485135467000 ms
-------------------------------------------
(hello,1)
(world,1)

...
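
If nothing shows up in the Spark terminal, it can help to verify, independently of Spark, that the message actually reached Kafka, for example with the console consumer that ships with Kafka:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning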

In this post I’m going to show how to import data into a DynamoDB table.

Before we start, you should:

  • Have an AWS account with access to the services we are going to need.
  • Have massive data that you badly want to import into a DynamoDB table.

Roughly, the steps are: prepare a file on S3 containing the data you want to insert into the table, then use AWS Data Pipeline to import that S3 file into your DynamoDB table.

Pretty simple and straightforward.

Step 1: Prepare the data.

Basically, your data is the content of the items you want to insert into the table. So, for each item you want to insert, put one line in JSON style into a file like this:

{"<key>":{"<type>":"<value>"},"<key>":{"<type>":"<value>"},"<key>":{"<type>":"<value>"}...}
{"<key>":{"<type>":"<value>"},"<key>":{"<type>":"<value>"},"<key>":{"<type>":"<value>"}...}

For example:

{"BicycleType":{"s":"Hybrid"},"Brand":{"s":"Brand-Company C"},"Price":{"n":"100000"}}
{"BicycleType":{"s":"Road"},"Brand":{"s":"Brand-Company B"},"Price":{"n":"19000"}}

The data you provide here must match the schema of the existing DynamoDB table you want to import into.

Note: the data format changed recently. The correct format is the one shown above, not the one shown on the official website.

Step 2: Upload this file to your S3

This is simple enough, but you had better put the file into a folder under the bucket, since Data Pipeline will ask for an input folder rather than a single file.
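
If you prefer the command line to the web console, the AWS CLI can do the upload; the file, bucket and folder names below are placeholders:

aws s3 cp items.json s3://your-bucket/dynamodb-import/items.json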

Step 3: Create a data pipeline to import

Go to the AWS Data Pipeline Console, and click Create new pipeline or Get started now if you’re new to data pipeline.

Then you will be guided to the pipeline creation page.

In this page:

  1. Input the Name of your pipeline.
  2. For Source, choose Import DynamoDB backup data from S3.
  3. Under the Parameters section, in Input S3 folder, select the folder containing the data file you just uploaded.
  4. In Target DynamoDB table name, input your table name.
  5. For DynamoDB write throughput ratio, input how much of your table's write capacity you want to consume (0.9 is recommended).
  6. In Region of the DynamoDB table, input the region of your table.
  7. In the Schedule section, change the settings to meet your needs. If you just want to import the data once, choose on pipeline activation for Run.
  8. Change the Logging setting.
  9. Hit the button Activate, and wait for execution.

Data Pipeline will spin up an EC2 instance to execute the import task, which will add cost to your bill, so be aware of that. :)

The import will take some time; you can check the status of your import task in the pipeline list.

The task will fail if the format of your data file is incorrect.

Objectives

  1. Use this pattern to build a load-balanced Akka system.
  2. Run each node of this pattern inside a Docker container, so that we can easily deploy new nodes.

Pattern overview

You can refer to this pattern in detail in this post.

(Or just look at this picture in that post)

But in general, this pattern has following features:

  • The system consists of one master and several workers.
  • The master receives tasks and sends them to workers.
  • Workers work on and finish these tasks.
  • The master tracks the status of the workers and only sends a task to a worker that is not currently working. (By doing so, we achieve load balancing.)
  • After a worker finishes its task, it sends a message back to the master, telling the master that it is idle again.

To achieve our objectives, we are going to:

  • Put the master into one container.
  • Put a group of workers into another container.
  • After deploying the containers above, start another group of workers in yet another container (to confirm hot scale-out).

Implementation

Given the features described above, writing the master and the worker is pretty straightforward. You can find detailed source code for this pattern at the bottom of that post.

Here is the simplified system:

TaskWorker

package actors

import actors.TaskMaster._
import actors.TaskWorker.WorkFinished
import akka.actor.{Actor, ActorLogging, ActorPath, ActorRef}


object TaskWorker {
  case class WorkFinished(isFinished: Boolean = true)
}

abstract class TaskWorker(masterPath: ActorPath) extends Actor with ActorLogging {

  val master = context.actorSelection(masterPath)

  override def preStart() = master ! NewWorker(self)

  // TODO: add postRestart function for better robustness

  def working(task: Any, masterRef: ActorRef): Receive = {
    case WorkFinished(isFinished) =>
      log.debug("Worker finished task.")
      masterRef ! TaskResponse(self, isFinished)
      context.become(idle)
  }



  def idle: Receive = {
    case TaskIncoming() =>
      master ! TaskRequest(self)
    case TaskTicket(task, taskOwner) =>
      log.debug("Worker get task.")
      context.become(working(task, sender()))
      work(taskOwner, task)
    case TaskConsumedOut() =>
      log.debug("Worker did not get any task.")

  }

  def receive = idle

  def finish(): Unit = {
    self ! WorkFinished(isFinished = true)
  }

  def fail(): Unit = {
    self ! WorkFinished(isFinished = false)
  }

  def work(taskOwner: Option[ActorRef], task: Any): Unit

  // define this method if you want to deal with the result of processing
  // def submitWork(workResult: WorkerResult): Unit

}
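
The entry point shown later refers to a DemoWorker, which the original post does not list. A minimal sketch, assuming the worker only needs to log the task and report success, could look like this:

package actors

import akka.actor.{ActorPath, ActorRef}

// Hypothetical minimal worker: logs the task and immediately reports success.
class DemoWorker(masterPath: ActorPath) extends TaskWorker(masterPath) {
  override def work(taskOwner: Option[ActorRef], task: Any): Unit = {
    log.info(s"Processing: $task")
    finish() // sends WorkFinished(true) to self, which notifies the master and returns to idle
  }
}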

TaskMaster

package actors

import actors.TaskMaster._
import akka.actor.{Actor, ActorLogging, ActorRef}

import scala.collection.mutable

// companion object for protocol
object TaskMaster {
  case class NewWorker(worker: ActorRef)
  case class TaskList(taskList: List[Any])
  case class TaskIncoming()
  case class TaskResponse(worker: ActorRef, isFinished: Boolean=true)
  case class TaskRequest(worker: ActorRef)
  case class TaskConsumedOut()
  // Add the original sender of the task, so that the worker knows who sent it.
  // That way, the worker can send the result of the task directly to the original sender.
  // Change the type of `task` to something more specific, or define your own task type.
  case class TaskTicket(task: Any, taskOwner: Option[ActorRef])
}


class TaskMaster extends Actor with ActorLogging {

  val taskQueue = mutable.Queue.empty[Object]
  val workers = mutable.Map.empty[ActorRef, Option[TaskTicket]]


  override def receive: Receive = {

    // when new worker spawns
    case NewWorker(worker) =>
      context.watch(worker)
      workers += (worker -> None)
      notifyFreeWorker()

    // when a worker sends a task result back
    case TaskResponse(worker, isFinished) =>
      if (isFinished)
        log.debug(s"task is finished.")
      else
        log.debug(s"task failed to finish.")
      workers += (worker -> None)
      self ! TaskRequest(worker)

    // when a worker requests a task
    case TaskRequest(worker) =>
      if (workers.contains(worker)) {
        if (taskQueue.isEmpty)
          worker ! TaskConsumedOut()
        else {
          if (workers(worker) == None) {
            val task = taskQueue.dequeue()
            assignTask(task, worker)
          } else {
            // this will never happen
            log.error("Some worker is requiring task while processing the task.")
          }
        }
      }

    // when receive tasks
    case TaskList(taskList: List[Object]) =>
      taskQueue.enqueue(taskList: _*)
      notifyFreeWorker()

  }

  def assignTask(task: Object, worker: ActorRef) = {
    workers += (worker -> Some(TaskTicket(task, Some(self))))
    worker ! TaskTicket(task, Some(self))
  }

  def notifyFreeWorker() = {
    if (taskQueue.nonEmpty)
      workers.foreach {
        case (worker, m) if m.isEmpty => worker ! TaskIncoming()
        case _ => log.error("Something weird in the worker map!")
      }
  }
}

Start the system

Our system should have two entry modes: one starts the master, and the other starts workers that connect to the master.

import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.{ActorPath, ActorSystem, Props}
import akka.routing.RoundRobinRouter
import com.typesafe.config.ConfigFactory
import actors._

import scala.concurrent.duration.Duration


object StartNode {

  val systemName = "load-balance"
  val systemConfigKey = "load-balance"
  val systemNodeConfigKey = "node"
  val masterNodeConfigKey = "master"
  val workerNodeConfigKey = "worker"

  val workerNodePopulationConfigKey = "population"

  val configHostKey = "host"
  val configPortKey = "port"


  val masterActorName = "master"

  // pattern used to get actor location
  val masterPathPattern = s"akka.tcp://$systemName@%s:%s/user/$masterActorName"

  def main(args: Array[String]): Unit = {

    if (args.length == 1) {
      args(0) match {
        case "worker" =>
          startAsWorkerGroup()
        case "master" =>
          startAsMaster()
        case _ =>
          println(s"Can not parse start mode: ${args(0)}")
      }
    } else {
      println(s"Please choose start mode.")
    }
  }

  def startAsWorkerGroup(): Unit = {

    println("Start actor system ...")
    val system = ActorSystem(systemName, ConfigFactory.load.getConfig(systemConfigKey))

    val workerNodeConfig = ConfigFactory.load.getConfig(systemNodeConfigKey).getConfig(workerNodeConfigKey)
    val masterNodeConfig = ConfigFactory.load.getConfig(systemNodeConfigKey).getConfig(masterNodeConfigKey)

    println("Parse worker config ...")
    // get worker config
    val workerCounter = workerNodeConfig.getInt(workerNodePopulationConfigKey)
    println("Connect to master ...")
    // connect to master

    val masterHost = masterNodeConfig.getString(configHostKey)
    val masterPort = masterNodeConfig.getInt(configPortKey)
    val masterLocation = masterPathPattern.format(masterHost, masterPort)
    println(s"to location $masterLocation")

    println("Connect to agent ...")

    val masterOpt = try {
      Option(system.actorSelection(masterLocation))
    } catch {
      case x: Throwable =>
        Option.empty
    }

    if (masterOpt.isEmpty) {
      println("Can not connect to master node!")
    } else {
      println("Worker Start!")
      val workerGroup = system.actorOf(Props(new DemoWorker(
        ActorPath.fromString(masterLocation)))
        .withRouter(RoundRobinRouter(workerCounter)))
    }
  }

  def startAsMaster(): Unit = {

    println("Master Start!")
    println("Parse master config ...")

    println("Start actor system ...")
    val system = ActorSystem(systemName, ConfigFactory.load.getConfig(systemConfigKey))

    println("Spawn master ...")
    val master = system.actorOf(Props(new TaskMaster()), name = masterActorName)

    /* Test this system with a scheduled task sending tasks to master */


    val scheduler = system.scheduler
    val task = new Runnable {
      def run() {
        // create random task
        val task = s"Task:${UUID.randomUUID().toString}"
        master ! TaskMaster.TaskList(List(task))
      }
    }

    implicit val executor = system.dispatcher
    scheduler.schedule(
      initialDelay = Duration(2, TimeUnit.SECONDS),
      interval = Duration(1, TimeUnit.SECONDS),
      runnable = task)
  }
}
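
For local testing, assuming StartNode is the only main class in the project, the two roles can be started from sbt in two terminals:

# terminal 1: start the master node
sbt "run master"

# terminal 2: start a group of workers that connect to the master
sbt "run worker"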

Configuration

There are basically two kinds of things that should be set in the config file:

  • The binding information of local actor system.
  • The remote actor system that is going to be connected.

Since the master learns a worker's location when the worker sends its registration message, we only need to configure the location of the master.

An example config for the master node:

load-balance { // system config root; matches systemConfigKey in StartNode
  akka {
    // settings of logging
    loglevel = "DEBUG"
    # change log handler
    loggers = ["akka.event.slf4j.Slf4jLogger"]

    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 128
    }

    // actor type: remote
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }

    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      log-sent-messages = on
      log-received-messages = on

      // hostname and port of current system
      netty.tcp {
        // if the config is used for worker, then change the hostname and port to worker's host and port
        hostname = 127.0.0.1
        port = 2550
      }
    }
  }
}

node {
  master {
    id = "test"
    activity-chunk-size = 300

    host = "127.0.0.1"
    port = 2550
  }

  worker {
    host = "127.0.0.1"
    port = 2552
    population = 20
  }
}

Integrate with Docker

Make a native package

Add the sbt-native-packager plugin to your project/plugins.sbt file.

// SBT - Native Packager Plugin
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "0.8.0-M2")

Add the following line to your build.sbt file to specify the package type.

packageArchetype.java_server

Run the following command to build your deployment package.

sbt clean stage universal:packageZipTarball

Dockerfile


FROM java:7-jre
MAINTAINER Chris Kong <chris.kong.cn@gmail.com>

RUN mkdir -p /home/kong
ENV USER_HOME /home/kong

# Copy your package to container
COPY target/universal/stage/bin $USER_HOME/bin
COPY target/universal/stage/lib $USER_HOME/lib

# Copy config file
COPY /mnt/application.conf $USER_HOME/conf/application.conf

# symlink log directory
RUN ln -sfn /mnt/logs $USER_HOME/logs
RUN ln -sfn /mnt/config $USER_HOME/conf

ENV LANG C.UTF-8
WORKDIR $USER_HOME

# container entrance
ENTRYPOINT [ "bin/scala-akka-loadbalance-pattern-with-docker" ]

Build image

sbt clean stage universal:packageZipTarball
docker build -f Dockerfile -t load-balance-pattern .

Run the container

There is only one thing to remember when starting the container: use the host’s network interface by specifying --net=host.

For master:

docker run --name=load-balance-master -d \
--net=host \
-v <your-log-folder>:/mnt/logs \
-v <your-master-config-file>:/mnt/config \
load-balance-pattern master

For worker:

docker run --name=load-balance-worker -d \
--net=host \
-v <your-log-folder>:/mnt/logs \
-v <your-worker-config-file>:/mnt/config \
load-balance-pattern worker

Once the commands above are executed, you will see messages in the master’s log file indicating that the workers have connected to the master.

And that is how to use Docker and Akka to build a load-balanced distributed system.

Please make sure your master and workers are in the same VPC network when deploying onto AWS EC2 servers.

Improvements

  • Currently, each deployment needs its own config file, which is annoying. You could use --env-file and -e to assemble the config inside the container, so that role-specific config files are no longer needed and a single config template suffices.
  • Adding a worker node to the master at runtime works fine, but disconnecting a worker node from the master does not behave correctly yet. We could add a mechanism that detects dying workers and removes them; a sketch follows below.
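
A possible sketch of that mechanism (not in the original code): TaskMaster already calls context.watch(worker) when a worker registers, so death watch will deliver a Terminated message when that worker stops; the master only needs to handle it.

package actors

import akka.actor.{Actor, ActorLogging, ActorRef, Terminated}

import scala.collection.mutable

// Hypothetical mix-in that removes dead workers from the pool.
trait WorkerDeathHandling { this: Actor with ActorLogging =>
  def workers: mutable.Map[ActorRef, Option[TaskMaster.TaskTicket]]

  def handleWorkerDeath: Actor.Receive = {
    case Terminated(worker) =>
      log.warning(s"Worker $worker terminated, removing it from the pool.")
      workers -= worker
  }
}

// In TaskMaster, the existing handlers could then be composed as:
//   override def receive: Receive = handleWorkerDeath orElse { ...existing cases... }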

You can find above source code at: https://github.com/visualskyrim/scala-akka-loadbalance-pattern-with-docker

UPDATE 2016-02-03: Changed my opinion on dividing tables into smaller ones.

BigQuery is a very powerful data analytics service. A very heavy query on massive data may take only seconds in BigQuery. But among all its interesting features, the one thing you should keep most in mind is the way BigQuery charges your money.

How does BQ charge you

In general, it charges you for the size of the data your query evaluates: the more data your query evaluates, the poorer BigQuery will make you.

So the question that deserves attention is: what kind of operation counts as EVALUATING data?

Basically, every column mentioned in your query is counted as evaluated data.

Let’s look at an example. Say I have a table with around 1,000,000 rows, which is around 500MB in total.

First, let’s fetch all the data from the table:

SELECT * FROM [ds.fat_table]

Because this query directly fetches all the data from the table, the amount of data charged is, without doubt, 500MB.

If we only fetch some of the columns, the cost falls dramatically.

SELECT col1, col2 FROM [ds.fat_table]

BigQuery charges you only for the actual total size of the data in col1 and col2. If col1 and col2 in this table hold only 150MB of data, this query will charge you 150MB.

But the size of the evaluated data is not only the size of the fetched data. Any column that appears in WHERE, GROUP BY, ORDER BY, PARTITION BY, or any other function is also considered “evaluated”.

SELECT col1, col2 FROM [ds.fat_table]
WHERE col3 IS NOT NULL

The charged size is not just the total size of the data in col1 and col2, but the total size of col1, col2, and col3, because col3 appears in the WHERE clause. If the data in col3 is 30MB, then the query will charge you 180MB in total.

Note that all the data in col3 is charged, not just the part that IS NOT NULL.

That said, BigQuery does optimize the pricing for us: if a column in the SELECT list of a sub-query is never needed by the outer query, BigQuery will not charge you for it.

SELECT col1, col2
FROM
  (
  SELECT * FROM [ds.fat_table]
  ) AS sub_table
WHERE col3 IS NOT NULL

As you can see, even though we selected all the columns of the table in the sub-query, BQ will not charge for all of them. In this case, only the data of col1, col2, and col3 is charged.

Tips on optimizing charge

Knowing how BigQuery charges us, it becomes clear how to save money.

Here are some tips drawn from my experience of using BigQuery.

Tip 0: Make sure you need BigQuery

This is not actually a tip, but a very important point to settle before you start to use BQ.

In general, BigQuery is designed to deal with massive data, and of course it is designed to charge you for processing that massive data.

If you don’t have data that can’t be processed by your current tools, you could just keep using MySQL, SPSS, R, or even Excel.

Because, well, they’re relatively cheap.

Tip 1: Do use wildcard function

This will be your very first step to save money on BigQuery.

Let’s say we have a log_table that stores all the access log lines from the server.

If I want to query only one or two days’ worth of logs, for instance to get the users who accessed the server on certain days, the obvious query is:

SELECT user_id
FROM
  ds.log_table
WHERE DATE(access_time) = '2016-01-01' or DATE(access_time) = '2016-01-02'
GROUP BY user_id

Well, never do this.

If you have collected years of logs in this table, this one single query will make you cry.

The better solution, the correct one, is to import each day’s log into a separate table with a date suffix in the first place, like the following:

log_table20160101
log_table20160102
log_table20160103
log_table20160104

By doing this, the table wildcard functions allow you to select from a range of tables:

SELECT user_id
FROM
  TABLE_DATE_RANGE(ds.log_table, TIMESTAMP('2016-01-01'), TIMESTAMP('2016-01-02'))
GROUP BY user_id

Now you are not charged for the dates you don’t want, and even the data in access_time is not charged.

DO USE THIS PATTERN. This is really important if you have data that keeps being imported day by day.

Tip 2: Don’t use a query when you can use metadata instead

This is the most specific tip, but it really should be considered before you design your tables.

For example, if you have a table that stores created users, and you want to know the number of users created on each day, don’t do the following:

SELECT created_date, COUNT(user_id) AS user_number
FROM
  (
  SELECT DATE(created_time) AS created_date, user_id
  FROM ds.users
  )
GROUP BY  created_date

This would cost you tons of coin if your website, sadly, has millions of users, while you can actually get this number for free.

You can just:

  1. Import the newly created users of each day into a separate table, following the wildcard pattern.
  2. Call the API to get the number of rows in each table, which is free (an example follows below).

There are many use cases just like this: the sessions of each day, the transactions of each day, the activities of each day.

They are all free.
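
The row count is part of each table’s metadata and can be read without running any query, for example with the bq command-line tool (the table name here is just an illustration):

bq show --format=prettyjson ds.users20160101 | grep numRows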

Tip 3: Separate tables carefully

Maybe after the first tip you think: “Yeah, query only the data I actually need, right? Then let’s make our tables as tiny as possible, lol.”

That’s a trap.

BigQuery does charge for the size of the data used in the query, BUT with a minimum unit of 10MB. Meaning: even if a query only uses 1KB of data, it will be charged as 10MB.

For example, say you are running a B2B service with hundreds of shops in your system, so you separate your tables not only by date but also by shop, using shop_id.

So you have:

access_log_shop001_20160101
access_log_shop002_20160101
access_log_shop001_20160102
access_log_shop002_20160102

But each of these tables holds less than 1MB of data. If you have to use these access_log tables to generate another kind of table separated by shop, you will be charged 10MB * <num-of-shops> * <number-of-days>, whereas if you don’t separate the access_log table by shop you will be charged 1MB * <num-of-shops> * <number-of-days> in the worst case.

So, is it a bad idea to separate tables by anything other than date? It depends.

As you can see, if the access_log for each shop on each day is around 10MB or even bigger, this design is actually perfect.

So, whether to separate further depends on the size of your data. The point is to know the granularity of your data before you make the decision.

Update 2016-02-03:

Actually, dividing tables not only by date but also by some other id can keep BigQuery from functioning well.

From my own experience: I had 20,000+ tables in one dataset, separated by both date and an id, with names like [table_name]_[extra_id]_[date_YYmmdd].

And the following issues will truly happen:

  • BigQuery’s API calls will occasionally fail with an InternalException. This happens to the API call used to create an empty table with a given schema.
  • Every query you fire has the following three kinds of time records. Usually the interval between the first and the second is less than one second, but when you have a lot of tables, as in my case, the gap between creationTime and startTime can reach 20 seconds.
    • creationTime: the time when your request arrives at the BQ server.
    • startTime: the time when the BQ server starts to execute the query.
    • endTime: the time when the BQ server finishes your query or finds it invalid.
  • BigQuery starts failing to find your tables. Queries will fail with errors like FROM clause with table wildcards matches no table or Not found: Table …, even though you know the tables do exist. And when you run the failed query again without fixing anything, it may well succeed.

So, I STRONGLY recommend not putting too many tables in the same dataset.

Tip 4: Think before you use a mid-table

A mid-table is a table you use to store a temporary result.

Do use it when:

More than one query is going to use the mid-table.

Let’s say query A gives you the result you want, and another query B uses the same sub-query C that appears inside query A.

If you don’t use a mid-table, you will be charged for:

  • sub-query C
  • remaining query in A after sub-query C
  • sub-query C
  • remaining query in B after sub-query C

But if you use a mid-table to store the result of sub-query C, you will be charged for:

  • sub-query C
  • remaining query in A after sub-query C
  • remaining query in B after sub-query C

By doing this, you save at least the cost of running sub-query C one extra time.

Do NOT use it when:

Your only fear is that the query is getting too long, and no other query is going to use the mid-table you generate.

Not everyone will agree on this. Some people may say “This is too nasty”, “God, this query is too fat”, or even “WTF is this?”.

Well, generating a mid-table in this situation surely charges you more. I would rather save my money.

And if you format your query well, no matter how long it grows, it will still look nice.


These are the tips I have collected so far. I hope they help you save some money.

Recently I have been working on analytics with BigQuery. It is quite an experience to be able to run any kind of processing on massive data without worrying about subtle performance issues.

The greatness of BigQuery and the thinking behind its internal implementation are not going to be discussed here.

Instead, I’m going to show how to use BigQuery to calculate users’ access sessions from an access log with one query.

First, let’s define the session.

A session is a sequence of accesses by one user, where each access in the sequence happens no more than a specific period of time before the next one.

The length of a session is the period of time between the first access and the last access in that session.
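
For example, with a 30-minute timeout (the value we will use below), accesses by one user at 10:00, 10:10, and 11:00 form two sessions: one from 10:00 to 10:10 with a length of 10 minutes, and one containing only the 11:00 access with a length of 0.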

Then, we need to describe the table that stores the access log data. It should have at least the following two columns:

  1. access_time: a timestamp that represents when the access happened.
  2. user_id: the id of the user who made the access.

For the session timeout in our case, let’s say 30 minutes.

And now we can start to build the query step by step:

Step 1: Preprocess the log table

In this step, we do some type conversion to make the calculations in the following steps easier:

SELECT
  user_id,
  TIMESTAMP_TO_SEC(access_time) AS access_time_sec -- Convert timestamp to seconds
FROM
  ds.access_log_table -- This is the table where you put your log data into

Step 2: Find the previous access for each access

After the last step, we have one access per row. In this step, we are going to add a column holding the time of the previous access made by the same user.

SELECT
  user_id,
  access_time_sec,
  -- The lag + partition + order combination is to get the previous access to the current access in the row
  LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec
FROM
  (
-- previous query
-- SELECT
--   user_id,
--   TIMESTAMP_TO_SEC(access_time) AS access_time_sec, -- Convert timestamp to seconds
-- FROM
--   ds.access_log_table -- This is the table where you put your log data into
-- previous query end
  )

The LAG function, combined with PARTITION BY, retrieves a column from the previous row within the same partition.

The partitions are defined by the PARTITION BY clause, while the order within each partition is specified by the ORDER BY inside the OVER clause.

In this case, we separate all the accesses into one partition per user with

PARTITION BY user_id

Then, to enforce the time order within each partition, we order each partition with:

PARTITION BY user_id ORDER BY access_time_sec

To get the access_time_sec of the previous row, we do:

LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec

For more details about the LAG function, refer to the documentation.

Of course, as you may already be thinking, the first row in each partition has no prev_access_time_sec; it will be null in the result.

Leave it like that; we will deal with it later.

After this step, we get a result like:

| user_id | access_time_sec | prev_access_time_sec |
|---------|-----------------|----------------------|
|         |                 |                      |

Step 3: Decide whether an access is the beginning of a session

In this step, we are going to tag each access with whether it is the beginning of a session. And as we said already, a session breaks when the next access comes 30 minutes or more after the current one.

To achieve that, we do:

SELECT
  user_id,
  access_time_sec,
  prev_access_time_sec,
  IFNULL -- The first access of each partition is the beginning of session by default.
    (
      access_time_sec - prev_access_time_sec >= 30 * 60,
      TRUE
    ) AS start_of_session
FROM
  (
--  previous query
--  SELECT
--    user_id,
--    access_time_sec,
--    -- The lag + partition + order combination is to get the previous access to the current access in the row
--    LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec
--  FROM
--    (
--    SELECT
--    user_id,
--    TIMESTAMP_TO_SEC(access_time) AS access_time_sec, -- Convert timestamp to seconds
--    FROM
--      ds.access_log_table -- This is the table where you put your log data into
--      previous query end
--    )
  )

As we just said, the first access in each partition can only have a null prev_access_time_sec, so it is considered the beginning of a session by default.

Now we have a result like:

| user_id | access_time_sec | prev_access_time_sec | start_of_session |
|---------|-----------------|----------------------|------------------|
|         |                 |                      |                  |

Step 4: Decide whether an access is the end of a session

Things become more complex from here.

To achieve the goal of this step, we use two SELECTs.

First we tag each row (access) with whether the next access in the same user_id partition is the beginning of a session:

SELECT
  user_id,
  access_time_sec,
  prev_access_time_sec,
  LEAD(start_of_session, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec, prev_access_time_sec) is_next_access_sos
FROM
  (
-- previous query
-- SELECT
--   user_id,
--   access_time_sec,
--   prev_access_time_sec,
--   IFNULL -- The first access of each partition is the beginning of session by default.
--     (
--       access_time_sec - prev_access_time_sec >= 30 * 60,
--       TRUE
--     ) AS start_of_session
-- FROM
--   (
--   SELECT
--     user_id,
--     access_time_sec,
--     -- The lag + partition + order combination is to get the previous access to the current access in the row
--     LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec
--   FROM
--     (
--     SELECT
--     user_id,
--     TIMESTAMP_TO_SEC(access_time) AS access_time_sec, -- Convert timestamp to seconds
--     FROM
--       ds.access_log_table -- This is the table where you put your log data into
--       previous query end
--     )
--   )
  )

Now we know, for each access of each user, whether the next access is the beginning of a new session.

Why do we need to know this? Consider the following:

Say there is one partition for a user, already ordered by access_time, like this:

| user_id | access_time_sec | prev_access_time_sec | start_of_session | is_next_access_sos |
|---------|-----------------|----------------------|------------------|--------------------|
|         |                 |                      | true             | false              |
|         |                 |                      | false            | true               |
|         |                 |                      | true             | true               |
|         |                 |                      | false            | false              |
|         |                 |                      | false            | false              |
|         |                 |                      | false            | true               |
|         |                 |                      | true             | null               |

The combination of start_of_session and is_next_access_sos, and the meaning behind it, must at this point be one of the following:

| start_of_session | is_next_access_sos | this_access_must_be                                                                                        |
|------------------|--------------------|------------------------------------------------------------------------------------------------------------|
| true             | false              | the first access in the session with number of access >= 2                                                 |
| true             | true               | the only access in the session                                                                             |
| true             | null               | the only access in the session, and the last access in the partition                                       |
| false            | true               | the last access in the session with number of access >= 2                                                  |
| false            | false              | this access is not the first access nor the last in the session with number of access >= 3                 |
| false            | null               | the last access in the session with number of access >=2, and this access is the last one in the partition |

Knowing this, we can easily tell whether an access is the last access of a session.

SELECT
  user_id,
  access_time_sec,
  prev_access_time_sec,
  start_of_session,
  IFNULL(is_next_access_sos, TRUE) AS end_of_session -- if an access is the end of the partition, it must be the end of the session
FROM
  (
  SELECT
    user_id,
    access_time_sec,
    prev_access_time_sec,
    start_of_session,
    LEAD(start_of_session, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec, prev_access_time_sec) is_next_access_sos
  FROM
    (
--  previous query
--  SELECT
--    user_id,
--    access_time_sec,
--    prev_access_time_sec,
--    IFNULL -- The first access of each partition is the beginning of session by default.
--      (
--        access_time_sec - prev_access_time_sec >= 30 * 60,
--        TRUE
--      ) AS start_of_session
--  FROM
--    (
--    SELECT
--      user_id,
--      access_time_sec,
--      -- The lag + partition + order combination is to get the previous access to the current access in the row
--      LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec
--    FROM
--      (
--      SELECT
--      user_id,
--      TIMESTAMP_TO_SEC(access_time) AS access_time_sec, -- Convert timestamp to seconds
--      FROM
--        ds.access_log_table -- This is the table where you put your log data into
--        previous query end
--      )
--    )
    )
  )
WHERE
  start_of_session OR is_next_access_sos IS NULL OR is_next_access_sos -- only get the start or the end of session in the result

After this step, we get all the accesses that are either the start or the end of a session in the result.

| user_id | access_time_sec | prev_access_time_sec | start_of_session | end_of_session |
|---------|-----------------|----------------------|------------------|----------------|
|         |                 |                      |                  |                |

Step 5: Get sessions

We are really close now.

We have all the start and end accesses of sessions in our result set (some of them are both start and end).

It is already clear how to get all the sessions now:

  • A session must have a start access.
  • So we rule out the accesses that are not start accesses.
  • We get all the sessions.

But that’s not enough, nor fun. Now let’s say we also need to know the duration of each session. That sounds fun enough.

Let’s do the same trick again. Remember, at this point we only have start and end accesses in the result of the previous query.

So let’s get, for each access, the access time of the previous access within the same user_id partition:

SELECT
  user_id,
  access_time_sec,
  LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec, prev_access_time_sec) AS prev_access_time_sec_2,
  start_of_session,
  end_of_session
FROM
  (
--  previous query
--  SELECT
--    user_id,
--    access_time_sec,
--    prev_access_time_sec,
--    start_of_session,
--    ISNULL(is_next_access_sos, TRUE) AS end_of_session -- if an access is the end of the partition, it must be the end of the session
--  FROM
--    (
--    SELECT
--      user_id,
--      access_time_sec,
--      prev_access_time_sec,
--      start_of_session,
--      LEAD(start_of_session, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec, prev_access_time_sec) is_next_access_sos
--    FROM
--      (
--      SELECT
--        user_id,
--        access_time_sec,
--        prev_access_time_sec,
--        IFNULL -- The first access of each partition is the beginning of session by default.
--          (
--            access_time_sec - prev_access_time_sec >= 30 * 60,
--            TRUE
--          ) AS start_of_session
--      FROM
--        (
--        SELECT
--          user_id,
--          access_time_sec,
--          -- The lag + partition + order combination is to get the previous access to the current access in the row
--          LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec
--        FROM
--          (
--          SELECT
--          user_id,
--          TIMESTAMP_TO_SEC(access_time) AS access_time_sec, -- Convert timestamp to seconds
--          FROM
--            ds.access_log_table -- This is the table where you put your log data into
--            previous query end
--          )
--        )
--      )
--    )
--  WHERE
--    start_of_session OR is_next_access_sos IS NULL OR is_next_access_sos -- only get the start or the end of session in the result
  )

Because we only have start and end accesses in the result of the previous query, the previous access time of each access must be one of the following:

  • If the access is the start of a session: the previous access time is the end time of the previous session.
  • If the access is not the start of a session: the previous access time is the start time of the current session <– this is what we need.

Be careful when thinking this through: there may be accesses that are both the start and the end of a session.

When we are clear with this, we can get all the sessions with duration:

SELECT
  user_id,
  CASE
    WHEN NOT start_of_session AND end_of_session THEN access_time_sec - session_start_sec
    WHEN start_of_session AND end_of_session THEN 0  -- for sessions that only have one access
  END AS duration,
  CASE
    WHEN NOT start_of_session AND end_of_session THEN SEC_TO_TIMESTAMP(session_start_sec)
    WHEN start_of_session AND end_of_session THEN SEC_TO_TIMESTAMP(access_time_sec)  -- for sessions that only have one access
  END AS session_start_time
FROM
  (
  -- previous query
  SELECT
    user_id,
    access_time_sec,
    LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec, prev_access_time_sec) AS session_start_sec,
    start_of_session,
    end_of_session
  FROM
    (
    SELECT
      user_id,
      access_time_sec,
      prev_access_time_sec,
      start_of_session,
      IFNULL(is_next_access_sos, TRUE) AS end_of_session -- if an access is the end of the partition, it must be the end of the session
    FROM
      (
      SELECT
        user_id,
        access_time_sec,
        prev_access_time_sec,
        start_of_session,
        LEAD(start_of_session, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec, prev_access_time_sec) is_next_access_sos
      FROM
        (
        SELECT
          user_id,
          access_time_sec,
          prev_access_time_sec,
          IFNULL -- The first access of each partition is the beginning of session by default.
            (
              access_time_sec - prev_access_time_sec >= 30 * 60,
              TRUE
            ) AS start_of_session
        FROM
          (
          SELECT
            user_id,
            access_time_sec,
            -- The lag + partition + order combination is to get the previous access to the current access in the row
            LAG(access_time_sec, 1) OVER (PARTITION BY user_id ORDER BY access_time_sec) AS prev_access_time_sec
          FROM
            (
            SELECT
            user_id,
            TIMESTAMP_TO_SEC(access_time) AS access_time_sec -- Convert timestamp to seconds
            FROM
              ds.access_log_table -- This is the table where you put your log data into
              -- previous query end
            )
          )
        )
      )
    WHERE
      start_of_session OR is_next_access_sos IS NULL OR is_next_access_sos -- only get the start or the end of session in the result
    )
  )
WHERE NOT (start_of_session AND NOT end_of_session) -- rule out the accesses that are only start of the sessions

Now we have all the sessions, with user_id, duration, and session_start_time.

The main points in constructing this query are:

  • use window (PARTITION BY) functions to split the table into per-user partitions.
  • rule out the accesses that fall strictly between the start and the end of a session.
  • use a window function again to calculate the duration.