Data Modelling in Apache Cassandra NoSQL Database

For answering specific types of queries requested by the analytics team about users' activity on a music streaming app

Import Python packages

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

Creating list of filepaths to process original event csv data files

In [2]:
# checking current working directory
print("Current working directory:", os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Creating a list of every event file found anywhere under event_data.
# NOTE: the original code ASSIGNED file_path_list on each os.walk()
# iteration, so only the entries of the last-visited directory survived
# (and those entries could include subdirectory paths, not just files).
# Accumulating the regular files from every directory fixes both issues.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    for file_name in files:
        file_path_list.append(os.path.join(root, file_name))
#print(file_path_list)
Current working directory: /home/workspace

Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Collect every data row from every event file into one consolidated list.
full_data_rows_list = []

for f in file_path_list:
    # Open each event csv and drop its header line before reading rows.
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)
        # Append each remaining data row to the consolidated list.
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line)

print("Total number of rows in files", len(full_data_rows_list))

# Write a smaller, denormalized csv (event_datafile_new.csv) that will be
# used to insert data into the Apache Cassandra tables.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Output header, and the matching column positions in the raw event rows.
header = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
          'length', 'level', 'location', 'sessionId', 'song', 'userId']
selected_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(header)
    for row in full_data_rows_list:
        # Skip rows whose first field (artist) is empty.
        if not row[0]:
            continue
        writer.writerow(tuple(row[i] for i in selected_columns))
Total number of rows in files 8056
In [4]:
# Sanity check: count the lines (header included) in the generated csv file.
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    line_total = sum(1 for _ in f)
    print("Number of rows in event_datafile_new.csv:", line_total)
Number of rows in event_datafile_new.csv: 6821

The **event_datafile_new.csv** contains the following columns:

  • artist
  • firstName of user
  • gender of user
  • item number in session
  • last name of user
  • length of the song
  • level (paid or free song)
  • location of the user
  • sessionId
  • song title
  • userId

The image below is a screenshot of what the denormalized data appears like in the **event_datafile_new.csv** after the code above is run:

Creating and loading tables in Apache Cassandra

Creating a Cluster

In [5]:
# Making connection to Cassandra on local machine
# (127.0.0.1)

from cassandra.cluster import Cluster
# Cluster() with no contact points defaults to localhost (127.0.0.1).
cluster = Cluster()

# Starting a session to establish connection and begin executing queries
session = cluster.connect()

Setting up Keyspace

In [6]:
# Create the keyspace (if it does not already exist) and switch the session
# to it. SimpleStrategy with replication_factor 1 is only appropriate for a
# single-node, local development cluster.
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS ath 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
    )
    # Only reached if the CREATE succeeded; all later queries use 'ath'.
    session.set_keyspace('ath')
except Exception as e:
    print(e)

Create tables and queries to ask the following three questions of the data

1. Get artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
2. Get only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
3. Get every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Task 1

Get the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

  • For handling this type of query, I created a table with session_id as the partition key and item_in_session as the clustering column. While session_id alone is not sufficient to uniquely identify the rows in this table, the composite of session_id and item_in_session serves this purpose.
  • I included other attributes related to song like title, artist and song's length into table as they are expected by this type of query in the results.
In [7]:
# Creating table modeled to handle this specific type of query:
# session_id is the partition key and item_in_session the clustering column,
# so the pair uniquely identifies one played song within a session.
create_table_query = """
    CREATE TABLE IF NOT EXISTS songs_played_in_session (
        session_id int,
        item_in_session int,
        song_title text,
        artist text,
        song_length float,
        PRIMARY KEY (session_id, item_in_session)
    );
"""
try:
    session.execute(create_table_query)
except Exception as e:
    print(e)

# Loading data into table from csv file 
file = 'event_datafile_new.csv'

# The INSERT statement is loop-invariant: build it once instead of
# re-concatenating the same string for every row (as the original did).
# A driver prepared statement would be faster still.
insert_query = ("INSERT INTO songs_played_in_session "
                "(session_id, item_in_session, song_title, artist, song_length) "
                "VALUES (%s, %s, %s, %s, %s)")

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # csv columns: 0=artist, 3=itemInSession, 5=length, 8=sessionId, 9=song
        session.execute(insert_query,
                        (int(line[8]), int(line[3]), line[9], line[0], float(line[5])))

# Executing a SELECT query to verify that the data have been inserted into each table        
query = """
    SELECT artist, song_title, song_length 
    FROM songs_played_in_session
    WHERE session_id=%s AND item_in_session = %s
"""
try:
    rows = session.execute(query, (338, 4))
except Exception as e:
    print(e)

for row in rows:
    print("Artist:", row.artist, ", Song:", row.song_title, ", Song length:", row.song_length)
Artist: Faithless , Song: Music Matters (Mark Knight Dub) , Song length: 495.30731201171875

Task 2

