Selenium

Retrieving Singapore housing (HDB) resale prices with Python

This post is specific to Singapore. The aim is to retrieve the Housing & Development Board (HDB) resale prices for the year 2015, grouped by different parts of Singapore. All the price information is retrieved from the main HDB website. The website returns the past one year of records for each block, searched by postcode. Hence, to retrieve all the records, one first needs the full list of postcodes in Singapore. The steps below outline the information required to form the full picture.

  1. Retrieve the full list of postcodes from the following sg postcode database.
  2. The above only has the postcodes, so the next step is to match each postcode to its actual address. This website also provides a postcode search that returns the corresponding address. The same process can be automated with Python, Python Pattern and pandas.
  3. Retrieve the HDB resale prices by iterating over all the postcodes retrieved above.
  4. An optional step is to retrieve the geocodes corresponding to each postcode so that all the data can be put on a map. The post “Retrieving Geocodes from ZipCodes using Python and Selenium” describes the retrieval method.

The first code snippet applies to item 1, i.e., retrieving the postcodes. Item 2 is a two-step process: first search for the postcode to get the link, then retrieve the address from that link.


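import os
import pandas as pd
from pattern.web import  URL, extension

def retrieve_postal_code_fr_web_1(target_url, savefilelocation):
    """ Download the first html table found at target_url and save it as csv.
        Args:
            target_url (str): url of the postcode page.
            savefilelocation (str): folder in which to save the csv file.
    """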
    savefile = target_url.split('=')[-1] + '.csv'
    fullsavefile = os.path.join(savefilelocation,savefile)
    
    contents = URL(target_url).download()

    w = pd.read_html(contents)
    w[0].to_csv(fullsavefile, index =False)

The next snippet describes the method to retrieve the HDB resale prices. Exploring the HDB website shows that the dataset is in xml format. The url is as follows: http://services2.hdb.gov.sg/webapp/BB33RTIS/BB33SResaleTransMap?postal=<postcode>. For easy handling of data in xml format, one way is to convert the xml to a dict and then convert the dict to a pandas DataFrame. The Python module xmltodict performs the xml-to-dict conversion.
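To make the xml-to-records step concrete, here is a minimal sketch that uses only the standard library (xml.etree.ElementTree) instead of xmltodict, and is written for Python 3 rather than the Python 2 used in this post. The sample xml and its field names (block, price) are made up for illustration and are not the real HDB schema; only the Datasets/Dataset nesting is taken from the class in this post.

```python
import xml.etree.ElementTree as ET

# Hypothetical response: the field names are invented for illustration,
# only the <Datasets>/<Dataset> nesting follows the class in this post.
SAMPLE_XML = """
<Datasets>
  <Dataset><block>525</block><price>350000</price></Dataset>
  <Dataset><block>262</block><price>410000</price></Dataset>
</Datasets>
"""

def datasets_to_records(xml_text):
    """Return one dict per <Dataset> element (empty list if there are none)."""
    root = ET.fromstring(xml_text.strip())
    records = []
    for dataset in root.findall('Dataset'):
        # each child tag becomes a column name, its text the cell value
        records.append({child.tag: child.text for child in dataset})
    return records

records = datasets_to_records(SAMPLE_XML)
print(records)
```

The resulting list of dicts can be passed straight to pd.DataFrame. With xmltodict, a single Dataset comes back as one dict while several come back as a list, which is why the class has to handle both cases.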


import re, os, sys, datetime, time
import pandas as pd
import pattern
import xmltodict

from pattern.web import  URL, extension

class HDBResalesQuery(object):
    """ 
        For retrieving the resales prices from HDB webpage.
    """
    def __init__(self):
        """ List of url parameters -- for url formation """
        self.com_data_start_url = 'http://services2.hdb.gov.sg/webapp/BB33RTIS/BB33SResaleTransMap?postal='
        self.postal_portion_url = ''
        self.com_data_full_url = ''
        self.postal_list = [] #multiple postal code list

        ## storage
        self.single_postal_df = pd.DataFrame()
        self.multi_postal_df = pd.DataFrame()

        ## debugging
        self.en_print = 1
        
    def set_postal_code(self, postalcode):
        """ Set the postal code to url part.
            Set to self.postal_portion_url.
            Args:
                postalcode (str or int): postal code, converted to str.
        """
        self.postal_portion_url = str(postalcode)

    def set_postal_code_list(self, postalcodelist):
        """ Set list of postal code. Set to self.postal_list
            Args:
                postalcodelist(list): list of postal code
        """
        self.postal_list = postalcodelist

    def form_url_str(self):
        """ Form the url str necessary to get the xml

        """           
        self.com_data_full_url = self.com_data_start_url + self.postal_portion_url
        
    def get_com_data(self):
        """ Combine the url str and get html contents
        """
        self.form_url_str()
        if self.en_print: print self.com_data_full_url
        contents = URL(self.com_data_full_url).download()
        return contents

    def process_single_postal_code(self):
        """ process single postal code and retrieve the relevant information from HDB.

        """
        contents = self.get_com_data()
        if self.en_print: print contents
        obj = xmltodict.parse(contents)

        data_dict_list = []
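        # obj['Datasets'] is None when the xml has no records at all
        if obj['Datasets'] and 'Dataset' in obj['Datasets']:
            data_set = obj['Datasets']['Dataset']
            if isinstance(data_set, list):
                # several records: xmltodict returns a list
                for single_data in data_set:
                    data_dict_list.append(dict(single_data))
            else:
                # a single record: xmltodict returns one dict
                data_dict_list.append(dict(data_set))

        # convert the list of dicts to a pandas dataframe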
        self.single_postal_df = pd.DataFrame(data_dict_list)
        if self.en_print: print self.single_postal_df

    def process_mutli_postal_code(self):
        """ for processing multiple postal code.
        """
        self.multi_postal_df = pd.DataFrame()
        
        for postalcode in self.postal_list:
            if self.en_print: print 'processing postalcode: ', postalcode
            self.set_postal_code(postalcode)
            self.process_single_postal_code()
            if len(self.single_postal_df) == 0: #no data
                continue
            if len(self.multi_postal_df) == 0:
                self.multi_postal_df = self.single_postal_df
            else:
                self.multi_postal_df = self.multi_postal_df.append(self.single_postal_df)

            

if __name__ == '__main__':
        """ Trying out the class"""
        postallist = ['640525','180262']
        w = HDBResalesQuery()
        w.set_postal_code_list(postallist)
        w.process_mutli_postal_code()
        print w.multi_postal_df

Note that the whole process requires a large number of queries (about 110k) to the website. It is best to schedule the retrieval in batches, or the website may identify you as a bot and shut you out.
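One possible way to batch the queries is sketched below (Python 3, standard library only). The batch size and pause length are assumptions, not values tested against the HDB website, and query_func stands in for whatever function performs a single query.

```python
import time

def iter_batches(items, batch_size):
    """Yield consecutive slices of items, each at most batch_size long."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

def run_in_batches(postal_codes, query_func, batch_size=500,
                   pause_seconds=60, sleep=time.sleep):
    """Call query_func on every postal code, pausing between batches.

    batch_size and pause_seconds are assumed values; tune them to
    whatever the target website tolerates.
    """
    results = []
    batches = list(iter_batches(postal_codes, batch_size))
    for index, batch in enumerate(batches):
        for code in batch:
            results.append(query_func(code))
        if index < len(batches) - 1:  # no need to wait after the last batch
            sleep(pause_seconds)
    return results

# Small demonstration with a stand-in query function and no pause.
demo = run_in_batches(['640525', '180262'], lambda code: 'queried ' + code,
                      batch_size=1, pause_seconds=0)
print(demo)
```

The sleep function is passed in as a parameter so that the pauses can be replaced by a no-op when trying the code out.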

The following is the Tableau representation of all the data. It is still a preliminary version.

HDB Resale Prices


Retrieving Geocodes from ZipCodes using Python and Selenium

This is an alternative to using the Google Maps API to retrieve geocodes (latitude and longitude) from zip codes. This website allows batch processing of zip codes, which makes it very convenient for automation.

The general steps for retrieving data from the website are illustrated below: enter the zip codes, press the “geocode” button and get the output from the second text box.

Batch Geocode processing website

The above tasks can be automated using Selenium and Python, which can emulate the user's actions in just a few lines of code. The code is shown below. You will notice that it calls each element (textbox, button, etc.) by id. This is another advantage of this website: it provides an id for each required element. The data retrieved is converted to a pandas object for easy processing.

Currently, the waiting time is set manually by the user. The script can be further modified to check the number of records processed before retrieving the final output. Another issue is that this website also makes use of the Google Maps API engine, which restricts the number of queries (~2500 per day). If a massive number of queries is required, one way is to schedule the script to run at a fixed interval each day, or perhaps to query multiple websites that offer this conversion feature.
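A rough sketch of how the fixed wait could be replaced by polling is given below (Python 3, standard library only). It assumes the output box holds one header line plus one line per input postcode once processing is complete; the helper names wait_until and output_is_complete are my own and not part of Selenium.

```python
import time

def wait_until(condition, timeout_seconds=600, poll_seconds=5,
               sleep=time.sleep, clock=time.time):
    """Poll condition() until it returns True or the timeout expires.

    Returns True on success and False on timeout.
    """
    deadline = clock() + timeout_seconds
    while True:
        if condition():
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)

def output_is_complete(output_text, expected_rows):
    """True when the output has a header line plus one line per input row."""
    lines = [line for line in output_text.splitlines() if line.strip()]
    return len(lines) >= expected_rows + 1

# With a live driver, the fixed sleep could become something like:
# wait_until(lambda: output_is_complete(
#     driver.find_element_by_id("batch_out").get_attribute("value"),
#     len(postcode_list)))
```

This way a small batch returns as soon as it is done, while the timeout still guards against waiting forever.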

For my project, I may need to pull more than 100,000 records. A limit of 2500 queries per day is quite restrictive, even though I can run the script on multiple computers. Suggestions are welcome.
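To put the quota in perspective, the arithmetic can be sketched as below. It assumes the ~2500 daily limit applies separately to each computer, which I have not verified.

```python
import math

def days_needed(total_queries, daily_quota, machines=1):
    """Number of days to finish, assuming the quota applies per machine."""
    return int(math.ceil(total_queries / float(daily_quota * machines)))

print(days_needed(100000, 2500))     # 40 days on one computer
print(days_needed(100000, 2500, 4))  # 10 days on four computers
```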


import re, os, sys, datetime, time
import pandas as pd
from selenium import webdriver
from selenium.webdriver import Firefox

from time import gmtime, strftime

def retrieve_geocode_fr_site(postcode_list):
    """ Retrieve batch of geocode based on postcode list.
        Based on site: http://www.findlatitudeandlongitude.com/batch-geocode/#.VqxHUvl96Ul
        Args:
            postcode_list (list): list of postcode.
        Returns:
            (Dataframe): dataframe containing postcode, lat, long

        Note: need to calculate the waiting time. 100 entries take about 94s

    """
    ## need to convert input to str
    postcode_str = '\n'.join([str(n) for n in postcode_list])

    #target website
    target_url = 'http://www.findlatitudeandlongitude.com/batch-geocode/#.VqxHUvl96Ul' 

    driver = webdriver.Firefox()
    driver.get(target_url)

    #input the query to the text box
    inputElement = driver.find_element_by_id("batch_in") 
    inputElement.send_keys(postcode_str)

    #press button
    driver.find_element_by_id("geocode_btn").click()

    #allocate enough time for the processing to complete
    # 100 inputs take around 2-3 min, adjust accordingly
    time.sleep(60*10)

    #retrieve output
    output_data = driver.find_element_by_id("batch_out").get_attribute("value")
    output_data_list = [n.split(',') for n in output_data.splitlines()]

    #processing the output
    #the first row holds the headers, the rest goes into a pandas dataframe for easy processing.
    headers = output_data_list.pop(0)
    geocode_df = pd.DataFrame(output_data_list, columns = headers)
    geocode_df['Postcode'] = geocode_df['"original address"'].str.strip('"')
    geocode_df = geocode_df.drop('"original address"', axis=1)

    ## printing a subset
    print geocode_df.head()

    driver.close()

    return geocode_df
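One caveat with the function above: splitting each line on ',' breaks if a quoted field itself contains a comma, and it leaves the quote characters in the headers. A possible alternative, sketched here for Python 3 with the standard csv module, is shown below. The sample text is made up; only the "original address" header comes from the code above, the other column names and values are assumptions.

```python
import csv
import io

# Hypothetical output text: only the "original address" header is taken
# from the code in this post, the rest is invented for illustration.
SAMPLE_OUTPUT = (
    '"original address","latitude","longitude"\n'
    '"640525","1.3","103.7"\n'
    '"Blk 262, Singapore 180262","1.3","103.8"\n'
)

def parse_geocode_output(output_text):
    """Parse the csv-style output text into a list of dicts, one per row."""
    reader = csv.reader(io.StringIO(output_text))
    rows = [row for row in reader if row]  # skip blank lines
    if not rows:
        return []
    headers, records = rows[0], rows[1:]
    return [dict(zip(headers, row)) for row in records]

for record in parse_geocode_output(SAMPLE_OUTPUT):
    print(record)
```

The list of dicts can then be passed to pd.DataFrame as before, without the extra step of stripping the quotes.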

 

 

Saving images from google search using Selenium and Python

Below is a short Python script that lets users save images found through Google Image search to a local drive. It requires Selenium, as Google requires users to press the “show more results” button and move the scroll bar all the way to the bottom of the page before more images are displayed. Selenium is the easier choice for this.

The Python script below does the following:

  1. Lets users input multiple search keywords, either by direct entry or from a file. After creating a series of search keywords, users can leave the program to download on its own.
  2. For each keyword, forms the Google search url. Most of the parameters in the Google search url can be fixed. The only part that needs changing is the search keyword.
  3. Runs the Google search and obtains the page source for the images. This is done using Selenium. To obtain the full set of images, Selenium presses the button and scrolls to the bottom of the page so that Google loads the remaining images. There seems to be a hard quota of 1000 pictures for image search on Google.
  4. Uses Python Pattern and a CSS selector to retrieve the corresponding url for each image. The selector is as follows:
    • tag_list = dom('a.rg_l') #a tag with class = rg_l
  5. For each url, checks the following before downloading the image file:
    • whether the site redirects. This is done using the Python Pattern redirect function.
    • whether the extension is a valid image file type.
  6. Downloads the image files to a local folder (named by date). Each image is labelled according to the search key and a counter. A corresponding text file maps each image label to its image url for reference.

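Steps 2, 4 and 5 above can be sketched with the standard library alone (Python 3). The shortened url postfix is an assumption (the full script uses a longer fixed string), the sample href is made up, and the helper names are my own.

```python
import os
import re
from urllib.parse import quote_plus, urlparse

SEARCH_PREFIX = 'https://www.google.com.sg/search?q='
SEARCH_POSTFIX = '&source=lnms&tbm=isch'  # shortened postfix, an assumption
VALID_IMAGE_EXT = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff'}

def form_search_url(search_key):
    """Step 2: only the keyword changes, spaces become '+'."""
    return SEARCH_PREFIX + quote_plus(search_key.strip()) + SEARCH_POSTFIX

def extract_image_url(href):
    """Step 4: pull the image url out of the href of an 'a.rg_l' tag."""
    match = re.search(r'imgurl=(.*?)&imgrefurl', href)
    return match.group(1) if match else None

def has_valid_image_ext(image_url):
    """Step 5: check the extension, ignoring case and any query string."""
    ext = os.path.splitext(urlparse(image_url).path)[1].lower()
    return ext in VALID_IMAGE_EXT

# Made-up href in the shape that the regex in this post expects.
sample_href = ('/imgres?imgurl=http://example.com/pics/cat.JPG'
               '&imgrefurl=http://example.com/page&h=300')
image_url = extract_image_url(sample_href)
print(form_search_url('Cookie fast'))
print(image_url, has_valid_image_ext(image_url))
```

Compared with a plain replace of spaces, quote_plus also escapes characters such as '&' that would otherwise break the url.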
import re, os, sys, datetime, time
import pandas
from selenium import webdriver
from contextlib import closing
from selenium.webdriver import Firefox
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC

from pattern.web import URL, extension, cache, plaintext, Newsfeed, DOM

class GoogleImageExtractor(object):

    def __init__(self, search_key = '' ):
        """ Google image search class
            Args:
                search_key to be entered.

        """
        if type(search_key) == str:
            ## convert to list even for one search keyword to standardize the pulling.
            self.g_search_key_list = [search_key]
        elif type(search_key) == list:
            self.g_search_key_list = search_key
        else:
            raise TypeError('google_search_keyword not of type str or list')

        self.g_search_key = ''

        ## user options
        self.image_dl_per_search = 200

        ## url construct string text
        self.prefix_of_search_url = "https://www.google.com.sg/search?q="
        self.postfix_of_search_url = '&source=lnms&tbm=isch&sa=X&ei=0eZEVbj3IJG5uATalICQAQ&ved=0CAcQ_AUoAQ&biw=939&bih=591'# non changable text
        self.target_url_str = ''

        ## storage
        self.pic_url_list = []
        self.pic_info_list = []

        ## file and folder path
        self.folder_main_dir_prefix = r'C:\data\temp\gimage_pic'

    def reformat_search_for_spaces(self):
        """
            Method called when forming the search url.
            Get rid of the spaces and replace them with "+".
            Used in search term. Eg: "Cookie fast" to "Cookie+fast"

            steps:
            strip any trailing spaces if present
            replace the self.g_search_key
        """
        self.g_search_key = self.g_search_key.rstrip().replace(' ', '+')

    def set_num_image_to_dl(self, num_image):
        """ Set the number of image to download. Set to self.image_dl_per_search.
            Args:
                num_image (int): num of image to download.
        """
        self.image_dl_per_search = num_image

    def get_searchlist_fr_file(self, filename):
        """Get search list from filename. Ability to add in a lot of phrases.
            Will replace the self.g_search_key_list
            Args:
                filename (str): full file path
        """
        with open(filename,'r') as f:
            self.g_search_key_list = f.readlines()

    def formed_search_url(self):
        ''' Form the search url for the current search key.
            Get the key from self.g_search_key (set in multi_search_download).
            Set to self.target_url_str
        '''
        self.reformat_search_for_spaces()
        self.target_url_str = self.prefix_of_search_url + self.g_search_key +\
                                self.postfix_of_search_url

    def retrieve_source_fr_html(self):
        """ Use Selenium to load the search page, scroll down and press the
            "show more results" button so that more images are loaded.
            Set the final html to self.page_source.

        """
        driver = webdriver.Firefox()
        driver.get(self.target_url_str)

        ## scroll and press the button so that google loads more images.
        try:
            driver.execute_script("window.scrollTo(0, 30000)")
            time.sleep(2)
            self.temp_page_source = driver.page_source
            #driver.find_element_by_css_selector('ksb _kvc').click()#cant find the class
            driver.find_element_by_id('smb').click() #ok
            time.sleep(2)
            driver.execute_script("window.scrollTo(0, 60000)")
            time.sleep(2)
            driver.execute_script("window.scrollTo(0, 60000)")

        except Exception:
            ## keep whatever has loaded so far instead of quitting the driver here,
            ## as the page source is still read below.
            print 'not able to find'

        self.page_source = driver.page_source

        driver.quit()

    def extract_pic_url(self):
        """ extract all the raw pic url in list

        """
        dom = DOM(self.page_source)
        tag_list = dom('a.rg_l')

        for tag in tag_list[:self.image_dl_per_search]:
            tar_str = re.search('imgurl=(.*)&imgrefurl', tag.attributes['href'])
            try:
                self.pic_url_list.append(tar_str.group(1))
            except:
                print 'error parsing', tag

    def multi_search_download(self):
        """ Multi search download"""
        for indiv_search in self.g_search_key_list:
            self.pic_url_list = []
            self.pic_info_list = []

            self.g_search_key = indiv_search

            self.formed_search_url()
            self.retrieve_source_fr_html()
            self.extract_pic_url()
            self.downloading_all_photos() #some download might not be jpg?? use selnium to download??
            self.save_infolist_to_file()

    def downloading_all_photos(self):
        """ download all photos to particular folder

        """
        self.create_folder()
        pic_counter = 1
        for url_link in self.pic_url_list:
            print pic_counter
            pic_prefix_str = self.g_search_key  + str(pic_counter)
            self.download_single_image(url_link.encode(), pic_prefix_str)
            pic_counter = pic_counter +1

    def download_single_image(self, url_link, pic_prefix_str):
        """ Download data according to the url link given.
            Args:
                url_link (str): url str.
                pic_prefix_str (str): pic_prefix_str for unique label the pic
        """
        self.download_fault = 0
        file_ext = os.path.splitext(url_link)[1] #use for checking valid pic ext
        temp_filename = pic_prefix_str + file_ext
        temp_filename_full_path = os.path.join(self.gs_raw_dirpath, temp_filename )

        valid_image_ext_list = ['.png','.jpg','.jpeg', '.gif', '.bmp', '.tiff'] #not comprehensive

        url = URL(url_link)
        if url.redirect:
            return # if there is re-direct, return

        if file_ext not in valid_image_ext_list:
            return #return if not valid image extension

        print url_link
        self.pic_info_list.append(pic_prefix_str + ': ' + url_link )
        try:
            image_data = url.download() #if have problem skip
        except Exception:
            print 'Problem with processing this data: ', url_link
            self.download_fault = 1
            return
        # write only after a successful download so that no empty file is left behind
        with open(temp_filename_full_path, 'wb') as f:
            f.write(image_data)

    def create_folder(self):
        """
            Create a folder to put the log data segregate by date

        """
        self.gs_raw_dirpath = os.path.join(self.folder_main_dir_prefix, time.strftime("_%d_%b%y", time.localtime()))
        if not os.path.exists(self.gs_raw_dirpath):
            os.makedirs(self.gs_raw_dirpath)

    def save_infolist_to_file(self):
        """ Save the info list to file.

        """
        temp_filename_full_path = os.path.join(self.gs_raw_dirpath, self.g_search_key + '_info.txt' )

        with  open(temp_filename_full_path, 'w') as f:
            for n in self.pic_info_list:
                f.write(n)
                f.write('\n')

if __name__ == '__main__':

    choice =4

    if choice ==4:
        """test the downloading of files"""
        w = GoogleImageExtractor('')#leave blanks if get the search list from file
        searchlist_filename = r'C:\data\temp\gimage_pic\imgsearch_list.txt'
        w.set_num_image_to_dl(200)
        w.get_searchlist_fr_file(searchlist_filename)#replace the searclist
        w.multi_search_download()

Google Image Search with Python (part 1)

Google has an image search feature that allows users to input an image and search for related web pages that embed the image (reverse image search). Google also shows images that are similar to the target image.

There are multiple ways to input the image into Google search, such as dragging and dropping it onto the search input box, uploading the file or providing a url link to the image. Note that Google will store all the images that have been uploaded for its own internal use.

The project here makes use of the image url link to pull the Google results automatically. The overall flow is as below:

  1. Upload the image to a fixed location that can provide a public link to the image url.
  2. Combine the image url with the Google image search url.
  3. Google image search url is of the following format
  4. Scrape the Google Result page returned from the combined url for the results.

Item 1 is difficult as it requires a place to upload and store the new image that also returns the correct url. The concept is to use cloud storage such as Dropbox or BOX, which allows the public to view a file given its url link and at the same time acts as a regular folder on the local computer.

This project uses BOX to perform item 1. It requires a BOX account and the installation of BOX on the local computer, after which the following steps are required.

  1. Create a temp folder and a dummy image (.jpg).
  2. Note the image file name. This should not be changed as it will affect the final url.
  3. Copy the public link and paste it into a browser. The public link will be used in the script for subsequent pulling.
  4. The browser will redirect to the BOX image viewer. The image url can be retrieved manually by right-clicking on the image and selecting the image url.
  5. The image url will be of the following format.
  6. If the image is subsequently overwritten, the filename does not change BUT the file_version is updated, hence the url changes with the new file version.

The script for this part automatically gets the image url from the BOX page given the public link. Note that requesting the url and scraping the webpage directly will not give the image url, as the page needs to wait for JavaScript execution.

One way to overcome this is to use Selenium (web browser automation). It executes any JavaScript automatically and retrieves the final html of the page. With the final html, we can use the Python Pattern DOM object to parse the image url.
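The parsing step on its own can be sketched as follows, using the standard library html.parser in place of the Pattern DOM object (Python 3). The sample page is made up: only the 'https://app.box.com/representation/file_version_' prefix comes from the script in this post, the rest of the src and the assumption that the version is all digits are mine.

```python
import re
from html.parser import HTMLParser

class FirstImgSrcParser(HTMLParser):
    """Remember the src attribute of the first <img> tag encountered."""
    def __init__(self):
        HTMLParser.__init__(self)
        self.src = None

    def handle_starttag(self, tag, attrs):
        if tag == 'img' and self.src is None:
            self.src = dict(attrs).get('src')

def extract_file_version(page_source):
    """Return the digits after 'file_version_' in the first img src, or None."""
    parser = FirstImgSrcParser()
    parser.feed(page_source)
    if parser.src is None:
        return None
    match = re.search(r'file_version_(\d+)', parser.src)
    return match.group(1) if match else None

# Made-up page source; the part of the src after the version is hypothetical.
SAMPLE_PAGE = ('<html><body><img src="https://app.box.com/representation/'
               'file_version_12345/image_2048/1.jpg"></body></html>')
print(extract_file_version(SAMPLE_PAGE))
```

Returning None when no image or version is found makes it easy to tell a page that has not finished loading from one that has.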

Below is the class for getting the image url to be passed to Google search. For this post, only this portion is shown.

import re, os, sys, math, time, datetime, shutil
from pattern.web import URL, DOM, plaintext, extension, Element, find_urls
from contextlib import closing
from selenium.webdriver import Firefox
from selenium.webdriver.support.ui import WebDriverWait

class BoxImageUrl(object):
    """ Fetch the url of a public share link pic.
        Can write a image to that particular file and get the latest url of that file
        Need to wait for some time for the image to load --> can compare before and after to see any change in the text
        Need to wait for the box image to load up.

        Note:
        self.share_folder_url  --> public folder link of BOX. Set by user.
        self.local_image_store_path --> placeholder for all new image. All new image is to overwrite this file.
                                        Set by user.

    """
    def __init__(self):
        ## url parameters
        self.share_folder_url = 'https://app.box.com/s/jlwchpjfcpueq1gshij7' #use to go to box to get the image url
        self.box_image_full_url = ''
        self.box_image_start_url = 'https://app.box.com/representation/file_version_'
        self.box_image_end_url =''

        ## local placeholder location.
        self.local_image_store_path = r'C:\Users\Tan Kok Hua\Box Sync\temp\stock2.jpg'
        self.image_version = '0' #current version that exists
        self.image_version_history = '0' # Use to check version or whether file has already uploaded.

        ## general use
        self.dom_object = object()

        ## Error/ debug / monitor
        self.url_query_timeout = 0
        self.new_image_upload_check_cntdn = 10 # number of times before the while loop break for checking.

    def set_box_public_link_of_image(self, image_public_link):
        """ Set the public link of image based on BOX.
            To get the public link. Go to Box Sync folder, navigate to image, right click and select Share Box link.
            Args:
                image_public_link (str): http string of the image public link.
       """
        self.share_folder_url = image_public_link

    def fetch_image_url_fr_box(self):
        """ Fetch the image url from Box.com using Selenium.
            Set to self.box_image_full_url (and self.image_version).

        """
        with closing(Firefox()) as browser:
            browser.get(self.share_folder_url)
            time.sleep(3) # wait for the javascript to render the image
            page_source = browser.page_source

        self.set_box_image_end_url(page_source)
        self.set_final_image_box_url()

    def set_box_image_end_url(self, box_page_source):
        """ From the box page source, get the box_image end url.
            Note the image version number will change with each upload of the same filename.
            Args:
                box_page_source (str): source in html.
            Returns:
                (str): inside file_version_x where x is the digit str required.
        """
        dom = DOM(box_page_source)

        ## pic will be in the img tag. For box only one img tag return
        img_element = dom("img")[0]
        ## text str will be inside this attribute or the img tag --> src.
        ## encode to get rid of the unicode
        txt_str = img_element.attributes['src'].encode()
        ## Get the image version --> mainly to use whether the image is already uploaded.
        self.image_version = re.search('file_version_(.*)/image', txt_str).group(1)
        ## extract the file version from the text str.
        self.box_image_end_url = re.search('file_version_(.*)', txt_str).group(1)

    def set_final_image_box_url(self):
        """ Get final image box url by joining the start and end url.

        """
        self.box_image_full_url = self.box_image_start_url + self.box_image_end_url

    def set_image_version_history(self):
        """ Set the image version history by scanning the website before uploading new image.
        """
        self.fetch_image_url_fr_box() # sets self.image_version to the current version
        self.image_version_history = self.image_version
        print 'Image version history', self.image_version_history

    def upload_new_image(self, target_image_path):
        """ Move the target image to the place holder defined by self.local_image_store_path
            Args:
                target_image_path (str): file path of image to be searched.
        """
        print 'uploading images'
        shutil.copy2(target_image_path, self.local_image_store_path)
        if self.has_img_uploaded():
            print 'Successful'
        else:
            print 'new image not found'

    def has_img_uploaded(self):
        """ Check whether the image has uploaded by repeatedly fetching the image url
            and checking whether the version differs from self.image_version_history.

        """
        for n in range(self.new_image_upload_check_cntdn):
            time.sleep(10)
            self.fetch_image_url_fr_box()
            if not self.image_version == self.image_version_history:
                ## means new version already uploaded
                return True
        return False

if __name__ == '__main__':
    choice  = 3

    if choice ==3:
        ## initialize the class
        hh = BoxImageUrl()

        ## Set the image public link from the BOX sync folder
        hh.set_box_public_link_of_image('https://app.box.com/s/jlwchpjfcpueq1gshij7')

        ## Go the public link and get the previous true image url.
        ## As the image file is continuously upload with new image, this is used to check for version.
        hh.set_image_version_history()

        ## Upload the new image to perform the google search.
        ## Time is allocated for the image to upload fully by monitoring the change in file version.
        hh.upload_new_image(r'C:\data\temp\person.jpg')

        ## Latest image url is obtained. This will eventually pass to google for image search.
        print hh.box_image_full_url