Real Estate Data Pipeline, Part 1

 This will be the first of two posts documenting the process of creating a data pipeline using real estate data. Overall, the whole adventure will include:

  • Web-scraping Toronto real estate data
  • Cleaning and transforming everything in Spark
  • Exporting and loading the data into AWS RedShift
  • Creating a visualization that puts it all into graphical form

Overall, this will be a post dealing not only with the creation of the pipeline, but things that were learned along the way.

Extract - Web-Scraping with Python





This was my first foray into web-scraping and, using BeautifulSoup, the process was a lot easier than I had anticipated. The web-scraping code can be found here. We begin with our import statements:

from bs4 import BeautifulSoup
import requests
import pandas as pd
import re
import time
BeautifulSoup, as mentioned is one of the standard web-scraping libraries, but there are also options like Selenium, Scrapy and lxml. RE will provide us with regular expression pattern-matching, and pandas will allow us to pull everything into a dataframe. Time is simply used to allow an interval between crawls, so that the server doesn't get overwhelmed.

The first real chunk of code is to establish a connection to the website and then let BeautifulSoup parse all of the html and retrieve python-workable objects.

#Iterate through the website pages
for page_number in range(1, 20):
    url = f"{website}{page_number}"
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.content, 'html.parser')
    listings = soup.find_all('li', {'class': 'card-group__item'})

In this case, the 'card-group__item' was key to finding specific objects, like the address and postal codes, etc. From there, I used various techniques to get the data I wanted. This would vary, depending on what it was. For example, a Canadian postal code follows a specific letter-number-letter_number-letter-number pattern, so I thought a regex would be best:

        postal_code_span = listing.find('img', alt=re.compile(r'Toronto, ON [A-Z]\d[A-Z] \d[A-Z]\d'))
        # Check if postal codes are listed
        if postal_code_span:
            postal_code_pattern = r'Toronto, ON ([A-Z]\d[A-Z] \d[A-Z]\d)'
            postal_code = re.search(postal_code_pattern, postal_code_span['alt']).group(1)
        else:
            postal_code = 'Not listed'

If something matching the pattern was found, it would be placed in the 'postal code' column, and if not, then it would appear as 'Not listed'. Finally, the afore-mentioned 'sleep' command is provided:

    # Add a delay between requests so as to not overwhelm the website
    time.sleep(1)

This gives the script an opportunity to be well-behaved in terms of information requests - an important consideration when it comes to web scraping.

Transform - Cleaning and Altering with Spark


At first, I was using Databricks Community Edition for the transformations and cleaning but opted out when I discovered that the Community Edition doesn't allow for sensitive environment variables or 'Secrets', in Databricks parlance). For those who don't know, environment variables allow one to store data items at the OS level so that the 'real' value can remain hidden while the 'symbolic' version can reside in your code. There are several uses for this, but my main purpose for them is to be able to avoid hard-coding passwords into my scripts - thereby preventing villains from extracting sensitive information!


We can see this first in the import statement:

import os

And then further on down, we call on these variables from the hidden depths of Ubuntu:

# import environment variables

aws_table_name = os.environ.get("aws_table_name")
aws_access_id = os.environ.get("aws_access_id")
aws_secret_id = os.environ.get("aws_secret_id")
region = os.environ.get("region")


In this case I'm also using "findspark" in order to use Apache Spark in conjunction with Jupyter Notebook. When the web-scraping operation was completed, it exported all of the contents to a CSV file. Now that CSV file is being imported into Spark; I'm using the 'infer schema' option because the dataset is small and the engine should be able to easily detect the data types contained within:

file_location = "/home/vs/Documents/python/real_estate_project/toronto_real_estate.csv"
file_type = "csv"

infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

Up next are the transformations. These are very humble, but I wanted to dip my toe in the waters of Spark and my main ideas were to:

  • Remove special characters from the 'price' column
  • Change the 'price' column to a numeric data type (probably INT)
  • Remove special characters from the 'address' column
  • Change all of the column names to title case, just to make things look a bit nicer

The Spark dataframe currently looks like this:

+----------+--------------------+----------+----------+-----------+
|     price|             address|  bedrooms| bathrooms|postal_code|
+----------+--------------------+----------+----------+-----------+
|$2,599,900|      599 Spadina Rd|         4|Not listed|    M5P 2X1|
|$1,999,999|       61 Cameron St|         6|Not listed|    M5T 2H1|
|$1,750,000| #106 -4750 Yonge St|Not listed|Not listed| Not listed|
|$1,688,000|      140 Bogert Ave|         3|Not listed|    M2N 1K8|
|$1,520,000|150 Harlandale Av...|         4|Not listed|    M2N 1P4|


Now here is the code to do the various transformations:

# remove all non-numerical special characters
df = df.withColumn("price", regexp_replace("price", "[^0-9]", ""))
df = df.withColumn("address", regexp_replace("address", "[^a-zA-Z0-9 -]", ""))
# change the price column data type to integer
df = df.withColumn("price", df["price"].cast(IntegerType()))
# Get the current column names
old_columns = df.columns

# Create a list of new column names in title case
new_columns = [col_name.title() for col_name in old_columns]

# Rename the columns using withColumnRenamed()
df = df.select([col(old_col_name).alias(new_col_name) for old_col_name, new_col_name in zip(old_columns, new_columns)])

One thing that's worth noting is the different ways of doing things in Spark vs Pandas. That may be something to consider for a future blog post, but for now our main transformations are done and the next step is to push all of this into AWS Redshift. Stay tuned for Part 2!

Comments

Popular posts from this blog

The Basics of IICS

Imperial to Metric Conversion (and vice-versa) Script