Streaming data from sources
Some data truly streams through your computer: it comes from a generative process that transmits data, which you can process on the fly or simply discard, but not recall afterward unless you have stored it away in some data archival repository. It is like drawing water from a flowing river: the river keeps on flowing, but you can filter and process the water as it passes. This is a completely different strategy from processing all the data at once, which is more like putting all the water in a dam (an analogy for working with all the data in-memory).
As an example of streaming, we could cite the data flow produced instant by instant by a sensor or, even more simply, a Twitter stream of tweets. Generally, the main sources of data streams are as follows:
- Environment sensors measuring temperature, pressure, and humidity
- GPS tracking sensors recording the location (latitude/longitude)
- Satellites recording image data
- Surveillance videos and sound records
- Web traffic
However, you won't often work on real streams of data but rather on static records stored in a repository or file. In such cases, a stream can be recreated according to certain criteria, for example, by extracting a single record at a time, either sequentially or at random. If, for instance, our data is contained in a TXT or CSV file, all we need to do is fetch a single row of the file at a time and pass it to the learning algorithm.
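To make this concrete, here is a minimal sketch of how a static CSV file can be read back as a stream of single records; the generator name and the file name are just illustrative, not part of the code we will build in this chapter:

def row_stream(filename, sep=','):
    # yield one parsed row at a time: only a single record ever sits in memory
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip().split(sep)

# hypothetical usage: feed each record to an online learning algorithm
# for row in row_stream('hour.csv'):
#     do_something(row)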
For the examples in this and the following chapter, we will work on files stored on your local hard disk and prepare the Python code necessary to extract them as a stream. We won't use toy datasets, but neither will we clutter your local hard drive with too much data for tests and demonstrations.
Datasets to try the real thing yourself
Since 1987, the University of California, Irvine (UCI) has hosted the UCI Machine Learning Repository, a large collection of datasets used by the machine learning community for the empirical testing of machine learning algorithms. At the time of writing, the repository contains about 350 datasets from very different domains and purposes, from supervised regression and classification to unsupervised tasks. You can have a look at the available datasets at https://archive.ics.uci.edu/ml/.
For our part, we have selected a few datasets that will prove useful throughout the book, proposing problems that are challenging because of a high number of rows or columns, yet still manageable on an ordinary computer with as little as 2 GB of RAM:
In order to download and use a dataset from the UCI repository, you have to go to the page dedicated to that dataset and follow the link under the title Download: Data Folder. We have prepared some scripts that download the data automatically and place it exactly in the directory that you are working in with Python, thus making data access easier.
Here are some functions that we have prepared and will recall throughout the chapters when we need to download any of the datasets from UCI:
In:
import urllib2  # import urllib.request as urllib2 in Python3
import requests, io, os, StringIO
import numpy as np
import tarfile, zipfile, gzip

def unzip_from_UCI(UCI_url, dest=''):
    """
    Downloads and unpacks datasets from UCI in zip format
    """
    response = requests.get(UCI_url)
    compressed_file = io.BytesIO(response.content)
    z = zipfile.ZipFile(compressed_file)
    print ('Extracting in %s' % (os.getcwd()+'\\'+dest))
    for name in z.namelist():
        if '.csv' in name:
            print ('\tunzipping %s' % name)
            z.extract(name, path=os.getcwd()+'\\'+dest)

def gzip_from_UCI(UCI_url, dest=''):
    """
    Downloads and unpacks datasets from UCI in gzip format
    """
    response = urllib2.urlopen(UCI_url)
    compressed_file = io.BytesIO(response.read())
    decompressed_file = gzip.GzipFile(fileobj=compressed_file)
    filename = UCI_url.split('/')[-1][:-3]
    with open(os.getcwd()+'\\'+filename, 'wb') as outfile:
        outfile.write(decompressed_file.read())
    print ('File %s decompressed' % filename)

def targzip_from_UCI(UCI_url, dest='.'):
    """
    Downloads and unpacks datasets from UCI in tar.gz format
    """
    response = urllib2.urlopen(UCI_url)
    compressed_file = StringIO.StringIO(response.read())
    tar = tarfile.open(mode="r:gz", fileobj=compressed_file)
    tar.extractall(path=dest)
    datasets = tar.getnames()
    for dataset in datasets:
        size = os.path.getsize(dest+'\\'+dataset)
        print ('File %s is %i bytes' % (dataset, size))
    tar.close()

def load_matrix(UCI_url):
    """
    Downloads datasets from UCI in matrix form
    """
    return np.loadtxt(urllib2.urlopen(UCI_url))
Tip
Downloading the example code
Detailed steps to download the code bundle are mentioned in the Preface of this book. Please have a look.
The code bundle for the book is also hosted on GitHub at https://github.com/PacktPublishing/Large-Scale-Machine-Learning-With-Python. We also have other code bundles from our rich catalog of books and videos available at https://github.com/PacktPublishing/. Check them out!
The functions are just convenient wrappers built around various packages working with compressed data, such as tarfile, zipfile, and gzip. The file is opened using the urllib2 module, which creates a handle to the remote resource and allows the sequential transmission of data, stored in memory either as a string (StringIO) or in binary form (BytesIO) by means of the io module, a module devoted to stream handling (https://docs.python.org/2/library/io.html). Once in memory, the data is handled just as a file read from disk would be by the functions specialized in deflating compressed archives.
The four provided functions should conveniently help you download the datasets quickly, whether they are zipped, tarred, gzipped, or just plain text in matrix form, avoiding the hassle of manual downloading and extraction operations.
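To make the mechanism clear, here is a tiny self-contained sketch (no network access needed, and not part of the wrappers above) showing how io.BytesIO lets in-memory bytes be treated exactly like a file: we build a small zip archive in memory and read it back without ever touching the disk:

import io, zipfile

buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w') as z:
    z.writestr('sample.csv', 'a,b,c\n1,2,3\n')   # write a file into the in-memory archive

buffer.seek(0)                                   # rewind, as you would with a real file
with zipfile.ZipFile(buffer) as z:
    print (z.namelist())                         # ['sample.csv']
    print (z.read('sample.csv'))                 # the CSV content, read back from memory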
The first example – streaming the bike-sharing dataset
As a first example, we will work with the bike-sharing dataset. The dataset comprises two CSV files containing the hourly and daily counts of bikes rented between 2011 and 2012 within the Capital Bikeshare system in Washington D.C., USA. The data features the corresponding weather and seasonal information for each rental day. The dataset is connected with a publication by Fanaee-T, Hadi, and Gama, Joao, Event labeling combining ensemble detectors and background knowledge, Progress in Artificial Intelligence (2013): pp. 1-15, Springer Berlin Heidelberg.
Our first target will be to save the dataset on the local hard disk using the convenient wrapper functions defined just a few paragraphs earlier:
In:
UCI_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00275/Bike-Sharing-Dataset.zip'
unzip_from_UCI(UCI_url, dest='bikesharing')

Out:
Extracting in C:\scisoft\WinPython-64bit-2.7.9.4\notebooks\bikesharing
	unzipping day.csv
	unzipping hour.csv
If run successfully, the code will indicate the directory where the CSV files have been saved and print the names of both unzipped files.
At this point, having saved the information on a physical device, we will write a script that constitutes the core of our out-of-core learning system, providing the data streaming from the file. We will first use the csv library, which offers us a double choice: recovering the data as a list or as a Python dictionary. We will start with a list:
In:
import os, csv

local_path = os.getcwd()
source = 'bikesharing\\hour.csv'
SEP = ','  # We define this for being able to easily change it as required by the file

with open(local_path+'\\'+source, 'rb') as R:
    iterator = csv.reader(R, delimiter=SEP)
    for n, row in enumerate(iterator):
        if n == 0:
            header = row
        else:
            # DATA PROCESSING placeholder
            # MACHINE LEARNING placeholder
            pass

print ('Total rows: %i' % (n+1))
print ('Header: %s' % ', '.join(header))
print ('Sample values: %s' % ', '.join(row))

Out:
Total rows: 17380
Header: instant, dteday, season, yr, mnth, hr, holiday, weekday, workingday, weathersit, temp, atemp, hum, windspeed, casual, registered, cnt
Sample values: 17379, 2012-12-31, 1, 1, 12, 23, 0, 1, 1, 1, 0.26, 0.2727, 0.65, 0.1343, 12, 37, 49
The output reports how many rows have been read, the content of the header (the first row of the CSV file, stored in a list), and the content of a row (for convenience, we printed the last one seen). The csv.reader function creates an iterator that, thanks to a for loop, releases each row of the file one by one. Note that we have placed two remarks inside the code snippet, pointing out where, throughout the chapter, we will place the other code to handle data preprocessing and machine learning.
In this case, features have to be handled with a positional approach, that is, by indexing the position of each label in the header. This can be a slight nuisance if you have to manipulate your features extensively. A solution could be to use csv.DictReader, which produces a Python dictionary as output (the dictionary is unordered, but the features can easily be recalled by their labels):
In:
with open(local_path+'\\'+source, 'rb') as R:
    iterator = csv.DictReader(R, delimiter=SEP)
    for n, row in enumerate(iterator):
        # DATA PROCESSING placeholder
        # MACHINE LEARNING placeholder
        pass

print ('Total rows: %i' % (n+1))
print ('Sample values: %s' % str(row))

Out:
Total rows: 17379
Sample values: {'mnth': '12', 'cnt': '49', 'holiday': '0', 'instant': '17379', 'temp': '0.26', 'dteday': '2012-12-31', 'hr': '23', 'season': '1', 'registered': '37', 'windspeed': '0.1343', 'atemp': '0.2727', 'workingday': '1', 'weathersit': '1', 'weekday': '1', 'hum': '0.65', 'yr': '1', 'casual': '12'}
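As a hedged sketch of what could later go into the DATA PROCESSING placeholder, the dictionary output makes it easy to pick features by label and cast them to numbers; the chosen columns are just an illustrative subset and the helper name is our own:

numeric_features = ['temp', 'atemp', 'hum', 'windspeed']

def parse_row(row, features=numeric_features, target='cnt'):
    # pick the chosen features by label and cast the string values to floats
    x = [float(row[f]) for f in features]
    y = float(row[target])
    return x, y

# for example, on the last row seen in the previous loop:
# x, y = parse_row(row)   ->   ([0.26, 0.2727, 0.65, 0.1343], 49.0)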
Using pandas I/O tools
As an alternative to the csv module, we can use pandas' read_csv function. This function, specialized in loading CSV files, is part of quite a large range of functions devoted to input/output on different file formats, as detailed in the pandas documentation at http://pandas.pydata.org/pandas-docs/stable/io.html.
The great advantages of using pandas I/O functions are as follows:
- You can keep your code consistent if you change your source type, that is, you need to redefine just the streaming iterator
- You can access a large number of different formats such as CSV, plain TXT, HDF, JSON, and SQL query for a specific database
- The data is streamed in chunks of the desired size as DataFrame data structures, so that you can access the features positionally or recall them by their labels, thanks to the .loc, .iloc, and .ix methods typical of slicing and dicing in a pandas DataFrame (note that .ix has since been deprecated in favor of .loc and .iloc); a short sketch of this double access mode follows the list
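Here is the promised minimal, self-contained sketch of that last point; the tiny DataFrame built by hand simply stands in for any chunk returned by the iterators shown later:

import pandas as pd

# a toy chunk standing in for what read_csv(..., chunksize=...) would return
data_chunk = pd.DataFrame({'temp': [0.24, 0.22], 'cnt': [16, 40]})

print (data_chunk.iloc[0])          # positional access: the first row of the chunk
print (data_chunk.loc[0, 'cnt'])    # label-based access: row label 0, column 'cnt'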
Here is an example using the same approach as before, this time built around pandas' read_csv function:
In:
import pandas as pd

CHUNK_SIZE = 2500

with open(local_path+'\\'+source, 'rb') as R:
    iterator = pd.read_csv(R, chunksize=CHUNK_SIZE)
    for n, data_chunk in enumerate(iterator):
        print ('Size of uploaded chunk: %i instances, %i features' % (data_chunk.shape))
        # DATA PROCESSING placeholder
        # MACHINE LEARNING placeholder
        pass
    print ('Sample values: \n%s' % str(data_chunk.iloc[0]))

Out:
Size of uploaded chunk: 2500 instances, 17 features
Size of uploaded chunk: 2500 instances, 17 features
Size of uploaded chunk: 2500 instances, 17 features
Size of uploaded chunk: 2500 instances, 17 features
Size of uploaded chunk: 2500 instances, 17 features
Size of uploaded chunk: 2500 instances, 17 features
Size of uploaded chunk: 2379 instances, 17 features
Sample values:
instant            15001
dteday        2012-09-22
season                 3
yr                     1
mnth                   9
hr                     5
holiday                0
weekday                6
workingday             0
weathersit             1
temp                0.56
atemp             0.5303
hum                 0.83
windspeed         0.3284
casual                 2
registered            15
cnt                   17
Name: 0, dtype: object
Here, it is very important to notice that the iterator is instantiated by specifying a chunk size, that is, the number of rows the iterator has to return at every iteration. The chunksize parameter can be set to any value from 1 upward, though clearly the size of the mini-batch (the retrieved chunk) is strictly connected to the memory available to store and manipulate it in the following preprocessing phase.
Bringing larger chunks into memory offers an advantage only in terms of disk access: smaller chunks require multiple accesses to the disk and, depending on the characteristics of your physical storage, a longer time to pass through the data. Nevertheless, from a machine learning point of view, smaller or larger chunks make little difference for Scikit-learn out-of-core functions, as they learn taking into account only one instance at a time, making them truly linear in computational cost.
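To give an idea of where this leads, here is a hedged sketch of what the MACHINE LEARNING placeholder could eventually contain: a Scikit-learn out-of-core estimator, SGDRegressor, updated chunk after chunk through its partial_fit method. The choice of features and the untuned regressor are purely illustrative; the point is only that the streaming and the learning code fit together regardless of the chunk size (local_path, source, and CHUNK_SIZE are the variables defined earlier):

import pandas as pd
from sklearn.linear_model import SGDRegressor

features = ['temp', 'atemp', 'hum', 'windspeed']   # illustrative subset of columns
target = 'cnt'
learner = SGDRegressor(random_state=1)

for data_chunk in pd.read_csv(local_path+'\\'+source, chunksize=CHUNK_SIZE):
    X = data_chunk[features].values
    y = data_chunk[target].values
    learner.partial_fit(X, y)                      # the model is updated chunk by chunk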
Working with databases
As an example of the flexibility of the pandas I/O tools, we will provide a further example using a SQLite3 database, where data is streamed from a simple query, chunk by chunk. The example is not merely didactic: working with a large data store in a database can indeed bring advantages in terms of both disk space and processing time.
Data arranged into tables in a SQL database can be normalized, thus removing redundancies and repetitions and saving disk storage. Database normalization is a way of arranging the columns and tables of a database so as to reduce their dimensions without losing any information. Often, this is accomplished by splitting tables and recoding repeated data into keys. Moreover, a relational database, being optimized for memory, operations, and multiprocessing, can speed up and anticipate part of the preprocessing activities that would otherwise be dealt with in the Python script.
Using Python, SQLite (https://www.sqlite.org) is a good solution for the following reasons:
- It is open source
- It can handle large amounts of data (theoretically up to 140 TB per database, though it is unlikely to see any SQLite application dealing with such amounts of data)
- It runs on macOS, as well as on Linux and Windows, in both 32- and 64-bit environments
- It does not require any server infrastructure or particular installation (zero configuration) as all the data is stored in a single file on disk
- It can easily be extended using Python code acting as a stored procedure
Moreover, the Python standard library includes a sqlite3 module providing all the functions needed to create a database from scratch and work with it.
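Just to give a feel for how little is needed, here is a minimal sketch of the standard sqlite3 workflow; the database and table names are purely illustrative and unrelated to the script that follows:

import sqlite3

conn = sqlite3.connect('toy.sqlite')   # the file is created if it does not exist
c = conn.cursor()
c.execute("CREATE TABLE IF NOT EXISTS rentals (hr integer, cnt integer)")
c.execute("INSERT INTO rentals VALUES (?, ?)", (23, 49))
conn.commit()
print (c.execute("SELECT * FROM rentals").fetchall())
c.close()
conn.close()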
In our example, we will first upload the CSV files containing the bike-sharing dataset on both a daily and hourly basis to a SQLite database, and then we will stream from it as we did from a CSV file. The database-uploading code that we provide is reusable throughout the book and for your own applications, as it is not tied to this specific example (you just have to change the input and output parameters, that's all):
In:
import os, sys
import sqlite3, csv, glob

SEP = ','

def define_field(s):
    try:
        int(s)
        return 'integer'
    except ValueError:
        try:
            float(s)
            return 'real'
        except:
            return 'text'

def create_sqlite_db(db='database.sqlite', file_pattern=''):
    conn = sqlite3.connect(db)
    conn.text_factory = str  # allows utf-8 data to be stored
    c = conn.cursor()
    # traverse the directory and process each .csv file useful for building the db
    target_files = glob.glob(file_pattern)
    print ('Creating %i table(s) into %s from file(s): %s' % (len(target_files), db, ', '.join(target_files)))
    for k, csvfile in enumerate(target_files):
        # remove the path and extension and use what's left as a table name
        tablename = os.path.splitext(os.path.basename(csvfile))[0]
        with open(csvfile, "rb") as f:
            reader = csv.reader(f, delimiter=SEP)
            f.seek(0)
            for n, row in enumerate(reader):
                if n == 11:
                    types = map(define_field, row)
                else:
                    if n > 11:
                        break
            f.seek(0)
            for n, row in enumerate(reader):
                if n == 0:
                    sql = "DROP TABLE IF EXISTS %s" % tablename
                    c.execute(sql)
                    sql = "CREATE TABLE %s (%s)" % (tablename,
                          ", ".join(["%s %s" % (col, ct) for col, ct in zip(row, types)]))
                    print ('%i) %s' % (k+1, sql))
                    c.execute(sql)
                    # Creating indexes for faster joins on long strings
                    for column in row:
                        if column.endswith("_ID_hash"):
                            index = "%s__%s" % (tablename, column)
                            sql = "CREATE INDEX %s on %s (%s)" % (index, tablename, column)
                            c.execute(sql)
                    insertsql = "INSERT INTO %s VALUES (%s)" % (tablename,
                                ", ".join(["?" for column in row]))
                    rowlen = len(row)
                else:
                    # raise an error if there are rows that don't have the right number of fields
                    if len(row) == rowlen:
                        c.execute(insertsql, row)
                    else:
                        print ('Error at line %i in file %s' % (n, csvfile))
                        raise ValueError('Houston, we\'ve had a problem at row %i' % n)
            conn.commit()
            print ('* Inserted %i rows' % n)
    c.close()
    conn.close()
The script expects a valid database name and a pattern to locate the files that you want to import (wildcards such as * are accepted); it creates the new database and the tables that you need from scratch, and afterwards fills them with all the available data:
In:
create_sqlite_db(db='bikesharing.sqlite', file_pattern='bikesharing\\*.csv')

Out:
Creating 2 table(s) into bikesharing.sqlite from file(s): bikesharing\day.csv, bikesharing\hour.csv
1) CREATE TABLE day (instant integer, dteday text, season integer, yr integer, mnth integer, holiday integer, weekday integer, workingday integer, weathersit integer, temp real, atemp real, hum real, windspeed real, casual integer, registered integer, cnt integer)
* Inserted 731 rows
2) CREATE TABLE hour (instant integer, dteday text, season integer, yr integer, mnth integer, hr integer, holiday integer, weekday integer, workingday integer, weathersit integer, temp real, atemp real, hum real, windspeed real, casual integer, registered integer, cnt integer)
* Inserted 17379 rows
The script also reports the data types of the created fields and the number of inserted rows, so it is quite easy to verify that everything has gone smoothly during the import. Now it is easy to stream from the database. In our example, we will create an inner join between the hour and day tables and extract data on an hourly basis, enriched with information about the total rentals of the day:
In:
import os, sqlite3
import pandas as pd

DB_NAME = 'bikesharing.sqlite'
DIR_PATH = os.getcwd()
CHUNK_SIZE = 2500

conn = sqlite3.connect(DIR_PATH+'\\'+DB_NAME)
conn.text_factory = str  # allows utf-8 data to be stored
sql = "SELECT H.*, D.cnt AS day_cnt FROM hour AS H INNER JOIN day as D ON (H.dteday = D.dteday)"
DB_stream = pd.io.sql.read_sql(sql, conn, chunksize=CHUNK_SIZE)

for j, data_chunk in enumerate(DB_stream):
    print ('Chunk %i -' % (j+1)),
    print ('Size of uploaded chunk: %i instances, %i features' % (data_chunk.shape))
    # DATA PROCESSING placeholder
    # MACHINE LEARNING placeholder

Out:
Chunk 1 - Size of uploaded chunk: 2500 instances, 18 features
Chunk 2 - Size of uploaded chunk: 2500 instances, 18 features
Chunk 3 - Size of uploaded chunk: 2500 instances, 18 features
Chunk 4 - Size of uploaded chunk: 2500 instances, 18 features
Chunk 5 - Size of uploaded chunk: 2500 instances, 18 features
Chunk 6 - Size of uploaded chunk: 2500 instances, 18 features
Chunk 7 - Size of uploaded chunk: 2379 instances, 18 features
If you need to speed up the streaming, you just have to optimize the database, first of all by building the right indexes for the relational query that you intend to use.
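For instance, since the previous query joins the hour and day tables on their dteday columns, indexing those columns is the natural first step. Here is a minimal sketch; the index names are our own choice and not part of the script above:

import sqlite3

conn = sqlite3.connect('bikesharing.sqlite')
c = conn.cursor()
# one index per side of the join on dteday
c.execute("CREATE INDEX IF NOT EXISTS idx_hour_dteday ON hour (dteday)")
c.execute("CREATE INDEX IF NOT EXISTS idx_day_dteday ON day (dteday)")
conn.commit()
conn.close()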
Tip
conn.text_factory = str is a very important part of the script; it allows UTF-8 data to be stored. If this command is omitted, you may experience strange errors when inputting data.
Paying attention to the ordering of instances
As a concluding remark on the topic of streaming data, we have to warn you that, when streaming, you are actually feeding hidden information into your learning process because of the order of the examples you base your learning on.
In fact, online learners optimize their parameters based on each single instance they evaluate. Each instance pushes the learner in a certain direction in the optimization process. Globally, the learner should take the right optimization direction, given a large enough number of evaluated instances. However, if the learner is trained on biased observations (for instance, observations ordered by time or grouped in a meaningful way), the algorithm will learn the bias as well. Something can be done during training so that the learner does not remember previously seen instances, but some bias will be introduced anyway. If you are learning from time series, where the response to the flow of time is often part of the model, such a bias is quite useful; in most other cases, however, it acts as a kind of overfitting and translates into a certain lack of generalization in the final model.
If your data has some kind of ordering that you don't want the machine learning algorithm to learn (such as an ordering by ID), as a cautionary measure you can shuffle its rows before streaming the data, obtaining a random order more suitable for online stochastic learning.
The fastest way, and the one occupying the least disk space, is to stream the dataset into memory, shrinking it by compression. In most cases, though not all, this works thanks to the compression algorithm applied and the relative sparsity and redundancy of the data that you are using for training. In the cases where it doesn't work, you have to shuffle the data directly on disk, which implies more disk space consumption.
Here, we first present a fast way to shuffle in-memory, thanks to the zlib package, which can rapidly compress the rows into memory, and the shuffle function from the random module:
In:
import zlib
from random import shuffle

def ram_shuffle(filename_in, filename_out, header=True):
    with open(filename_in, 'rb') as f:
        zlines = [zlib.compress(line, 9) for line in f]
        if header:
            first_row = zlines.pop(0)
    shuffle(zlines)
    with open(filename_out, 'wb') as f:
        if header:
            f.write(zlib.decompress(first_row))
        for zline in zlines:
            f.write(zlib.decompress(zline))

import os
local_path = os.getcwd()
source = 'bikesharing\\hour.csv'
ram_shuffle(filename_in=local_path+'\\'+source, \
            filename_out=local_path+'\\bikesharing\\shuffled_hour.csv', header=True)
Tip
For Unix users, the sort command, invoked with the -R parameter, shuffles huge text files very easily and much more efficiently than any Python implementation. It can be combined with decompression and compression steps using pipes.
So something like the following command should do the trick:
zcat sorted.gz | sort -R | gzip - > shuffled.gz
In case the RAM is not enough to store all the compressed data, the only viable solution is to operate on the file directly on disk. The following snippet of code defines a function that will repeatedly split your file into increasingly smaller files, shuffle them internally, and reassemble them in random order into a larger file. The result is not a perfect random rearrangement, but the rows are scattered around enough to destroy any pre-existing order that could influence online learning:
In:
from random import shuffle
import pandas as pd
import numpy as np
import os

def disk_shuffle(filename_in, filename_out, header=True, iterations=3, CHUNK_SIZE=2500, SEP=','):
    for i in range(iterations):
        with open(filename_in, 'rb') as R:
            iterator = pd.read_csv(R, chunksize=CHUNK_SIZE)
            for n, df in enumerate(iterator):
                if n == 0 and header:
                    header_cols = SEP.join(df.columns)+'\n'
                df.iloc[np.random.permutation(len(df))].to_csv(str(n)+'_chunk.csv',
                                                               index=False, header=False, sep=SEP)
        ordering = list(range(0, n+1))
        shuffle(ordering)
        with open(filename_out, 'wb') as W:
            if header:
                W.write(header_cols)
            for f in ordering:
                with open(str(f)+'_chunk.csv', 'r') as R:
                    for line in R:
                        W.write(line)
                os.remove(str(f)+'_chunk.csv')
        filename_in = filename_out
        CHUNK_SIZE = int(CHUNK_SIZE / 2)

import os
local_path = os.getcwd()
source = 'bikesharing\\hour.csv'
disk_shuffle(filename_in=local_path+'\\'+source, \
             filename_out=local_path+'\\bikesharing\\shuffled_hour.csv', header=True)