Data
Data Pipeline
The data pipeline serves to clean, structure, and process the raw observations sourced from iNaturalist.
Pipeline Details
Pipeline
Pipeline to clean raw data into an interim data source.
Attributes:
| Name | Type | Description |
|---|---|---|
| df_whole | DataFrame | Contains all aggregated observations. |
| df | DataFrame | Contains the current batch of observations. |
| datasets | list | A list of individual observation csv files to be aggregated as raw data. |
| resource_path | str | Path to raw data resources, from the project root directory. |
| write_path | str | Path to interim data resources, from the project root directory. |
| interim_exists | bool | A flag indicating whether an interim_data.csv file already exists in the project. |
| row_sum | int | The total number of aggregated observations. Only initialized after dataset aggregation. |
| start_time | DateTime | Records the start time of pipeline processing. |
| TEST | bool | A flag indicating that values should be initialized for testing purposes. |
| test_df | DataFrame | A DataFrame inserted directly for pipeline testing purposes. |
| interim_file | str | Specification of the file to write data to after the cleaning process. |
| bad_file | str | Specification of the file to write identified potentially bad observation images to. |
| batch_size | int | Size of the individual batches that the aggregate observations are broken down into. |
Source code in src/data/DataCleanPipeline.py
__init__(datasets=['observations_sample.csv'], test_df=None)
Initializes the observation pipeline.
Note: it is recommended to process each data file one by one, but the pipeline is capable of processing several at once if required.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| datasets | list | The list of datasets to process. The list should contain csv file names pointing to the dataset files. | ['observations_sample.csv'] |
| test_df | DataFrame | A pre-generated DataFrame that can be passed in for pipeline testing purposes. | None |
Source code in src/data/DataCleanPipeline.py
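A minimal usage sketch, assuming the class is named Pipeline as documented here and is importable from src/data/DataCleanPipeline.py; the file name observations_dec.csv is purely a hypothetical example:

```python
# Illustrative usage only. "observations_dec.csv" is a hypothetical raw data file;
# substitute the csv file(s) placed under the raw data resource path.
from src.data.DataCleanPipeline import Pipeline

pipeline = Pipeline(datasets=["observations_dec.csv"])
pipeline.activate_flow()  # runs the full cleaning flow described below

# For isolated testing, a pre-built DataFrame can be injected instead:
# test_pipeline = Pipeline(test_df=my_test_dataframe)
```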
activate_flow()
Defines and executes the flow of the cleaning pipeline.
Source code in src/data/DataCleanPipeline.py
aggregate_observations()
Method aggregates all observations from the separate files, placing them within a single DataFrame for manipulation.
The method first checks whether the DataFrame is empty; this accommodates test cases that preload the DataFrame directly.
Source code in src/data/DataCleanPipeline.py
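The aggregation step amounts to concatenating the raw csv files into a single DataFrame. A minimal sketch with pandas, assuming the raw files live under the resource_path described in the attributes table:

```python
import pandas as pd

# Hypothetical values; the pipeline reads these from its own attributes.
datasets = ["observations_sample.csv"]
resource_path = "data/raw/"

# Concatenate every raw observation file into one aggregate DataFrame.
df_whole = pd.concat(
    (pd.read_csv(resource_path + name) for name in datasets),
    ignore_index=True,
)
```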
bad_data_separation()
Method performs the bad data sub-process: separating, formatting, and writing bad observations to the bad_data file.
Source code in src/data/DataCleanPipeline.py
batching()
This method creates observation batches from the aggregate observations so that the data can be processed and cleaned iteratively.
Each batch is processed in turn and written to interim_data.csv. The DataFrame df_whole contains the aggregate observations, while df contains the current batch. Once generated, each batch is removed from df_whole; when df_whole is empty, the batching process concludes.
Returns:

| Type | Description |
|---|---|
| bool | True if there are still observations to be batched and processed; False if df_whole is empty, indicating that the batching process has concluded. |
Source code in src/data/DataCleanPipeline.py
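A minimal sketch of the batching pattern described above, written as a standalone function rather than the pipeline's actual method; it only illustrates how df_whole shrinks until the process concludes:

```python
import pandas as pd

def next_batch(df_whole: pd.DataFrame, batch_size: int):
    """Carve the next batch off the aggregate observations.

    Returns the current batch, the remaining aggregate frame, and a flag that is
    True while observations still remain to be batched.
    """
    if df_whole.empty:
        return pd.DataFrame(), df_whole, False  # batching has concluded
    batch = df_whole.iloc[:batch_size].copy()   # becomes df, the current batch
    remaining = df_whole.iloc[batch_size:]      # df_whole shrinks on every call
    return batch, remaining, True
```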
continuation(test_interim_df=None, test_bad_df=None)
Method determines the status of the Data Cleaning Pipeline, enabling continuation without redundancies if the cleaning process is interrupted.
This method enables testing through the test_interim_df dataframe that can be passed to it; within the pipeline this is automated, and no parameter is required. The method accesses the interim_data.csv file, identifies already processed observations, and removes them from the current cleaning process.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| test_interim_df | DataFrame | None during the pipeline cleaning process; allows an interim DataFrame to be supplied for testing purposes without running the entire pipeline. | None |
| test_bad_df | DataFrame | None during the pipeline cleaning process; allows a test bad-data DataFrame to be supplied for testing purposes. | None |
Source code in src/data/DataCleanPipeline.py
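One way the continuation check can be expressed: read the observation ids already present in interim_data.csv and drop them from the aggregate frame. This is a sketch, not the project's exact code, and the id column name is an assumption:

```python
import os
import pandas as pd

def drop_already_processed(df_whole: pd.DataFrame, interim_path: str) -> pd.DataFrame:
    """Remove observations that already appear in the interim file so an
    interrupted run can resume without redundant cleaning."""
    if not os.path.exists(interim_path):
        return df_whole  # nothing has been processed yet
    processed_ids = pd.read_csv(interim_path)["id"]
    return df_whole[~df_whole["id"].isin(processed_ids)].reset_index(drop=True)
```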
coordinate_to_country_rate_limited()
Method takes observation coordinates and identifies the country of origin, creating a Country column within the interim data.
Note: this method is currently left out of the pipeline due to Nominatim rate limits.
The Nominatim geocoding API is used. Because it is rate limited to one request per second, a rate limiter has been introduced in order to respect that limit.
This method modifies the current batch to include a country column specifying the country of observation for each observation.
Source code in src/data/DataCleanPipeline.py
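The rate-limited reverse geocoding described above can be sketched with geopy's Nominatim geocoder and RateLimiter helper; whether the pipeline uses geopy specifically is an assumption, and the user agent string is a placeholder:

```python
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

geolocator = Nominatim(user_agent="data-clean-pipeline")  # placeholder user agent
# Respect Nominatim's limit of one request per second.
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)

def country_of(latitude: float, longitude: float) -> str:
    """Return the country name for a coordinate pair, or an empty string."""
    location = reverse((latitude, longitude), language="en")
    if location is None:
        return ""
    return location.raw.get("address", {}).get("country", "")
```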
enforce_unique_ids()
Removes any duplicate observations using their observation id.
Duplicates are removed in place, so the changes are applied directly within df.
Source code in src/data/DataCleanPipeline.py
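In pandas, in-place duplicate removal on the observation id is a single drop_duplicates call; the id column name and sample data below are assumptions:

```python
import pandas as pd

df = pd.DataFrame({"id": [101, 101, 102], "species_guess": ["a", "a", "b"]})

# Remove duplicate observations by id, keeping the first occurrence, in place.
df.drop_duplicates(subset="id", keep="first", inplace=True)
```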
format_bad_data(bad_df)
Method creates a sub-dataframe containing only the image_url and the image quality (the index is the observation id).
This subset defines the format in which the bad quality images are written to bad_quality.csv.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| bad_df | DataFrame | DataFrame containing all bad observations filtered from df. | required |
Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame containing only the id, image_url, and image_quality columns (bad_data.csv format). |
Source code in src/data/DataCleanPipeline.py
format_observation_dates()
Method ensures that raw data dates follow the format yyyy-mm-dd. Dates that deviate from this format are removed from the dataframe.
Changes are made in place within the class's dataframe df. Any errors produced by incorrect date formats are transformed into NaT values, and the affected rows are removed from df. Removing the rows requires an index reset in order to facilitate testing operations on the cleaning pipeline.
Source code in src/data/DataCleanPipeline.py
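The behaviour described above maps naturally onto pandas to_datetime with coercion; a sketch assuming the date column is named observed_on:

```python
import pandas as pd

df = pd.DataFrame({"observed_on": ["2021-06-01", "01/06/2021", "not a date"]})

# Anything that is not yyyy-mm-dd is coerced to NaT, then those rows are dropped
# and the index reset so later operations (and tests) see a contiguous index.
df["observed_on"] = pd.to_datetime(df["observed_on"], format="%Y-%m-%d", errors="coerce")
df = df.dropna(subset=["observed_on"]).reset_index(drop=True)
```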
generate_local_times()
Method converts UTC time to the correct local time using the specified sighting time zone.
The new column generated is labelled "local_time_observed_at" and can be found in the interim data folder.
The conversion uses both the date and the time in UTC, so any day change (midnight -> next day) is accounted for and the observed_on column remains correct.
Source code in src/data/DataCleanPipeline.py
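A sketch of the UTC-to-local conversion using pandas time zone handling; the column names time_observed_at and time_zone are assumptions based on the surrounding descriptions:

```python
import pandas as pd

df = pd.DataFrame({
    "time_observed_at": ["2021-06-01 23:30:00"],  # UTC timestamp
    "time_zone": ["Africa/Johannesburg"],         # IANA/pytz zone name
})

utc_times = pd.to_datetime(df["time_observed_at"], utc=True)
# Convert each UTC timestamp into its sighting's local zone; because the full
# date-time is converted, day rollovers past midnight are handled correctly.
df["local_time_observed_at"] = [
    ts.tz_convert(tz) for ts, tz in zip(utc_times, df["time_zone"])
]
```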
identify_bad_observations()
Method identifies probable bad quality images based on keyword extraction from the descriptions.
Observations with identified keywords are excluded from the interim data and instead written to the bad_data.csv file for manual filtering. The keyword list was created by inspecting observation images and the description of each observation.
Keyword identification is accomplished through regex pattern matching.
Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame containing the identified potentially bad observations from within the current batch. |
Source code in src/data/DataCleanPipeline.py
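Keyword-based separation of the batch can be expressed with a regex and str.contains; the keyword list and column values below are purely illustrative:

```python
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2],
    "description": ["clear photo of the animal", "blurry footprint only, no animal"],
    "image_url": ["https://example.org/1.jpg", "https://example.org/2.jpg"],
})

# Hypothetical keywords that hint at poor image quality.
pattern = r"blurry|footprint|scat|burrow"
mask = df["description"].str.contains(pattern, case=False, na=False)

bad_df = df[mask]   # routed to bad_data.csv for manual filtering
df = df[~mask]      # retained for interim_data.csv
```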
percentage(rows_remaining)
Method generates and updates a status bar based on the progress of batching.
Both percentage complete and running time metrics are displayed. Inspiration: https://www.geeksforgeeks.org/progress-bars-in-python/
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| rows_remaining | int | The number of rows remaining to be processed in the df_whole DataFrame. | required |
Source code in src/data/DataCleanPipeline.py
remove_na_working_columns()
This method removes all rows with NaN values in the columns used for computation that require values.
The 'working columns' include date, time, time zone, and coordinates. If the removal produces an empty dataframe, the method exits execution, displaying an exit message.
Source code in src/data/DataCleanPipeline.py
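Removing rows with missing working values is a dropna over that column subset followed by an emptiness check; the column names and sample data are assumptions:

```python
import sys
import pandas as pd

df = pd.DataFrame({
    "observed_on": ["2021-06-01", None],
    "time_zone": ["Africa/Johannesburg", "UTC"],
    "latitude": [-25.7, 28.1],
    "longitude": [28.2, None],
})

working_columns = ["observed_on", "time_zone", "latitude", "longitude"]
df = df.dropna(subset=working_columns).reset_index(drop=True)

if df.empty:
    sys.exit("No observations remain after removing rows with missing working values.")
```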
remove_peripheral_columns()
Method removes all peripheral columns before writing the dataframe to interim_data.csv.
Source code in src/data/DataCleanPipeline.py
standardize_timezones()
Method generates time zones in a format accepted by the pytz library, for use in the local time zone conversion.
This method uses the observation coordinates to determine the time zone of the sighting. This time zone overwrites the "time_zone" column.
Source code in src/data/DataCleanPipeline.py
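Deriving a pytz-compatible zone name from coordinates is commonly done with the timezonefinder package; whether the pipeline uses this exact library is an assumption:

```python
from timezonefinder import TimezoneFinder

tf = TimezoneFinder()

def zone_for(latitude: float, longitude: float) -> str:
    """Return an IANA/pytz zone name (e.g. 'Africa/Johannesburg') for a coordinate."""
    return tf.timezone_at(lat=latitude, lng=longitude) or "UTC"

# The returned value would overwrite the "time_zone" column for each observation.
print(zone_for(-25.75, 28.19))
```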
write_bad_data(bad_df)
Method performs a similar operation to the write_interim_data() method, in this case specifically writing bad data.
This method should be refactored in conjunction with write_interim_data() in order to minimize code repetition.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| bad_df | DataFrame | The sub-DataFrame containing only the id, image_url, and image_quality columns. | required |
Source code in src/data/DataCleanPipeline.py
write_interim_data()
Method writes the current state of df into the interim data folder in csv format.
Source code in src/data/DataCleanPipeline.py
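Batch-wise writing usually appends to the interim csv and emits the header only on the first write; a sketch under those assumptions, with an illustrative path:

```python
import os
import pandas as pd

def write_interim_sketch(df: pd.DataFrame, path: str = "data/interim/interim_data.csv") -> None:
    """Append the current batch to the interim csv, writing the header only once."""
    write_header = not os.path.exists(path)
    df.to_csv(path, mode="a", header=write_header, index=False)
```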