diff --git a/.gitignore b/.gitignore index df617f797..c3184ddbc 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ development.ini node_modules *.project .eggs -.vscode/ \ No newline at end of file +.vscode/ +.idea/ \ No newline at end of file diff --git a/ckanext/harvest/cli.py b/ckanext/harvest/cli.py index 20a750447..594ef6a58 100644 --- a/ckanext/harvest/cli.py +++ b/ckanext/harvest/cli.py @@ -108,26 +108,23 @@ def clear(ctx, id): @source.command() @click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME", required=False) -@click.option( - "-k", - "--keep-current", - default=False -) @click.pass_context -def clear_history(ctx, id, keep_current): +def clear_history(ctx, id): """If no source id is given the history for all harvest sources (maximum is 1000) will be cleared. - Clears all jobs and objects related to a harvest source, but keeps - the source itself. The datasets imported from the harvest source - will NOT be deleted!!! If a source id is given, it only clears + Clears all jobs and out-of-date objects related to a harvest source, but keeps + the source itself and a history of what has been harvested already. + The datasets imported from the harvest source will NOT be deleted!!! + + If a source id is given, it only clears the history of the harvest source with the given source id. """ flask_app = ctx.meta["flask_app"] with flask_app.test_request_context(): - result = utils.clear_harvest_source_history(id, bool(keep_current)) + result = utils.clear_harvest_source_history(id) click.secho(result, fg="green") diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index daa482aea..fe70db4e6 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -190,14 +190,6 @@ def __init__(self, name): will be aborted. You can use comma as a separator to provide multiple source_id's""", ) - self.parser.add_option( - "-k", - "--keep-current", - dest="keep_current", - default=False, - help="Do not delete relevant harvest objects", - ) - def command(self): self._load_config() @@ -324,12 +316,11 @@ def create_harvest_source(self): print(result) def clear_harvest_source_history(self): - keep_current = bool(self.options.keep_current) source_id = None if len(self.args) >= 2: source_id = unicode_safe(self.args[1]) - print(utils.clear_harvest_source_history(source_id, keep_current)) + print(utils.clear_harvest_source_history(source_id)) def show_harvest_source(self): diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 090e6e155..93d2224d7 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -313,16 +313,14 @@ def harvest_abort_failed_jobs(context, data_dict): def harvest_sources_job_history_clear(context, data_dict): ''' - Clears the history for all active harvest sources. All jobs and objects related to a harvest source will - be cleared, but keeps the source itself. + Clears the history for all active harvest sources. All non-running jobs and non-current harvest objects will + be cleared, but keeps the source itself and the most current harvest objects. This is useful to clean history of long running harvest sources to start again fresh. The datasets imported from the harvest source will NOT be deleted!!! ''' check_access('harvest_sources_clear', context, data_dict) - keep_current = data_dict.get('keep_current', False) - job_history_clear_results = [] # We assume that the maximum of 1000 (hard limit) rows should be enough result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000}) @@ -331,7 +329,7 @@ def harvest_sources_job_history_clear(context, data_dict): for data_dict in harvest_packages: try: clear_result = get_action('harvest_source_job_history_clear')( - context, {'id': data_dict['id'], 'keep_current': keep_current}) + context, {'id': data_dict['id']}) job_history_clear_results.append(clear_result) except NotFound: # Ignoring not existent harvest sources because of a possibly corrupt search index @@ -343,7 +341,7 @@ def harvest_sources_job_history_clear(context, data_dict): def harvest_source_job_history_clear(context, data_dict): ''' - Clears all jobs and objects related to a harvest source, but keeps the source itself. + Clears all jobs and out-of-date harvest objects from a harvest source. This is useful to clean history of long running harvest sources to start again fresh. The datasets imported from the harvest source will NOT be deleted!!! @@ -354,7 +352,6 @@ def harvest_source_job_history_clear(context, data_dict): check_access('harvest_source_clear', context, data_dict) harvest_source_id = data_dict.get('id', None) - keep_current = data_dict.get('keep_current', False) source = HarvestSource.get(harvest_source_id) if not source: @@ -365,49 +362,26 @@ def harvest_source_job_history_clear(context, data_dict): model = context['model'] - if keep_current: - sql = '''BEGIN; + sql = '''BEGIN; DELETE FROM harvest_object_error WHERE harvest_object_id IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}' AND current != true AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id - AND status = 'Running')) - AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id - AND current = true)) - ); + AND status = 'Running'))); DELETE FROM harvest_object_extra WHERE harvest_object_id IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}' AND current != true AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id - AND status = 'Running')) - AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id - AND current = true)) - ); + AND status = 'Running'))); DELETE FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}' AND current != true AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id - AND status = 'Running')) - AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id - AND current = true)); + AND status = 'Running')); DELETE FROM harvest_gather_error WHERE harvest_job_id IN (SELECT id FROM harvest_job AS job WHERE source_id = '{harvest_source_id}' - AND job.status != 'Running' - AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id)); + AND job.status != 'Running'); DELETE FROM harvest_job AS job WHERE source_id = '{harvest_source_id}' - AND job.status != 'Running' - AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id); - COMMIT; - '''.format(harvest_source_id=harvest_source_id) - else: - sql = '''BEGIN; - DELETE FROM harvest_object_error WHERE harvest_object_id - IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}'); - DELETE FROM harvest_object_extra WHERE harvest_object_id - IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}'); - DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}'; - DELETE FROM harvest_gather_error WHERE harvest_job_id - IN (SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}'); - DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}'; + AND job.status != 'Running'; COMMIT; '''.format(harvest_source_id=harvest_source_id) diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 69ecafb5d..42ceeb0c8 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -28,6 +28,7 @@ 'HarvestObject', 'harvest_object_table', 'HarvestGatherError', 'harvest_gather_error_table', 'HarvestObjectError', 'harvest_object_error_table', + 'HarvestObjectExtra', 'harvest_object_extra_table', 'HarvestLog', 'harvest_log_table' ] @@ -356,7 +357,7 @@ def define_harvester_tables(): Column('state', types.UnicodeText, default=u'WAITING'), Column('metadata_modified_date', types.DateTime), Column('retry_times', types.Integer, default=0), - Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')), + Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id', ondelete='SET NULL'), nullable=True), Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')), Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True), nullable=True), diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index d2927d599..bf5ec7f18 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -296,45 +296,6 @@ def test_harvest_sources_job_history_clear(self): data_dict['url'] = 'http://another-url' source_2 = factories.HarvestSourceObj(**data_dict) - job_1 = factories.HarvestJobObj(source=source_1) - dataset_1 = ckan_factories.Dataset() - object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1, - package_id=dataset_1['id']) - job_2 = factories.HarvestJobObj(source=source_2) - dataset_2 = ckan_factories.Dataset() - object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2, - package_id=dataset_2['id']) - - # execute - context = {'session': model.Session, - 'ignore_auth': True, 'user': ''} - result = get_action('harvest_sources_job_history_clear')( - context, {}) - - # verify - assert sorted(result, key=lambda item: item['id']) == sorted( - [{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id']) - assert harvest_model.HarvestSource.get(source_1.id) - assert harvest_model.HarvestJob.get(job_1.id) is None - assert harvest_model.HarvestObject.get(object_1_.id) is None - dataset_from_db_1 = model.Package.get(dataset_1['id']) - assert dataset_from_db_1 - assert dataset_from_db_1.id == dataset_1['id'] - assert harvest_model.HarvestSource.get(source_2.id) - assert harvest_model.HarvestJob.get(job_2.id) is None - assert harvest_model.HarvestObject.get(object_2_.id) is None - dataset_from_db_2 = model.Package.get(dataset_2['id']) - assert dataset_from_db_2 - assert dataset_from_db_2.id == dataset_2['id'] - - def test_harvest_sources_job_history_clear_keep_current(self): - # prepare - data_dict = SOURCE_DICT.copy() - source_1 = factories.HarvestSourceObj(**data_dict) - data_dict['name'] = 'another-source' - data_dict['url'] = 'http://another-url' - source_2 = factories.HarvestSourceObj(**data_dict) - job_1 = factories.HarvestJobObj(source=source_1) dataset_1 = ckan_factories.Dataset() object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1, @@ -353,15 +314,15 @@ def test_harvest_sources_job_history_clear_keep_current(self): context = {'model': model, 'session': model.Session, 'ignore_auth': True, 'user': ''} result = get_action('harvest_sources_job_history_clear')( - context, {'keep_current': True}) + context, {}) # verify assert sorted(result, key=lambda item: item['id']) == sorted( [{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id']) - # dataset, related source, object and job still persist! + # dataset, related source, object still persist, job is deleted! assert harvest_model.HarvestSource.get(source_1.id) - assert harvest_model.HarvestJob.get(job_1.id) + assert not harvest_model.HarvestJob.get(job_1.id) assert harvest_model.HarvestObject.get(object_1_.id) dataset_from_db_1 = model.Package.get(dataset_1['id']) assert dataset_from_db_1 @@ -372,50 +333,7 @@ def test_harvest_sources_job_history_clear_keep_current(self): assert not harvest_model.HarvestJob.get(job_2.id) assert not harvest_model.HarvestObject.get(object_2_.id) - def test_harvest_source_job_history_clear_keep_current(self): - # prepare - source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) - job = factories.HarvestJobObj(source=source) - dataset = ckan_factories.Dataset() - object_ = factories.HarvestObjectObj(job=job, source=source, - package_id=dataset['id']) - - data_dict = SOURCE_DICT.copy() - data_dict['name'] = 'another-source' - data_dict['url'] = 'http://another-url' - source2 = factories.HarvestSourceObj(**data_dict) - job2 = factories.HarvestJobObj(source=source2) - dataset2 = ckan_factories.Dataset() - object_2_ = factories.HarvestObjectObj(job=job2, source=source2, - package_id=dataset2['id']) - - setattr(object_, 'report_status', 'added') - setattr(object_, 'current', True) - model.Session.commit() - - # execute - context = {'model': model, 'session': model.Session, - 'ignore_auth': True, 'user': ''} - result = get_action('harvest_source_job_history_clear')( - context, {'id': source.id, 'keep_current': True}) - - # verify - assert result == {'id': source.id} - assert harvest_model.HarvestSource.get(source.id) - assert harvest_model.HarvestJob.get(job.id) - assert harvest_model.HarvestObject.get(object_.id) - dataset_from_db = model.Package.get(dataset['id']) - assert dataset_from_db - assert dataset_from_db.id == dataset['id'] - # source2 and related objects are untouched - assert harvest_model.HarvestSource.get(source2.id) - assert harvest_model.HarvestJob.get(job2.id) - assert harvest_model.HarvestObject.get(object_2_.id) - dataset_from_db_2 = model.Package.get(dataset2['id']) - assert dataset_from_db_2 - assert dataset_from_db_2.id == dataset2['id'] - - def test_harvest_source_job_history_clear_keep_current_finished_jobs(self): + def test_harvest_source_job_history_clear_deletes_current_finished_jobs(self): # prepare source = factories.HarvestSourceObj(**SOURCE_DICT.copy()) job = factories.HarvestJobObj(source=source) @@ -439,7 +357,7 @@ def test_harvest_source_job_history_clear_keep_current_finished_jobs(self): context = {'model': model, 'session': model.Session, 'ignore_auth': True, 'user': ''} result = get_action('harvest_source_job_history_clear')( - context, {'id': source.id, 'keep_current': True}) + context, {'id': source.id}) # verify assert result == {'id': source.id} @@ -449,8 +367,9 @@ def test_harvest_source_job_history_clear_keep_current_finished_jobs(self): dataset_from_db = model.Package.get(dataset['id']) assert dataset_from_db assert dataset_from_db.id == dataset['id'] - # job2 and related objects are untouched - assert harvest_model.HarvestJob.get(job2.id) + + # job2 is deleted, but harvest objects are kept + assert not harvest_model.HarvestJob.get(job2.id) assert harvest_model.HarvestObject.get(object_2_.id) dataset_from_db_2 = model.Package.get(dataset2['id']) assert dataset_from_db_2 @@ -488,17 +407,19 @@ def test_harvest_source_job_history_clear_keep_current_running_job(self): context = {'model': model, 'session': model.Session, 'ignore_auth': True, 'user': ''} result = get_action('harvest_source_job_history_clear')( - context, {'id': source.id, 'keep_current': True}) + context, {'id': source.id}) - # verify that both jobs still exists + # verify first job and non-current objects are deleted, but any current objects are kept assert result == {'id': source.id} assert harvest_model.HarvestSource.get(source.id) - assert harvest_model.HarvestJob.get(job1.id) - assert harvest_model.HarvestObject.get(object_1_.id) + assert not harvest_model.HarvestJob.get(job1.id) + assert not harvest_model.HarvestObject.get(object_1_.id) assert harvest_model.HarvestObject.get(object_2_.id) dataset_from_db = model.Package.get(dataset1['id']) assert dataset_from_db assert dataset_from_db.id == dataset1['id'] + + # verify that second job still exists and all harvest objects are kept assert harvest_model.HarvestJob.get(job2.id) assert harvest_model.HarvestObject.get(object_3_.id) assert harvest_model.HarvestObject.get(object_4_.id) diff --git a/ckanext/harvest/utils.py b/ckanext/harvest/utils.py index 7acaaea59..fcfef6f3f 100644 --- a/ckanext/harvest/utils.py +++ b/ckanext/harvest/utils.py @@ -206,7 +206,7 @@ def clear_harvest_source(source_id_or_name): tk.get_action("harvest_source_clear")(context, {"id": source["id"]}) -def clear_harvest_source_history(source_id, keep_current): +def clear_harvest_source_history(source_id): context = { "model": model, @@ -216,18 +216,12 @@ def clear_harvest_source_history(source_id, keep_current): if source_id is not None: tk.get_action("harvest_source_job_history_clear")(context, { "id": source_id, - "keep_current": keep_current - }) + }) return "Cleared job history of harvest source: {0}".format(source_id) else: - # Purge queues, because we clean all harvest jobs and - # objects in the database. - if not keep_current: - purge_queues() + # If source is not given, apply to all sources cleared_sources_dicts = tk.get_action( - "harvest_sources_job_history_clear")(context, { - "keep_current": keep_current - }) + "harvest_sources_job_history_clear")(context) return "Cleared job history for all harvest sources: {0} source(s)".format( len(cleared_sources_dicts))