Heat Map for discrepancy check

Monitoring counts discrepancy

In one aspect of my work, we have a group of samples that undergo several rounds of modifications, with the same set of tests performed at each round. For each test, parameters are collected for each sample. For some samples, a particular test may fail in certain rounds, so no parameters are collected for that test.

When we compare the performance of the samples, especially when comparing group means, parameters missing from certain samples in certain rounds may skew the results. To ensure accuracy, the same samples must have data in every round being compared. As there are multiple tests and a few hundred parameters being tracked, we need a way to keep track of the parameters whose counts do not match between rounds.

A simple way is to use a heat map to highlight parameters whose counts differ between rounds (a difference means that some samples are missing data). The script is written mainly with Pandas and Seaborn.
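As a small illustration of why a count discrepancy signals missing samples, the sketch below uses a made-up table (the column names `round`, `param_a` and `param_b` are hypothetical, not taken from the real data). It relies on the fact that `groupby().count()` counts only non-null entries, so a failed test stored as NaN lowers the count for that round.

```python
import numpy as np
import pandas as pd

# Synthetic data: 3 samples measured over 2 rounds (hypothetical column names).
raw_df = pd.DataFrame({
    "round":   [1, 1, 1, 2, 2, 2],
    "param_a": [0.5, 0.7, 0.6, 0.4, np.nan, 0.8],  # one sample failed in round 2
    "param_b": [10, 11, 12, 13, 14, 15],           # complete in both rounds
})

# count() ignores NaN, so this is the number of samples with data per round.
cnt_df = raw_df.groupby("round").count()

# Difference in counts against the first round (the reference round).
diff_df = cnt_df.subtract(cnt_df.iloc[0], axis=1)

# Keep only the parameters whose count differs in at least one round.
mismatch_df = diff_df.loc[:, diff_df.any()]
print(mismatch_df)
```

Only `param_a` survives the filter, with a difference of -1 in round 2, which is the kind of cell the heat map is meant to make visible.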

Steps

1. Group the data by round and count the entries for each parameter.
2. Using one round as the reference (by default the first round), take the difference in counts for each parameter in each round.
3. Display the differences as a heat map, showing only the parameters that have a discrepancy.

import os, sys, datetime, re
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# retrieve the raw data
rawfile = 'raw_data.csv'
raw_df = pd.read_csv(rawfile)

# count of data in group
cnt_df = raw_df.groupby(['round']).count()

# Subtract the counts of the first (reference) round from every round
diff_df = cnt_df.subtract(cnt_df.iloc[0], axis=1)

# drop columns where it is all zeros, meaning exclude data that are matched.
diff_df.loc[:, diff_df.any()]

fig, ax = plt.subplots(figsize=(10,10))

sns.heatmap(diff_df.loc[:, diff_df.any()].T, xticklabels=True, yticklabels=True, ax=ax, annot=True, fmt="d", center=0, cmap="coolwarm")
plt.tight_layout()

Extra

Quick view of missing data using seaborn heatmap

sns.heatmap(df.isnull(), yticklabels=False, cbar=False, cmap='viridis')

Retrieving short sell qty for SG stocks from SGX using python

SGX usually releases short sell information for each stock at the end of each trading day. This information is found on their website. The daily short sells of all stocks are compiled into a report for each day. We are interested in getting the short sell quantity ranked by stock for each day.

If we examine the link, each report is presented as a table. To extract the information, we can use the Python Pattern module for web content download and Pandas for table extraction. Pandas has a function “pandas.io.html.read_html” (also exposed as “pandas.read_html”) that can easily retrieve table-like data from an HTML string.

The following lists the steps to retrieve the short sell information.

1. URL formation: As the report links are keyed by date, the date string needs to be joined to the fixed URL string. However, not every date has a report, e.g. weekends. A better way is to loop backwards from the current date until the latest available date is found.
2. HTML data download: This can be done using the Python Pattern module.
3. Converting the table to a data frame: This can be done using the Pandas function “pandas.io.html.read_html”. Pandas also provides a rank function so that the results can be ranked accordingly. Converting the table into a Pandas data frame makes this easy.
4. Ranking by absolute quantity can be misleading, as the quantity also depends on how heavily the share is traded. Combining it with the actual shares traded gives more representative data. For this case, the data frame retrieved can be joined to the current price df created in the previous post “Retrieving stock news and Ex-date from SGX using python“.
5. The last step is to set the alerts, which can be done easily using PushBullet as described in the post “Sending alerts to iphone or Android phone using python“. You can customize it to send the alert at the end of each trading day to show the top 10 short sell stocks.
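The date handling in step 1 can be sketched as below. This is only an illustration under stated assumptions: the helper names `candidate_date_keys` and `form_report_url` are hypothetical, and the url parts are placeholders rather than the real SGX address. `strftime("%Y%m%d")` takes care of the zero padding of month and day.

```python
import datetime

def candidate_date_keys(today, max_days_back=7):
    """Return yyyymmdd strings starting from `today` and going back, newest first."""
    return [
        (today - datetime.timedelta(days=n)).strftime("%Y%m%d")
        for n in range(max_days_back)
    ]

def form_report_url(start_url, date_key, end_url):
    """Join the fixed parts of the url around the date key."""
    return start_url + date_key + end_url

# Fixed date used so that the result is reproducible.
keys = candidate_date_keys(datetime.date(2015, 3, 2), max_days_back=3)
print(keys)  # ['20150302', '20150301', '20150228']

# Placeholder url parts (hypothetical, not the real report address).
first_url = form_report_url("http://example.com/report_", keys[0], ".html")
print(first_url)
```

Each candidate url would then be tried in turn, stopping at the first date for which a report is actually returned.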

Below is the short sell retrieval portion of the code found in “SGX_stock_announcement_extract.py”, which retrieves the short sell quantity for each stock. The updated code is found on GitHub.

def retrieve_shortsell_info(self):
    """ Retrieve the shortsell information.
        Form the url and retrieve the information using pandas to make it into a table.
        The function will set to self.shortsell_info_df.
        Iterate over the days to get the latest data.
    """
    for last_effective_date in range(7):
        self.form_shortsell_url(last_effective_date)
        url = URL(self.shortsell_full_url)  # URL comes from the pattern.web module
        try:
            # see if data is available for that date
            shortsell_list = pandas.io.html.read_html(url.download())
            # assumption: the table index is not shown in the original listing;
            # the report is taken here to be the second table on the page
            self.shortsell_info_df = shortsell_list[1]
        except Exception:
            continue

        # continue if there is no data
        if len(self.shortsell_info_df) == 0:
            continue

        self.shortsell_info_df.rename(columns={0: 'Security', 1: 'Short Sale Volume',
                                               2: 'Currency', 3: 'Short Sale Value'},
                                      inplace=True)
        # drop the header row and the trailing summary rows
        self.shortsell_info_df = self.shortsell_info_df[1:-3]
        # change type of the columns
        self.shortsell_info_df[['Short Sale Volume', 'Short Sale Value']] = self.shortsell_info_df[['Short Sale Volume', 'Short Sale Value']].astype(float)
        # need a rank on the short sell
        self.shortsell_info_df['ranked_shortsell'] = self.shortsell_info_df['Short Sale Volume'].rank(method='min', ascending=False)
        self.shortsell_info_df['shortsell_lastdate'] = self.set_last_desired_date(last_effective_date)
        # need percentage as well

        # have a sorting of data?
        return

    print('No suitable data found within time frame.')
    return

def form_shortsell_url(self, last_effective_date):
    """ Based on the current date to set the shortsell url.
        Set to self.shortsell_full_url
        Args:
            last_effective_date (int): num of days prior to the current date.
    """
    # retrieve the desired date in yyyymmdd format
    self.shortsell_date_url = self.set_last_desired_date(num_days=last_effective_date)
    self.shortsell_full_url = self.shortsell_info_start_url + self.shortsell_date_url + self.shortsell_end_url

def set_last_desired_date(self, num_days=0):
    """ Return the last date in which the results will be displayed.
        It is set to be the current date - num of days as set by users.
        Affect only self.print_feeds function.
        Kwargs:
            num_days (int): num of days prior to the current date.
            Setting to 0 will only retrieve the current date
        Returns:
            (str): datekey as yyyymmdd.
    """
    # [year, month, day] of the desired date
    last_eff_date_list = list((datetime.date.today() - datetime.timedelta(num_days)).timetuple()[0:3])

    # zero pad the month
    if len(str(last_eff_date_list[1])) == 1:
        last_eff_date_list[1] = '0' + str(last_eff_date_list[1])

    # zero pad the day
    if len(str(last_eff_date_list[2])) == 1:
        last_eff_date_list[2] = '0' + str(last_eff_date_list[2])

    return str(last_eff_date_list[0]) + str(last_eff_date_list[1]) + str(last_eff_date_list[2])

# The method name is not shown in the original excerpt; shortsell_notification is a placeholder.
def shortsell_notification(self):
    """ Use for alerts on shortsell information.
        Identify top ten short sell plus target stock short sell information.
    """
    ## get the current price df so it can be combined with the shortsell info
    self.process_all_data()
    merged_shortsell_df = pandas.merge(self.shortsell_info_df, self.sgx_curr_price_df, left_on='Security', right_on='CompanyName')

    merged_shortsell_df['shortsell_vol_per'] = merged_shortsell_df['Short Sale Volume'] / merged_shortsell_df['DailyVolume']
    merged_shortsell_df['ranked_percent_vol_shortsell'] = merged_shortsell_df['shortsell_vol_per'].rank(method='min', ascending=False)

    # keep the stocks ranked 1 to 15 by absolute short sell volume
    top_shortsell_df = merged_shortsell_df[merged_shortsell_df['ranked_shortsell'].isin(range(1, 16))]
    top_shortsell_df = top_shortsell_df.sort_values(by='ranked_shortsell', ascending=True)
    top_shortsell_df = top_shortsell_df[['Security', 'Short Sale Volume', 'shortsell_lastdate']]
    shortsell_top15_shtver = top_shortsell_df.to_string()

    api_key_path = r'C:\Users\356039\Desktop\running bat\pushbullet_api\key.txt'
    with open(api_key_path, 'r') as f:
        # assumption: the file contains only the PushBullet API key
        apiKey = f.read().strip()

    p = PushBullet(apiKey)

    if shortsell_top15_shtver:
        p.pushNote('all', 'Shortsell top10', shortsell_top15_shtver, recipient_type="random1")

    ## display for target watchlist
    tar_watchlist_shortsell_df = merged_shortsell_df[merged_shortsell_df['Security'].isin(self.companyname_watchlist)]
    tar_watchlist_shortsell_df = tar_watchlist_shortsell_df[['Security', 'Short Sale Volume', 'ranked_shortsell', 'shortsell_vol_per', 'ranked_percent_vol_shortsell']]
    tar_watchlist_shortsell_df = tar_watchlist_shortsell_df[tar_watchlist_shortsell_df['ranked_shortsell'].isin(range(1, 100))]
    tar_watchlist_shortsell_df = tar_watchlist_shortsell_df.sort_values(by='ranked_shortsell', ascending=True)
    tar_watchlist_shortsell_shtver = tar_watchlist_shortsell_df.to_string()

    if tar_watchlist_shortsell_shtver:
        p.pushNote('all', 'Shortsell targetwatchlist', tar_watchlist_shortsell_shtver, recipient_type="random1")

Sample output as follows:
Security | Short Sale Volume | ranked_shortsell | shortsell_vol_per | ranked_percent_vol_shortsell
Sembcorp Ind | 3529600 | 6 | 0.437422 | 4
CapitaLand | 3313300 | 7 | 0.354216 | 7
SingTel | 2809000 | 8 | 0.276471 | 16
Lippo Malls Tr | 2073800 | 11 | 0.492531 | 2

1. ranked_shortsell –> rank according to the absolute short sell volume
2. shortsell_vol_per –> short sell qty as a ratio of the transacted volume
3. ranked_percent_vol_shortsell –> rank according to shortsell_vol_per
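To show how the two rankings can disagree, here is a minimal sketch on made-up numbers (the security names and volumes are invented for illustration; only the column names follow the code above). It uses the same `rank(method='min', ascending=False)` call, so rank 1 is the largest value.

```python
import pandas as pd

# Invented data: short sale volume and total traded volume for three stocks.
df = pd.DataFrame({
    "Security": ["AAA", "BBB", "CCC"],
    "Short Sale Volume": [300000.0, 200000.0, 100000.0],
    "DailyVolume": [3000000.0, 400000.0, 2000000.0],
})

# Rank by absolute short sale volume (1 = most shorted).
df["ranked_shortsell"] = df["Short Sale Volume"].rank(method="min", ascending=False)

# Short sale volume as a fraction of the traded volume, and its rank.
df["shortsell_vol_per"] = df["Short Sale Volume"] / df["DailyVolume"]
df["ranked_percent_vol_shortsell"] = df["shortsell_vol_per"].rank(method="min", ascending=False)

print(df[["Security", "ranked_shortsell", "shortsell_vol_per", "ranked_percent_vol_shortsell"]])
```

In this toy table AAA has the largest absolute short volume, but BBB ranks first once the volume is expressed as a share of what was traded.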

Rapid input data from list of files to SQLite DB

Suppose you wish to load all the data from a series of CSV files residing in a particular folder into a SQLite database. The following tools and commands make this relatively fast and painless.

Each CSV file is assumed to have the headers on the first line and the data on the subsequent lines. The headers need not be the same for each CSV file.

The following python modules are required.

1. CSV module
2. Pandas (0.15)
3. SQLite3

The commands below carry out the following actions.

1. Use list comprehension to get the list of file paths from a particular folder.
2. Convert the data from each CSV file into dict format using CSV DictReader and iterate over all the files, joining them to a list.
3. Convert the list of dicts to a single Pandas Dataframe.
4. Create a connection to a particular SQLite Database.
5. Use Pandas to_sql() function to pass all the data to SQLite database.

The code is displayed below.

import os
import csv
import pandas
import sqlite3 as lite

path = r'C:\folderpath\to\list\of\files'
## Get the full path of all the csv files.
full_path_list = [os.path.join(path, f) for f in os.listdir(path)
                  if os.path.isfile(os.path.join(path, f))]

## Convert all data to list of dicts.
full_data_list =  [n for f in full_path_list for n in csv.DictReader(open(f,'r'))]

## SQL database name and initialize the sql connection.
db_filename = r'c:\data\sbrtemp3.db'
con = lite.connect(db_filename)

## Convert to dataframe and write to sql database.
## A sqlite3 connection is written to as SQLite by default.
pandas.DataFrame(full_data_list).to_sql('test', con,
                                        schema=None, if_exists='replace', index=True,
                                        index_label=None, chunksize=None, dtype=None)

## Close the SQL connection
con.close()

The if_exists argument of the Pandas to_sql() function controls what happens when the table already exists: 'replace' overwrites the table, while 'append' adds the new rows to it, so the function can be called multiple times if users need to add more data. Note that appending does not remove duplicate rows. In addition, converting to a dataframe from a list of dicts allows the headers to differ between CSV files. Besides saving to a database, users can also consolidate the data into a single CSV file with the Pandas to_csv() function.
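The behaviour with differing headers and repeated calls can be checked with a small self-contained sketch. It uses an in-memory SQLite database and hand-written rows in place of real CSV files (the row contents and the table name `test` are only for illustration).

```python
import sqlite3
import pandas as pd

# Rows as csv.DictReader would yield them from two files with different headers.
rows_file_1 = [{"name": "a", "price": "1.0"}, {"name": "b", "price": "2.0"}]
rows_file_2 = [{"name": "c", "volume": "300"}]

# The dataframe takes the union of the headers; missing entries become NaN.
df = pd.DataFrame(rows_file_1 + rows_file_2)

con = sqlite3.connect(":memory:")

# First call creates (or overwrites) the table.
df.to_sql("test", con, if_exists="replace", index=False)

# A second call with 'append' adds the same rows again; nothing is deduplicated.
df.to_sql("test", con, if_exists="append", index=False)

row_count = con.execute("SELECT COUNT(*) FROM test").fetchone()[0]
columns = [info[1] for info in con.execute("PRAGMA table_info(test)")]
con.close()

print(row_count, columns)
```

If duplicates matter, one option is to call `drop_duplicates()` on the dataframe before writing, or to clean up with a SQL query afterwards.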

Basic Stock Technical Analysis with python

Simple technical analysis for stocks can be performed using the Python pandas module, with graphical display. Examples of basic analysis include simple moving averages, Moving Average Convergence Divergence (MACD), and Bollinger Bands and Bollinger Bandwidth.

For the technical analysis to be performed, daily prices need to be collected for each stock. The Yahoo Finance API can retrieve the required data. The previous post described the method to link the YF API to Python. After the historical prices are retrieved, the various technical indicators can be computed easily with the Pandas rolling mean method, and the plots can be produced with the Pandas plot function and some additional help from Matplotlib.
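The rolling calculation can be tried without any price download. The sketch below is an illustration on a synthetic price series (a simple ramp, not real data), using the `Series.rolling` accessor found in recent pandas releases; the 20-day window and the factor of 2 follow the usual Bollinger Band convention.

```python
import pandas as pd

# Synthetic prices: 1.0, 2.0, ..., 30.0 (for illustration only).
prices = pd.Series([float(p) for p in range(1, 31)], name="Adj Close")

window = 20
ma = prices.rolling(window=window).mean()                      # 20-day simple moving average
sd = prices.rolling(window=window, min_periods=window).std()   # 20-day rolling standard deviation

upper = ma + 2 * sd                      # upper Bollinger Band
lower = ma - 2 * sd                      # lower Bollinger Band
bandwidth = (upper - lower) / ma * 100   # Bollinger Bandwidth in percent

print(pd.DataFrame({"ma": ma, "upper": upper, "lower": lower, "bw": bandwidth}).tail())
```

The first 19 values of each column are NaN because a full 20-day window is not yet available.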

Below is a snippet of the script that initializes the historical data pull and displays the Bollinger Bands and Bollinger Bandwidth for a particular stock (Keppel Corp: BN4.SI).

import os, re, sys, time, datetime, copy, shutil
import pandas
from yahoo_finance_historical_data_extract import YFHistDataExtr
import matplotlib.pyplot as plt

if __name__ == '__main__':
    data_ext = YFHistDataExtr()
    data_ext.set_interval_to_retrieve(200)  # in days
    data_ext.set_multiple_stock_list(['BN4.SI'])
    data_ext.get_hist_data_of_all_target_stocks()
    # convert the date column to date object
    data_ext.all_stock_df['Date'] = pandas.to_datetime(data_ext.all_stock_df['Date'])
    # sort by ascending date to calculate the rolling statistics
    temp_data_set = data_ext.all_stock_df.sort_values('Date', ascending=True)

    adj_close = temp_data_set['Adj Close']
    temp_data_set['20d_ma'] = adj_close.rolling(window=20).mean()
    temp_data_set['50d_ma'] = adj_close.rolling(window=50).mean()
    # Bollinger Bands: 20-day mean plus/minus 2 standard deviations
    temp_data_set['Bol_upper'] = adj_close.rolling(window=20).mean() + 2 * adj_close.rolling(window=20, min_periods=20).std()
    temp_data_set['Bol_lower'] = adj_close.rolling(window=20).mean() - 2 * adj_close.rolling(window=20, min_periods=20).std()
    # Bollinger Bandwidth in percent
    temp_data_set['Bol_BW'] = ((temp_data_set['Bol_upper'] - temp_data_set['Bol_lower']) / temp_data_set['20d_ma']) * 100
    # a 50-day window is used, as there is not enough data for a 200-day average
    temp_data_set['Bol_BW_200MA'] = temp_data_set['Bol_BW'].rolling(window=50).mean()
    # backfill the leading NaN values (may not be ideal)
    temp_data_set['Bol_BW_200MA'] = temp_data_set['Bol_BW_200MA'].bfill()
    # exponential moving averages
    temp_data_set['20d_exma'] = adj_close.ewm(span=20).mean()
    temp_data_set['50d_exma'] = adj_close.ewm(span=50).mean()
    # reverse back to the original order
    data_ext.all_stock_df = temp_data_set.sort_values('Date', ascending=False)

    data_ext.all_stock_df.plot(x='Date', y=['Adj Close', '20d_ma', '50d_ma', 'Bol_upper', 'Bol_lower'])
    data_ext.all_stock_df.plot(x='Date', y=['Bol_BW', 'Bol_BW_200MA'])
    plt.show()
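MACD is mentioned above but is not part of the snippet. A minimal sketch of one common formulation follows, assuming the conventional 12/26/9 spans (these values and the helper name `macd` are not from the original script) and using a synthetic rising price series in place of downloaded data.

```python
import pandas as pd

def macd(close, fast=12, slow=26, signal=9):
    """Return the MACD line, signal line and histogram for a price series."""
    ema_fast = close.ewm(span=fast, adjust=False).mean()
    ema_slow = close.ewm(span=slow, adjust=False).mean()
    macd_line = ema_fast - ema_slow                                  # fast EMA minus slow EMA
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()    # EMA of the MACD line
    return pd.DataFrame({
        "macd": macd_line,
        "signal": signal_line,
        "hist": macd_line - signal_line,
    })

# Synthetic steadily rising prices (for illustration only).
close = pd.Series([float(p) for p in range(1, 61)])
result = macd(close)
print(result.tail())
```

With real data, `close` would be the 'Adj Close' column sorted by ascending date, and the three columns could be plotted against 'Date' in the same way as the Bollinger plots.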