Source code for src.preprocess.utils
import base64
import hashlib
import os
import pickle
import re
import shutil
from datetime import datetime

import pandas as pd
def df_to_records(df: pd.DataFrame, dataset: str, drop_columns: list = None):
    """
    Convert a dataframe to a list of record-oriented dicts.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataset.
    dataset : str
        Name of the provider dataset.
    drop_columns : list
        Which columns (if any) to drop.

    Returns
    -------
    list
        List of row-wise dicts.
    """
    if dataset == 'OXCGRT':
        records = oxcgrt_records(df, dataset, drop_columns)
    else:
        records = df.to_dict(orient="records")
    # The input must not already contain a `dataset` column,
    # since that key is assigned to each record below
    if records and "dataset" in records[0]:
        raise ValueError('Input dataset contains "dataset" column name.')
    # Assign a dataset key to each record
    for record in records:
        record["dataset"] = dataset
    return records
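# A minimal sketch with a hypothetical frame (any dataset name other than
# 'OXCGRT' takes the plain `to_dict` path):
#
#     >>> df = pd.DataFrame({'iso': ['GBR', 'FRA'], 'value': [1.0, 2.0]})
#     >>> df_to_records(df, dataset='demo')
#     [{'iso': 'GBR', 'value': 1.0, 'dataset': 'demo'},
#      {'iso': 'FRA', 'value': 2.0, 'dataset': 'demo'}]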
def write_records(records: list, dir: str, fn: str):
    """
    Write records to a pickle file.

    Parameters
    ----------
    records : list
        List of preprocessed records.
    dir : str
        Output directory.
    fn : str
        Output file name.

    Returns
    -------
    None
    """
    path = os.path.join(dir, fn)
    try:
        print(f"Writing {fn}...")
        with open(path, "wb") as f:
            pickle.dump(records, f)
    except Exception as e:
        # Clean up the partially written output directory before re-raising
        shutil.rmtree(dir)
        raise RuntimeError(f"Unable to write {path}.") from e
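# For example (hypothetical directory, which must already exist):
#
#     >>> write_records(records, 'tmp/preprocess', 'records.pickle')
#     Writing records.pickle...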
def oxcgrt_records(ox: pd.DataFrame, dataset: str, drop_columns: list = None):
    """
    Convert OXCGRT data to a list of record dicts.

    This presents an additional challenge because of the wide format
    of the OXCGRT data.

    Parameters
    ----------
    ox : pd.DataFrame
        Input OXCGRT data.
    dataset : str
        Name of the provider dataset.
    drop_columns : list
        Which columns (if any) to drop.

    Returns
    -------
    list
        List of record dicts.
    """
    drop_columns = drop_columns or []
    full_value_names, value_names, stub_names = get_names(ox)
    id_columns = [x for x in set(ox.columns).difference(full_value_names)
                  if x not in drop_columns]
    records = ox.to_dict(orient="records")
    rs = [get_measure_records(r, stub_names, id_columns, full_value_names)
          for r in records]
    # Flatten the per-row record lists into a single list
    return [item for sublist in rs for item in sublist]
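# End-to-end sketch on a hypothetical one-row, one-measure frame:
#
#     >>> ox = pd.DataFrame({'CountryName': ['Aruba'], 'Date': [20200401],
#     ...                    'C1_School closing': [3.0], 'C1_Flag': [1.0],
#     ...                    'C1_Notes': ['Schools closed nationwide']})
#     >>> oxcgrt_records(ox, 'OXCGRT')
#     [{'CountryName': 'Aruba', 'Date': 20200401, 'flag': 1.0,
#       'notes': 'Schools closed nationwide', 'measure': 3.0,
#       'measure_name': 'C1_School closing'}]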
def get_names(ox: pd.DataFrame):
    """
    Get the names of columns holding measure information.

    These columns begin with a stub prefix such as "A1_".

    Parameters
    ----------
    ox : pd.DataFrame
        Input OXCGRT dataset.

    Returns
    -------
    full_value_names : list
        The names of all columns with measure information.
    value_names : list
        The names of measure value columns (excluding Flag and Notes columns).
    stub_names : list
        The measure column prefixes (e.g. "A1").
    """
    stub_exp = r'[A-Z][0-9]+_'
    full_value_names = [match for match in ox.columns if re.findall(stub_exp, match)]
    value_names = [x for x in full_value_names if 'Flag' not in x and 'Notes' not in x]
    stub_names = [x.split('_')[0] for x in value_names]
    return full_value_names, value_names, stub_names
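# For example, with hypothetical OXCGRT-style column names:
#
#     >>> cols = ['CountryName', 'Date', 'C1_School closing',
#     ...         'C1_Flag', 'C1_Notes']
#     >>> get_names(pd.DataFrame(columns=cols))
#     (['C1_School closing', 'C1_Flag', 'C1_Notes'],
#      ['C1_School closing'], ['C1'])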
def get_measure_records(combined_record, stub_names, id_columns, full_value_names):
    """
    Break a row into individual records by stub group.

    That is, subset a row for only the C4 columns plus identifying
    information, then repeat for all possible measures. Records whose
    notes value is blank (i.e. equal to 0) are dropped.

    Parameters
    ----------
    combined_record : dict
        Dict of a single OXCGRT row.
    stub_names : list
        Names of each stub group.
    id_columns : list
        Columns to be retained as IDs.
    full_value_names : list
        Full names of value columns.

    Returns
    -------
    list
        List of dicts containing all records extracted from a given row.
    """
    records = []
    for stub in stub_names:
        stub_keys = [x for x in full_value_names if stub in x]
        keys = id_columns + stub_keys
        # Flag and Notes columns may be absent for a given stub
        flag_key = next((x for x in stub_keys if '_Flag' in x), None)
        notes_key = next((x for x in stub_keys if '_Notes' in x), None)
        subset = {key: value for key, value in combined_record.items() if key in keys}
        # Skip the record if its notes value is blank (encoded as 0)
        if notes_key is not None and subset.get(notes_key) == 0:
            continue
        if flag_key is not None:
            subset['flag'] = subset.pop(flag_key)
        else:
            subset['flag'] = 0.0
        if notes_key is not None:
            subset['notes'] = subset.pop(notes_key)
        # Replace 0.0 in ID columns with None
        for col in id_columns:
            if subset[col] == 0.0:
                subset[col] = None
        # Whatever key remains is the measure value column
        measure_key = list(set(subset.keys()).difference(
            set(id_columns + ['measure_name', 'flag', 'notes'])))
        subset['measure'] = subset.pop(measure_key[0])
        subset['measure_name'] = measure_key[0]
        records.append(subset)
    return records
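# A hypothetical row whose notes value is blank (0) yields no records:
#
#     >>> row = {'CountryName': 'Aruba', 'C1_School closing': 3.0,
#     ...        'C1_Flag': 1.0, 'C1_Notes': 0}
#     >>> get_measure_records(row, ['C1'], ['CountryName'],
#     ...                     ['C1_School closing', 'C1_Flag', 'C1_Notes'])
#     []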
def split_df_by_group(data: pd.DataFrame, group: str):
    """
    Split a dataframe by group and return a named dictionary.

    Parameters
    ----------
    data : pd.DataFrame
        Input dataset.
    group : str
        Name of the column to group by.

    Returns
    -------
    dict
        Dict of dataset slices keyed by group value.
    """
    return {name: slice_ for name, slice_ in data.groupby(group)}
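# For example, with a hypothetical two-group frame:
#
#     >>> df = pd.DataFrame({'grp': ['a', 'a', 'b'], 'n': [1, 2, 3]})
#     >>> slices = split_df_by_group(df, 'grp')
#     >>> sorted(slices)
#     ['a', 'b']
#     >>> slices['b']['n'].tolist()
#     [3]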
def filter_new_hashes(data: pd.DataFrame,
                      ingested_path: str,
                      date_now: str = None,
                      save_ingestion_hashes: bool = False) -> pd.DataFrame:
    """
    Filter records by the row-wise hashes of their content.

    Reduces the number of records that need to be processed from each
    dataset. Will not filter hashes that were ingested on the same day
    the function is called.

    Parameters
    ----------
    data : pd.DataFrame
        Input data.
    ingested_path : str
        Path to the ingested hash reference.
    date_now : str
        String of the current date. Defaults to today's date.
    save_ingestion_hashes : bool
        Should ingestion hashes be saved?

    Returns
    -------
    pd.DataFrame
        Filtered data.
    """
    # Evaluate the default at call time, not import time
    if date_now is None:
        date_now = datetime.now().strftime('%Y_%m_%d')
    # Work on a copy so the caller's frame is not mutated
    data = data.copy()
    # Read the reference file for ingested hashes
    ingested_hash_ref = pd.read_csv(ingested_path)
    # Keep only hashes that were not processed today
    ingested_hash_ref = ingested_hash_ref.loc[ingested_hash_ref['date_processed'] != date_now, :]
    # Define row-wise hashes for the input dataset
    data['_hash'] = get_row_hashes(data)
    # Keep only hash values that have not been processed on a different day
    data = data.loc[~data['_hash'].isin(ingested_hash_ref['hash'])]
    # Get the hashes that were just ingested
    new_hashes = pd.DataFrame({'hash': data['_hash'], 'date_processed': date_now})
    # Remove the _hash column from the new data
    data = data.drop(columns=['_hash'])
    # Combine the previous hash reference with the new hashes
    ingested_hash_ref = pd.concat([ingested_hash_ref, new_hashes]).drop_duplicates()
    # Write the combined hash reference to csv if save_ingestion_hashes is True
    if save_ingestion_hashes:
        ingested_hash_ref.to_csv(ingested_path, index=False)
    return data
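# Hypothetical first run: seed an empty reference file, then filter a frame
# `df`; nothing is filtered out and the new hashes are written back.
#
#     >>> pd.DataFrame(columns=['hash', 'date_processed']).to_csv(
#     ...     'ingested.csv', index=False)
#     >>> fresh = filter_new_hashes(df, 'ingested.csv',
#     ...                           save_ingestion_hashes=True)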
def get_row_hashes(data: pd.DataFrame) -> list:
    """
    Get row-wise base64-encoded hashes for a dataframe.

    Parameters
    ----------
    data : pd.DataFrame
        Input data.

    Returns
    -------
    list
        List of hashes, one per row.
    """
    # Combine row values into a single string
    hash_strings = list(data.apply(lambda row: ''.join(str(value) for value in row), axis=1))
    # Hash each string and base64 encode the digest, decoding to str so the
    # hashes round-trip through the csv reference file
    hashes = [base64.b64encode(hashlib.sha1(s.encode("UTF-8")).digest()).decode("ascii")
              for s in hash_strings]
    return hashes
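# For instance (hypothetical frame; the row (1, 'x') is hashed as the
# string '1x', and a base64-encoded sha1 digest is 28 characters long):
#
#     >>> hashes = get_row_hashes(pd.DataFrame({'a': [1], 'b': ['x']}))
#     >>> len(hashes), len(hashes[0])
#     (1, 28)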