Get only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

  • For handling this type of query, I created a table with (user_id, session_id) as the composite partition key and item_in_session as the clustering column, giving a primary key of ((user_id, session_id), item_in_session).
  • As this type of query expects results from one specific session_id that is always supposed to be passed to the query as an argument, it was unnecessary to use it as a clustering column. Therefore I chose to use it in the partition key along with user_id, to help the database distribute the data across a larger number of partitions through a wider pool of hash values generated from partition keys composed of user_id and session_id.
  • As the query expects the results sorted by item_in_session attribute, I included it as clustering column in the primary key.
  • I included other attributes related to song like title, artist and listener's first and last name into table as they are expected by this type of query in the results.
In [8]:
# Creating table modeled to handle this specific type of query:
# (user_id, session_id) form the composite partition key and item_in_session
# is the clustering column, so rows come back ordered by item_in_session
# within each (user, session) partition.
create_table_query = """
    CREATE TABLE IF NOT EXISTS song_playlist_session (
        user_id text,
        session_id int,
        item_in_session int,
        artist text,
        song_title text,
        user_first_name text,
        user_last_name text,
        PRIMARY KEY ((user_id, session_id), item_in_session)
    );
"""
try:
    session.execute(create_table_query)
except Exception as e:
    print(e)

# Loading data into table from csv file 
file = 'event_datafile_new.csv'

# Loop-invariant INSERT statement hoisted out of the per-row loop
# (the original re-built the identical string for every csv row).
insert_query = ("INSERT INTO song_playlist_session "
                "(session_id, item_in_session, song_title, artist, "
                "user_first_name, user_last_name, user_id) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s)")

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # csv columns: 0=artist, 1=firstName, 3=itemInSession, 4=lastName,
        # 8=sessionId, 9=song, 10=userId
        session.execute(insert_query,
                        (int(line[8]), int(line[3]), line[9], line[0],
                         line[1], line[4], line[10]))

# Executing a SELECT query to verify that the data have been inserted into each table                
query = """
    SELECT artist, song_title, user_first_name, user_last_name 
    FROM song_playlist_session
    WHERE user_id=%s AND session_id=%s
"""
try:
    rows = session.execute(query, ("10", 182))
except Exception as e:
    print(e)

for row in rows:
    print("Artist:", row.artist, ", Song:", row.song_title, ", User first name:", row.user_first_name, ", User last name:", row.user_last_name)

                    
Artist: Down To The Bone , Song: Keep On Keepin' On , User first name: Sylvie , User last name: Cruz
Artist: Three Drives , Song: Greece 2000 , User first name: Sylvie , User last name: Cruz
Artist: Sebastien Tellier , Song: Kilometer , User first name: Sylvie , User last name: Cruz
Artist: Lonnie Gordon , Song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) , User first name: Sylvie , User last name: Cruz

Task 3

Get every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

  • For handling this type of query, I created a table with song_title as the partition key and (song_title, user_id) as the composite primary key. The combination of song_title and user_id uniquely identifies the rows of a table that records which users have listened to each of the songs.
  • As the query expects to get names of all the users who listen to specified song title and therefore does not receive user_id as an argument to query, user_id can not be included in partition key.
  • I included other attributes related to song like title, artist and listener's first and last name into table as they are expected by this type of query in the results.
In [9]:
# Creating table modeled to handle this specific type of query:
# song_title is the partition key; adding user_id as a clustering column
# keeps one row per distinct listener of each song.
create_table_query = """
    CREATE TABLE IF NOT EXISTS song_listeners (
        song_title text,
        user_id text,        
        user_first_name text,
        user_last_name text,
        PRIMARY KEY (song_title, user_id)
    );
"""
try:
    session.execute(create_table_query)
except Exception as e:
    print(e)

# Loading data into table from csv file 
file = 'event_datafile_new.csv'

# Loop-invariant INSERT statement hoisted out of the per-row loop
# (the original re-built the identical string for every csv row).
insert_query = ("INSERT INTO song_listeners (song_title, user_id,  "
                "user_first_name, user_last_name) "
                "VALUES (%s, %s, %s, %s)")

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # csv columns: 1=firstName, 4=lastName, 9=song, 10=userId
        session.execute(insert_query, (line[9], line[10], line[1], line[4]))

# Executing a SELECT query to verify that the data have been inserted into each table                        
query = """
    SELECT user_first_name, user_last_name 
    FROM song_listeners
    WHERE song_title=%s
"""
try:
    rows = session.execute(query, ("All Hands Against His Own",))
except Exception as e:
    print(e)

for row in rows:
    print("User first name:", row.user_first_name, ", User last name:", row.user_last_name)
                    
User first name: Jacqueline , User last name: Lynch
User first name: Tegan , User last name: Levine
User first name: Sara , User last name: Johnson

Dropping the tables before closing out the sessions

In [10]:
# Drop the three tables created above. The original dropped the placeholder
# names table1/table2/table3 — which were never created — so the real tables
# were silently left behind.
try:
    session.execute("DROP TABLE IF EXISTS songs_played_in_session")
    session.execute("DROP TABLE IF EXISTS song_playlist_session")
    session.execute("DROP TABLE IF EXISTS song_listeners")
except Exception as e:
    print(e)
        

Close the session and cluster connection

In [11]:
# Release the session, then close all connections held by the cluster.
session.shutdown()
cluster.shutdown()