Data Pipeline

The data pipeline serves to clean, structure, and process the raw observations sourced from iNaturalist.
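
A minimal usage sketch is shown below. The import path is an assumption inferred from the module location src/data/DataCleanPipeline.py, and the dataset file name is the pipeline's default; neither is prescribed by the project.

# Assumed import path, inferred from src/data/DataCleanPipeline.py
from src.data.DataCleanPipeline import Pipeline

# Class-level settings such as the batch size can be adjusted before instantiation
Pipeline.batch_size = 500

pipeline = Pipeline(datasets=['observations_sample.csv'])
pipeline.activate_flow()  # aggregate, deduplicate, clean, and write interim data in batches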

Pipeline Details

Pipeline

Pipeline to clean raw data into interim data source.

Attributes:

df_whole (DataFrame): Contains all aggregated observations
df (DataFrame): Dataframe containing the current batch of observations
datasets (list): A list of individual observation csv files to be aggregated as raw data
resource_path (str): Path to raw data resources, from project root directory
write_path (str): Path to interim data resources, from project root directory
interim_exists (bool): A flag representing if an existing interim_data.csv file exists in the project
row_sum (int): Contains the sum of aggregate observations. Value only initialized after dataset aggregation
start_time (DateTime): Records the start time of pipeline processing
TEST (bool): A flag indicating values should be initialized for testing purposes
test_df (DataFrame): A direct dataframe insert for pipeline testing purposes
interim_file (str): Specification of the file to write data to after the cleaning process. The naming convention is <taxon_parent_name>_interim.csv
bad_file (str): Specification of the file to write identified potentially bad observation images to
batch_size (int): Size of individual batches that aggregate observations are broken down into

Source code in src/data/DataCleanPipeline.py
class Pipeline:
    """ Pipeline to clean raw data into interim data source.

    Attributes:
        df_whole (DataFrame): Contains all aggregated observations
        df (DataFrame): Dataframe containing the current batch of observations
        datasets (list): A list of individual observation csv files to be aggregated as raw data
        resource_path (str): Path to raw data resources, from project root directory
        write_path (str): Path to interim data resources, from project root directory
        interim_exists (bool): A flag representing if an existing interim_data.csv file exists in the project.
        row_sum (int): Contains the sum of aggregate observations. Value only initialized after dataset aggregation.
        start_time (DateTime): Records the start time of pipeline processing
        TEST (bool): A flag indicating values should be initialized for testing purposes.
        test_df (DataFrame): A direct dataframe insert for pipeline testing purposes
        interim_file (str): Specification of the file to write data to after cleaning process. The naming convention is `<taxon_parent_name>_interim.csv`
        bad_file (str): Specification of the file to write identified potentially bad observation images to.
        batch_size (int): Size of individual batches that aggregate observations are broken down into.
    """

    interim_file = "felidae_interim.csv"
    bad_file = 'bad_quality.csv'
    batch_size = 1000

    def __init__(self, datasets=['observations_sample.csv'], test_df=None):
        """ Initializes the observation pipeline.

        Note, it is recommended to process each data file one-by-one, but the pipeline has the capability to process several if required.

        Args:
            datasets (list): The list of datasets to process. The list should contain csv file names pointing to the dataset files.
            test_df (DataFrame): The capability to pass a pre-generated dataframe for pipeline testing purposes

        """
        if test_df is None:
            self.df_whole = pd.DataFrame()
            self.df = pd.DataFrame()
            self.datasets = datasets
            self.resource_path = root_dir() + "/data/obs_and_meta/raw/"
            self.write_path = root_dir() + "/data/obs_and_meta/interim/"
            self.interim_exists = os.path.isfile(self.write_path + self.interim_file)
            self.bad_data_exists = os.path.isfile(self.write_path + self.bad_file)
            self.row_sum = 0
            self.TEST = False
        else:
            self.df_whole = test_df.copy(deep=True)
            self.TEST = True
            self.row_sum = len(self.df_whole.index)

        self.start_time = datetime.now()

    def activate_flow(self):
        """ Method details and executes the flow of the cleaning pipeline"""

        self.aggregate_observations()  # Aggregate all observation files

        self.enforce_unique_ids()  # No duplicate observations

        self.continuation()  # Continuation from interrupt/ start from scratch

        self.remove_na_working_columns()  # Remove any NaN types from columns undergoing computation

        while self.batching():  # Batching loop
            self.bad_data_separation()  # Detect bad quality observations from batch

            self.format_observation_dates()  # Format sighting dates

            self.generate_local_times()  # Generate local observation times

            self.remove_peripheral_columns()  # Remove peripheral columns

            self.write_interim_data()  # Write to interim data

    def aggregate_observations(self):
        """Method aggregates all observations from separate files, placing them within a df for manipulation

        Method will check if the dataframe is empty. This is to accommodate test cases, which preload the dataframe
        directly.
        """
        if not self.TEST:
            for dataset in self.datasets:
                df_temp = pd.read_csv(self.resource_path + dataset)
                self.df_whole = pd.concat([self.df_whole, df_temp])  # Merge temp df into container df

    def enforce_unique_ids(self):
        """Removal of any duplicate observations utilizing their observation id

        In place duplicate removal such that changes are effected directly within df_whole
        """
        self.df_whole.drop_duplicates(subset=['id'], keep='first', inplace=True)

    def continuation(self, test_interim_df=None, test_bad_df=None):
        """ Method determines the status of the Data Cleaning Pipeline, enabling continuation without redundancies if
        the cleaning process is interrupted.

        This method enables testing through the test_interim_df dataframe that can be passed to it.
        However, within the pipeline this is automated, and no parameter is required.
        Method accesses the interim_data.csv file and identifies already processed observations, removing them from the
        current cleaning process.

        Args:
            test_interim_df (DataFrame): This dataframe is None during the Pipeline cleaning process; however, it allows for the creation of an interim dataframe for testing purposes (without running the entire pipeline).
            test_bad_df (DataFrame): This dataframe is None during the Pipeline cleaning process; however, it allows for the creation of a test bad-data dataframe for testing purposes.
        """
        self.df_whole.set_index('id', inplace=True)

        interim_df = pd.DataFrame()
        bad_quality_df = pd.DataFrame()

        if self.TEST and test_interim_df is not None:  # Conditions for test cases
            interim_df = pd.concat([interim_df, test_interim_df])
            bad_quality_df = pd.concat([bad_quality_df, test_bad_df])

        if not self.TEST and self.interim_exists:  # Non-test conditions when interim data file exists
            interim_df = pd.read_csv(self.write_path + self.interim_file)
        if not self.TEST and self.bad_data_exists:  # Non-test conditions when bad_quality data file exists
            bad_quality_df = pd.read_csv(self.write_path + self.bad_file)

        if not interim_df.empty:  # Removal of duplicate rows already written to interim data
            interim_df.set_index('id', inplace=True)
            self.df_whole = self.df_whole.loc[self.df_whole.index.difference(interim_df.index), ]

        if not bad_quality_df.empty:  # Removal of duplicate rows already written to bad_quality data
            bad_quality_df.set_index('id', inplace=True)
            self.df_whole = self.df_whole.loc[self.df_whole.index.difference(bad_quality_df.index), ]

        self.row_sum = len(self.df_whole.index)

    def remove_na_working_columns(self):
        """ This method removes all rows with NaN values, specifically located within columns used for computation that require
        values.

         The 'working columns' include date, time, time zone, and coordinates.
         If the removal creates an empty dataframe, the method exits execution, displaying an exit message.
        """
        self.df_whole.dropna(subset=['observed_on', 'latitude', 'longitude', 'time_observed_at', 'time_zone'],
                             inplace=True)
        if self.df_whole.empty:
            print("*********** No further correctly format to process ***********")
            sys.exit()

    def batching(self) -> bool:
        """ This method creates observation batches from the aggregate observations in order to iteratively process and clean
        data.

        This method processes each batch, iteratively writing to interim_data.csv.
        The DataFrame df_whole contains aggregate observations, while df contains the current batch.
        Each batch once generated is removed from df_whole until this dataframe is empty, concluding the batching process.

        Returns:
            (bool): Method returns a boolean value. True if there are still observations to be batched and processed. False if df_whole is empty,
            indicating the batching process has concluded.
        """
        rows_remaining = len(self.df_whole.index)
        if not self.TEST: self.percentage(rows_remaining)

        if self.df_whole.empty:
            return False

        if rows_remaining > self.batch_size:
            self.df = self.df_whole.iloc[0:self.batch_size]
            self.df_whole = self.df_whole.iloc[self.batch_size:]
            return True
        else:
            self.df = self.df_whole
            self.df_whole = pd.DataFrame()
            return True

    def percentage(self, rows_remaining):
        """ Method generates and updates a status bar based on the progress of batching.

        Both percentage complete and running time metrics are displayed
        Method inspiration: https://www.geeksforgeeks.org/progress-bars-in-python/

        Args:
            rows_remaining (int): The number of rows remaining to be processed in the df_whole DataFrame.
        """
        progress_bar_length = 100
        percentage_complete = (self.row_sum - rows_remaining) / self.row_sum
        filled = int(progress_bar_length * percentage_complete)
        running_time = datetime.now() - self.start_time

        bar = '=' * filled + '-' * (progress_bar_length - filled)
        percentage_display = round(100 * percentage_complete, 1)
        sys.stdout.write('\r[%s] %s%s ... running: %s' % (bar, percentage_display, '%', running_time))
        sys.stdout.flush()

    def format_observation_dates(self):
        """ Method ensures that raw data dates follow format yyyy-mm-dd. If the dates deviate they are removed from the dataframe.

        In place changes are effected within the class's dataframe df.
        Any date errors produced by incorrect date formats are transformed into NaT values, and the corresponding rows are removed from df.
        Removal of the rows requires an index reset in order to facilitate testing operations on the cleaning pipeline.
        """
        self.df['observed_on'] = pd.to_datetime(self.df['observed_on'],
                                                format='%Y-%m-%d',
                                                yearfirst=True,
                                                errors='coerce',
                                                exact=True).astype(str)
        self.df.query('observed_on != "NaT"', inplace=True)

    def coordinate_to_country_rate_limited(self):
        """ Method takes data coordinates, and identifies the country of origin, creating a Country column within Interim data

        Note, this method is currently left out of the pipeline due to Nominatim rate limits.

        The Nominatim geocoding API is utilized.
        Due to rate limiting of 1 request per second, a rate limiter has been introduced in order to respect the limits.

        This method modifies the current batch to include a country column with the country of observation specified for each observation.
        """
        # Set up the geolocation library
        geolocator = Nominatim(user_agent="Spatio_Tempt_Class")
        geocode = RateLimiter(geolocator.reverse, min_delay_seconds=2)

        # Combine lat and long into coordinates
        latitudes = pd.Series(self.df.latitude.values.astype(str))  # Generate a Series of latitude values as strings
        longitudes = pd.Series(self.df.longitude.values.astype(str))  # Generate a Series of longitude values as strings
        coordinates = latitudes + ', ' + longitudes  # Combine the latitude and longitude values into a single coordinate Series

        # Retrieve countries from coordinates (rate limiting requests)
        locations = coordinates.apply(partial(geocode, language='en', exactly_one=True))
        self.df['country'] = locations.apply(lambda x: x.raw['address']['country']).values

    def generate_local_times(self):
        """ Method converts UTC time to correct local time utilizing the specified sighting timezone.

        The new column generated is labelled "local_time_observed_at" and can be found in the interim data folder.

        The date conversion utilizes both date and time in the UTC format.
        This means any day change (midnight -> next day) is accounted for, and the observed_on date remains correct.
        """

        # Standardize time zone formats
        self.standardize_timezones()

        # Generate local times by converting UTC to specified time zones
        self.df['local_time_observed_at'] = self.df.apply(
            lambda x: pd.to_datetime(x['time_observed_at'], utc=True).astimezone(pytz.timezone(x['time_zone'])),
            axis=1).astype(str)

    def standardize_timezones(self):
        """ Method generated timezones in a format accepted by the pytz library for use in the local time zone conversion

        This method utilizes the observation coordinates to return the time zone of the sighting.
        This timezone overwrites the "time_zone" column
        """
        finder = TimezoneFinder()

        self.df['time_zone'] = self.df.apply(
            lambda x: finder.timezone_at(lat=x['latitude'], lng=x['longitude']), axis=1)

    def bad_data_separation(self):
        """Method performs the sub-process of bad data separation, formatting, and writing to the bad_data file"""
        bad_df = self.identify_bad_observations()
        bad_df = self.format_bad_data(bad_df)
        self.write_bad_data(bad_df)

    def identify_bad_observations(self):
        """Method identifies probable bad quality images based on keyword extraction from the descriptions.

        The observations with identified keywords are excluded from the interim data, and instead written to the bad_data.csv file for manual filtering.
        The keyword list is created based on inspection of observation images and the descriptions of each observation.

        Keyword pattern identification is accomplished through the use of regex pattern matching

        Returns:
            (DataFrame): The method returns a DataFrame containing the identified potentially bad observations from within the current batch.
        """
        description_indicators = ['dead', 'road kill', 'road', 'scat', 'poo', 'killed', 'spoor', 'road-kill', 'remains',
                                  'body', 'deceased', 'prey', 'fatality', 'tracks', 'trapped', 'bad', 'roadkilled', 'poop',
                                  'crushed', 'kill', 'squashed', 'terrible', 'caught', 'pool', 'blurry', 'destroyed',
                                  'sidewalk',
                                  'grounded']
        regex_pattern = '|'.join([f'{key_word}' for key_word in description_indicators])
        filter = self.df.description.str.contains(regex_pattern, case=False, regex=True)  # Filter descriptions to identify keywords
        filter.fillna(False, inplace=True)  # Boolean filter
        bad_df = self.df[filter]  # Filter to produce bad_obs df
        self.df = self.df[~filter]  # Filter to remove bad_obs from df
        bad_df['image_quality'] = 'bad'  # Label bad data image quality
        return bad_df

    def remove_peripheral_columns(self):
        """ Method removes all peripheral columns before writing dataframe to interim_data.csv"""
        self.df = self.df[['observed_on', 'local_time_observed_at', 'latitude', 'longitude',
                           'positional_accuracy', 'public_positional_accuracy', 'image_url', 'license', 'geoprivacy',
                           'taxon_geoprivacy', 'scientific_name', 'common_name', 'taxon_id',
                           'taxon_kingdom_name', 'taxon_phylum_name', 'taxon_class_name', 'taxon_order_name',
                           'taxon_family_name', 'taxon_genus_name', 'taxon_species_name']]

    def format_bad_data(self, bad_df):
        """Method creates a sub-dataframe of only the image_url and the image quality (Index is observation ID)

        This subset matches the format in which bad quality images are written to bad_quality.csv

        Args:
            bad_df (DataFrame): DataFrame containing all bad observations filtered from df

        Returns:
            (DataFrame): The method returns a DataFrame containing only the id, image_url, and image_quality columns (bad_data.csv format)
        """
        bad_df = bad_df[['image_url', 'image_quality']]
        return bad_df

    def write_bad_data(self, bad_df):
        """Method performs similar operation to the write_interim_data() method, in this case specifically writing bad data

        This method should be refactored in conjunction with write_interim_data in order to minimize code repetition

        Args:
            bad_df (DataFrame): DataFrame containing the sub-dataframe of only id, image_url, and image_quality columns
        """
        if not self.TEST:  # Identifies not in testing conditions
            self.bad_data_exists = os.path.isfile(self.write_path + self.bad_file)
            if self.bad_data_exists:  # If the file already exists, append the next batch of bad data
                bad_df.to_csv(self.write_path + self.bad_file, mode='a', index=True, header=False)
            else:  # Write bad_data file as no file exists
                bad_df.to_csv(self.write_path + self.bad_file, mode='w', index=True, header=True)

    def write_interim_data(self):
        """ Method writes current state of df into interim data folder in csv format"""
        if not self.TEST:
            self.interim_exists = os.path.isfile(self.write_path + self.interim_file)
            if self.interim_exists:
                self.df.to_csv(self.write_path + self.interim_file, mode='a', index=True, header=False)
            else:
                self.df.to_csv(self.write_path + self.interim_file, mode='w', index=True, header=True)

__init__(datasets=['observations_sample.csv'], test_df=None)

Initializes the observation pipeline.

Note, it is recommended to process each data file one-by-one, but the pipeline has the capability to process several if required.

Parameters:

datasets (list): The list of datasets to process. The list should contain csv file names pointing to the dataset files. Default: ['observations_sample.csv']
test_df (DataFrame): The capability to pass a pre-generated dataframe for pipeline testing purposes. Default: None
Source code in src/data/DataCleanPipeline.py
def __init__(self, datasets=['observations_sample.csv'], test_df=None):
    """ Initializes the observation pipeline.

    Note, it is recommended to process each data file one-by-one, but the pipeline has the capability to process several if required.

    Args:
        datasets (list): The list of datasets to process. The list should contain csv file names pointing to the dataset files.
        test_df (DataFrame): The capability to pass a pre-generated dataframe for pipeline testing purposes

    """
    if test_df is None:
        self.df_whole = pd.DataFrame()
        self.df = pd.DataFrame()
        self.datasets = datasets
        self.resource_path = root_dir() + "/data/obs_and_meta/raw/"
        self.write_path = root_dir() + "/data/obs_and_meta/interim/"
        self.interim_exists = os.path.isfile(self.write_path + self.interim_file)
        self.bad_data_exists = os.path.isfile(self.write_path + self.bad_file)
        self.row_sum = 0
        self.TEST = False
    else:
        self.df_whole = test_df.copy(deep=True)
        self.TEST = True
        self.row_sum = len(self.df_whole.index)

    self.start_time = datetime.now()

activate_flow()

Method details and executes the flow of the cleaning pipeline

Source code in src/data/DataCleanPipeline.py
def activate_flow(self):
    """ Method details and executes the flow of the cleaning pipeline"""

    self.aggregate_observations()  # Aggregate all observation files

    self.enforce_unique_ids()  # No duplicate observations

    self.continuation()  # Continuation from interrupt/ start from scratch

    self.remove_na_working_columns()  # Remove any NaN types from columns undergoing computation

    while self.batching():  # Batching loop
        self.bad_data_separation()  # Detect bad quality observations from batch

        self.format_observation_dates()  # Format sighting dates

        self.generate_local_times()  # Generate local observation times

        self.remove_peripheral_columns()  # Remove peripheral columns

        self.write_interim_data()  # Write to interim data

aggregate_observations()

Method aggregates all observations from separate files, placing them within a df for manipulation

Method will check if the dataframe is empty. This is to accommodate test cases, which preload the dataframe directly.

Source code in src/data/DataCleanPipeline.py
def aggregate_observations(self):
    """Method aggregates all observations from separate files, placing them within a df for manipulation

    Method will check if the dataframe is empty. This is to accommodate test cases, which preload the dataframe
    directly.
    """
    if not self.TEST:
        for dataset in self.datasets:
            df_temp = pd.read_csv(self.resource_path + dataset)
            self.df_whole = pd.concat([self.df_whole, df_temp])  # Merge temp df into container df

bad_data_separation()

Method performs the sub-process of bad data separation, formatting, and writing to the bad_data file

Source code in src/data/DataCleanPipeline.py
def bad_data_separation(self):
    """Method performs the sub-process of bad data separation, formatting, and writing to the bad_data file"""
    bad_df = self.identify_bad_observations()
    bad_df = self.format_bad_data(bad_df)
    self.write_bad_data(bad_df)

batching()

This method creates observation batches from the aggregate observations in order to iteratively process and clean data.

This method processes each batch, iteratively writing to interim_data.csv. The DataFrame df_whole contains aggregate observations, while df contains the current batch. Each batch once generated is removed from df_whole until this dataframe is empty, concluding the batching process.

Returns:

bool: True if there are still observations to be batched and processed; False if df_whole is empty, indicating the batching process has concluded.
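
A toy illustration of the batch-slicing step, assuming a hypothetical five-row dataframe and a batch size of 2:

import pandas as pd

df_whole = pd.DataFrame({'id': range(5)})
batch_size = 2

df = df_whole.iloc[0:batch_size]       # current batch of observations
df_whole = df_whole.iloc[batch_size:]  # remainder, batched on subsequent calls
print(len(df), len(df_whole))          # 2 3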

Source code in src/data/DataCleanPipeline.py
def batching(self) -> bool:
    """ This method creates observation batches from the aggregate observations in order to iteratively process and clean
    data.

    This method processes each batch, iteratively writing to interim_data.csv.
    The DataFrame df_whole contains aggregate observations, while df contains the current batch.
    Each batch once generated is removed from df_whole until this dataframe is empty, concluding the batching process.

    Returns:
        (bool): Method returns a boolean value. True if there are still observations to be batched and processed. False if df_whole is empty,
        indicating the batching process has concluded.
    """
    rows_remaining = len(self.df_whole.index)
    if not self.TEST: self.percentage(rows_remaining)

    if self.df_whole.empty:
        return False

    if rows_remaining > self.batch_size:
        self.df = self.df_whole.iloc[0:self.batch_size]
        self.df_whole = self.df_whole.iloc[self.batch_size:]
        return True
    else:
        self.df = self.df_whole
        self.df_whole = pd.DataFrame()
        return True

continuation(test_interim_df=None, test_bad_df=None)

Method determines the status of the Data Cleaning Pipeline, enabling continuation without redundancies if the cleaning process is interrupted.

This method enables testing through the test_interim_df dataframe that can be passed to it. However, within the pipeline this is automated, and no parameter is required. Method accesses the interim_data.csv file and identifies already processed observations, removing them from the current cleaning process.

Parameters:

test_interim_df (DataFrame): This dataframe is None during the Pipeline cleaning process; however, it allows for the creation of an interim dataframe for testing purposes (without running the entire pipeline). Default: None
test_bad_df (DataFrame): This dataframe is None during the Pipeline cleaning process; however, it allows for the creation of a test bad-data dataframe for testing purposes. Default: None
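
The de-duplication against previously written files relies on pandas index.difference; a toy illustration with hypothetical observation ids:

import pandas as pd

# Aggregated raw observations, indexed by observation id (hypothetical ids)
df_whole = pd.DataFrame({'description': ['a', 'b', 'c']}, index=pd.Index([1, 2, 3], name='id'))
# Observations already written to the interim file
interim_df = pd.DataFrame({'description': ['a']}, index=pd.Index([1], name='id'))

# Keep only ids not yet processed, as continuation() does
df_whole = df_whole.loc[df_whole.index.difference(interim_df.index), ]
print(df_whole.index.tolist())  # [2, 3]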
Source code in src/data/DataCleanPipeline.py
def continuation(self, test_interim_df=None, test_bad_df=None):
    """ Method determines the status of the Data Cleaning Pipeline, enabling continuation without redundancies if
    the cleaning process is interrupted.

    This method enables testing through the test_interim_df dataframe that can be passed to it.
    However, within the pipeline this is automated, and no parameter is required.
    Method accesses the interim_data.csv file and identifies already processed observations, removing them from the
    current cleaning process.

    Args:
        test_interim_df (DataFrame): This dataframe is None during the Pipeline cleaning process; however, it allows for the creation of an interim dataframe for testing purposes (without running the entire pipeline).
        test_bad_df (DataFrame): This dataframe is None during the Pipeline cleaning process; however, it allows for the creation of a test bad-data dataframe for testing purposes.
    """
    self.df_whole.set_index('id', inplace=True)

    interim_df = pd.DataFrame()
    bad_quality_df = pd.DataFrame()

    if self.TEST and test_interim_df is not None:  # Conditions for test cases
        interim_df = pd.concat([interim_df, test_interim_df])
        bad_quality_df = pd.concat([bad_quality_df, test_bad_df])

    if not self.TEST and self.interim_exists:  # Non-test conditions when interim data file exists
        interim_df = pd.read_csv(self.write_path + self.interim_file)
    if not self.TEST and self.bad_data_exists:  # Non-test conditions when bad_quality data file exists
        bad_quality_df = pd.read_csv(self.write_path + self.bad_file)

    if not interim_df.empty:  # Removal of duplicate rows already written to interim data
        interim_df.set_index('id', inplace=True)
        self.df_whole = self.df_whole.loc[self.df_whole.index.difference(interim_df.index), ]

    if not bad_quality_df.empty:  # Removal of duplicate rows already written to bad_quality data
        bad_quality_df.set_index('id', inplace=True)
        self.df_whole = self.df_whole.loc[self.df_whole.index.difference(bad_quality_df.index), ]

    self.row_sum = len(self.df_whole.index)

coordinate_to_country_rate_limited()

Method takes data coordinates, and identifies the country of origin, creating a Country column within Interim data

Note, this method is currently left out of the pipeline due to Nominatim rate limits.

The Nominatim geocoding API is utilized. Due to rate limiting of 1 request per second, a rate limiter has been introduced in order to respect the limits.

This method modifies the current batch to include a country column with the country of observation specified for each observation.
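
A minimal reverse-geocoding sketch using the geopy library; the coordinates are hypothetical and a live connection to the Nominatim service is required:

from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

geolocator = Nominatim(user_agent="Spatio_Tempt_Class")
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=2)  # respect the rate limit

# Hypothetical coordinates near Cape Town
location = reverse("-33.9, 18.4", language='en', exactly_one=True)
print(location.raw['address']['country'])  # e.g. South Africa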

Source code in src/data/DataCleanPipeline.py
def coordinate_to_country_rate_limited(self):
    """ Method takes data coordinates, and identifies the country of origin, creating a Country column within Interim data

    Note, this method is currently left out of the pipeline due to Nominatim rate limits.

    The Nominatim geocoding API is utilized.
    Due to rate limiting of 1 request per second, a rate limiter has been introduced in order to respect the limits.

    This method modifies the current batch to include a country column with the country of observation specified for each observation.
    """
    # Set up the geolocation library
    geolocator = Nominatim(user_agent="Spatio_Tempt_Class")
    geocode = RateLimiter(geolocator.reverse, min_delay_seconds=2)

    # Combine lat and long into coordinates
    latitudes = pd.Series(self.df.latitude.values.astype(str))  # Generate a Series of latitude values as strings
    longitudes = pd.Series(self.df.longitude.values.astype(str))  # Generate a Series of longitude values as strings
    coordinates = latitudes + ', ' + longitudes  # Combine the latitude and longitude values into a single coordinate Series

    # Retrieve countries from coordinates (rate limiting requests)
    locations = coordinates.apply(partial(geocode, language='en', exactly_one=True))
    self.df['country'] = locations.apply(lambda x: x.raw['address']['country']).values

enforce_unique_ids()

Removal of any duplicate observations utilizing their observation id

In place duplicate removal such that changes are effected directly within df_whole

Source code in src/data/DataCleanPipeline.py
def enforce_unique_ids(self):
    """Removal of any duplicate observations utilizing their observation id

    In place duplicate removal such that changes are effected directly within df_whole
    """
    self.df_whole.drop_duplicates(subset=['id'], keep='first', inplace=True)

format_bad_data(bad_df)

Method creates a sub-dataframe of only the image_url and the image quality (Index is observation ID)

This subset matches the format in which bad quality images are written to bad_quality.csv

Parameters:

bad_df (DataFrame): DataFrame containing all bad observations filtered from df. Required.

Returns:

DataFrame: The method returns a DataFrame containing only the id, image_url, and image_quality columns (bad_data.csv format).

Source code in src/data/DataCleanPipeline.py
def format_bad_data(self, bad_df):
    """Method creates a sub-dataframe of only the image_url and the image quality (Index is observation ID)

    This subset matches the format in which bad quality images are written to bad_quality.csv

    Args:
        bad_df (DataFrame): DataFrame containing all bad observations filtered from df

    Returns:
        (DataFrame): The method returns a DataFrame containing only the id, image_url, and image_quality columns (bad_data.csv format)
    """
    bad_df = bad_df[['image_url', 'image_quality']]
    return bad_df

format_observation_dates()

Method ensures that raw data dates follow format yyyy-mm-dd. If the dates deviate they are removed from the dataframe.

In place changes are effected within the class's dataframe df. Any date errors produced by incorrect date formats are transformed into NaT values, and the corresponding rows are removed from df. Removal of the rows requires an index reset in order to facilitate testing operations on the cleaning pipeline.
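
A small sketch of the coercion behaviour on hypothetical date strings:

import pandas as pd

dates = pd.Series(['2021-06-14', '14/06/2021', 'unknown'])
# Dates that deviate from yyyy-mm-dd are coerced to NaT and can then be dropped
parsed = pd.to_datetime(dates, format='%Y-%m-%d', errors='coerce')
print(parsed.isna().tolist())  # [False, True, True]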

Source code in src/data/DataCleanPipeline.py
def format_observation_dates(self):
    """ Method ensures that raw data dates follow format yyyy-mm-dd. If the dates deviate they are removed from the dataframe.

    In place changes are effected within the class's dataframe df.
    Any date errors produced by incorrect date formats are transformed into NaT values, and the corresponding rows are removed from df.
    Removal of the rows requires an index reset in order to facilitate testing operations on the cleaning pipeline.
    """
    self.df['observed_on'] = pd.to_datetime(self.df['observed_on'],
                                            format='%Y-%m-%d',
                                            yearfirst=True,
                                            errors='coerce',
                                            exact=True).astype(str)
    self.df.query('observed_on != "NaT"', inplace=True)

generate_local_times()

Method converts UTC time to correct local time utilizing the specified sighting timezone.

The new column generated is labelled "local_time_observed_at" and can be found in the interim data folder.

The date conversion utilizes both date and time in the UTC format. This means any day change (midnight -> next day) is accounted for, and the observed_on date remains correct.
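
A brief sketch of the conversion for a single hypothetical sighting, showing how the local date rolls over to the next day:

import pandas as pd
import pytz

# Sighting recorded late in the UTC day; Johannesburg is two hours ahead of UTC,
# so the local timestamp falls on the next calendar day
utc_time = '2021-06-14 23:30:00+00:00'  # hypothetical time_observed_at value
local = pd.to_datetime(utc_time, utc=True).astimezone(pytz.timezone('Africa/Johannesburg'))
print(local)  # 2021-06-15 01:30:00+02:00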

Source code in src/data/DataCleanPipeline.py
def generate_local_times(self):
    """ Method converts UTC time to correct local time utilizing the specified sighting timezone.

    The new column generated is labelled "local_time_observed_at" and can be found in the interim data folder.

    The date conversion utilizes both date and time in the UTC format.
    This means any day change (midnight -> next day) is accounted for, and the observed_on date remains correct.
    """

    # Standardize time zone formats
    self.standardize_timezones()

    # Generate local times by converting UTC to specified time zones
    self.df['local_time_observed_at'] = self.df.apply(
        lambda x: pd.to_datetime(x['time_observed_at'], utc=True).astimezone(pytz.timezone(x['time_zone'])),
        axis=1).astype(str)

identify_bad_observations()

Method identifies probable bad quality images based on keyword extraction from the descriptions.

The observations with identified keywords are excluded from the interim data, and instead written to the bad_data.csv file for manual filtering. The keyword list is created based on inspection of observation images and the descriptions of each observation.

Keyword pattern identification is accomplished through the use of regex pattern matching

Returns:

DataFrame: The method returns a DataFrame containing the identified potentially bad observations from within the current batch.
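
A short sketch of the keyword matching on hypothetical descriptions, using a subset of the indicator list:

import pandas as pd

descriptions = pd.Series(['Leopard crossing the road', 'Clear photo of a caracal', None])
pattern = '|'.join(['dead', 'road', 'scat'])  # subset of the keyword list
mask = descriptions.str.contains(pattern, case=False, regex=True).fillna(False)
print(mask.tolist())  # [True, False, False]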

Source code in src/data/DataCleanPipeline.py
def identify_bad_observations(self):
    """Method identifies probable bad quality images based on keyword extraction from the descriptions.

    The observations with identified keywords are excluded from the interim data, and instead written to the bad_data.csv file for manual filtering.
    The keyword list is created based on inspection of observation images and the descriptions of each observation.

    Keyword pattern identification is accomplished through the use of regex pattern matching

    Returns:
        (DataFrame): The method returns a DataFrame containing the identified potentially bad observations from within the current batch.
    """
    description_indicators = ['dead', 'road kill', 'road', 'scat', 'poo', 'killed', 'spoor', 'road-kill', 'remains',
                              'body', 'deceased', 'prey', 'fatality', 'tracks', 'trapped', 'bad', 'roadkilled', 'poop',
                              'crushed', 'kill', 'squashed', 'terrible', 'caught', 'pool', 'blurry', 'destroyed',
                              'sidewalk',
                              'grounded']
    regex_pattern = '|'.join([f'{key_word}' for key_word in description_indicators])
    filter = self.df.description.str.contains(regex_pattern, case=False, regex=True)  # Filter descriptions to identify keywords
    filter.fillna(False, inplace=True)  # Boolean filter
    bad_df = self.df[filter]  # Filter to produce bad_obs df
    self.df = self.df[~filter]  # Filter to remove bad_obs from df
    bad_df['image_quality'] = 'bad'  # Label bad data image quality
    return bad_df

percentage(rows_remaining)

Method generates and updates a status bar based on the progress of batching.

Both percentage complete and running time metrics are displayed. Method inspiration: https://www.geeksforgeeks.org/progress-bars-in-python/

Parameters:

rows_remaining (int): The number of rows remaining to be processed in the df_whole DataFrame. Required.
Source code in src/data/DataCleanPipeline.py
def percentage(self, rows_remaining):
    """ Method generates and updates a status bar based on the progress of batching.

    Both percentage complete and running time metrics are displayed
    Method inspiration: https://www.geeksforgeeks.org/progress-bars-in-python/

    Args:
        rows_remaining (int): The number of rows remaining to be processed in the df_whole DataFrame.
    """
    progress_bar_length = 100
    percentage_complete = (self.row_sum - rows_remaining) / self.row_sum
    filled = int(progress_bar_length * percentage_complete)
    running_time = datetime.now() - self.start_time

    bar = '=' * filled + '-' * (progress_bar_length - filled)
    percentage_display = round(100 * percentage_complete, 1)
    sys.stdout.write('\r[%s] %s%s ... running: %s' % (bar, percentage_display, '%', running_time))
    sys.stdout.flush()

remove_na_working_columns()

This method removes all rows with NaN values, specifically located within columns used for computation that require values.

The 'working columns' include date, time, time zone, and coordinates. If the removal creates an empty dataframe, the method exits execution, displaying an exit message.

Source code in src/data/DataCleanPipeline.py
def remove_na_working_columns(self):
    """ This method removes all rows with NaN values, specifically located within columns used for computation that require
    values.

     The 'working columns' include date, time, time zone, and coordinates.
     If the removal creates an empty dataframe, the method exits execution, displaying an exit message.
    """
    self.df_whole.dropna(subset=['observed_on', 'latitude', 'longitude', 'time_observed_at', 'time_zone'],
                         inplace=True)
    if self.df_whole.empty:
        print("*********** No further correctly format to process ***********")
        sys.exit()

remove_peripheral_columns()

Method removes all peripheral columns before writing dataframe to interim_data.csv

Source code in src/data/DataCleanPipeline.py
def remove_peripheral_columns(self):
    """ Method removes all peripheral columns before writing dataframe to interim_data.csv"""
    self.df = self.df[['observed_on', 'local_time_observed_at', 'latitude', 'longitude',
                       'positional_accuracy', 'public_positional_accuracy', 'image_url', 'license', 'geoprivacy',
                       'taxon_geoprivacy', 'scientific_name', 'common_name', 'taxon_id',
                       'taxon_kingdom_name', 'taxon_phylum_name', 'taxon_class_name', 'taxon_order_name',
                       'taxon_family_name', 'taxon_genus_name', 'taxon_species_name']]

standardize_timezones()

Method generates timezones in a format accepted by the pytz library for use in the local time zone conversion

This method utilizes the observation coordinates to return the time zone of the sighting. This timezone overwrites the "time_zone" column
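
A one-line sketch using the timezonefinder library; the coordinates are a hypothetical observation location:

from timezonefinder import TimezoneFinder

finder = TimezoneFinder()
# Hypothetical coordinates near Cape Town
print(finder.timezone_at(lat=-33.9, lng=18.4))  # Africa/Johannesburg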

Source code in src/data/DataCleanPipeline.py
def standardize_timezones(self):
    """ Method generated timezones in a format accepted by the pytz library for use in the local time zone conversion

    This method utilizes the observation coordinates to return the time zone of the sighting.
    This timezone overwrites the "time_zone" column
    """
    finder = TimezoneFinder()

    self.df['time_zone'] = self.df.apply(
        lambda x: finder.timezone_at(lat=x['latitude'], lng=x['longitude']), axis=1)

write_bad_data(bad_df)

Method performs a similar operation to the write_interim_data() method, in this case specifically writing bad data

This method should be refactored in conjunction with write_interim_data in order to minimize code repetition

Parameters:

bad_df (DataFrame): DataFrame containing the sub-dataframe of only id, image_url, and image_quality columns. Required.
Source code in src/data/DataCleanPipeline.py
def write_bad_data(self, bad_df):
    """Method performs similar operation to the write_interim_data() method, in this case specifically writing bad data

    This method should be refactored in conjunction with write_interim_data in order to minimize code repetition

    Args:
        bad_df (DataFrame): DataFrame containing the sub-dataframe of only id, image_url, and image_quality columns
    """
    if not self.TEST:  # Identifies not in testing conditions
        self.bad_data_exists = os.path.isfile(self.write_path + self.bad_file)
        if self.bad_data_exists:  # If the file already exists, append the next batch of bad data
            bad_df.to_csv(self.write_path + self.bad_file, mode='a', index=True, header=False)
        else:  # Write bad_data file as no file exists
            bad_df.to_csv(self.write_path + self.bad_file, mode='w', index=True, header=True)

write_interim_data()

Method writes current state of df into interim data folder in csv format
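
The append-or-create pattern used by both write methods, sketched with a hypothetical file path:

import os
import pandas as pd

df = pd.DataFrame({'image_url': ['http://example.org/1.jpg']}, index=pd.Index([101], name='id'))
path = 'interim_example.csv'  # hypothetical path

# Write headers only when creating the file; later batches are appended without headers
if os.path.isfile(path):
    df.to_csv(path, mode='a', index=True, header=False)
else:
    df.to_csv(path, mode='w', index=True, header=True)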

Source code in src/data/DataCleanPipeline.py
def write_interim_data(self):
    """ Method writes current state of df into interim data folder in csv format"""
    if not self.TEST:
        self.interim_exists = os.path.isfile(self.write_path + self.interim_file)
        if self.interim_exists:
            self.df.to_csv(self.write_path + self.interim_file, mode='a', index=True, header=False)
        else:
            self.df.to_csv(self.write_path + self.interim_file, mode='w', index=True, header=True)