Source code for scoring_metrics

import os
import sys
import logging as log

import json

import pandas as pd

def new_score_card( fuzzy_flags = [ 'exact' ] , normalization_engines = [] ):
    score_card = {}
    for fuzzy_flag in fuzzy_flags:
        score_card[ fuzzy_flag ] = pd.DataFrame( columns = [ 'File' ,
                                                             'Start' , 'End' ,
                                                             'Type' , 'Pivot' ,
                                                             'Score' ] )
    for ref_engine , test_engine in normalization_engines:
        score_card[ ref_engine ] = {}
        for fuzzy_flag in fuzzy_flags:
            score_card[ ref_engine ][ fuzzy_flag ] = pd.DataFrame( columns = [ 'File' ,
                                                                               'Start' , 'End' ,
                                                                               'Type' , 'Pivot' ,
                                                                               'Score' ] )
    return score_card
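## Illustrative sketch (not part of the original module): the score card
## returned above is a dict of empty DataFrames keyed by fuzzy flag, with
## one nested dict per reference normalization engine. The engine names
## below are hypothetical.
def _example_new_score_card():
    score_card = new_score_card( fuzzy_flags = [ 'exact' , 'partial' ] ,
                                 normalization_engines = [ ( 'ref_cui' , 'test_cui' ) ] )
    ## Top-level keys:  'exact' , 'partial' , 'ref_cui'
    ## score_card[ 'exact' ] is an empty DataFrame with columns
    ## [ 'File' , 'Start' , 'End' , 'Type' , 'Pivot' , 'Score' ] and
    ## score_card[ 'ref_cui' ][ 'exact' ] has the same columns
    return score_card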
def get_annotation_from_base_entry( annotation_entry ,
                                    start_key , end_key ):
    try:
        annotation_type = annotation_entry[ 'type' ]
    except KeyError as e:
        log.warning( 'Could not access annotation type. Skipping entry.' )
        return None , None , None
    if( 'parity' in annotation_entry ):
        log.debug( '{} ( -1 )'.format( annotation_type ) )
        return annotation_type , -1 , -1
    try:
        annotation_start = annotation_entry[ start_key ]
        try:
            annotation_start = int( annotation_start )
        except ValueError:
            log.debug( 'Annotation start position could not be converted to int. Treating as a string: {}'.format( annotation_start ) )
    except KeyError as e:
        log.warning( 'Could not access annotation start key. Skipping entry.' )
        return None , None , None
    try:
        annotation_end = annotation_entry[ end_key ]
        try:
            annotation_end = int( annotation_end )
        except ValueError:
            log.debug( 'Annotation end position could not be converted to int. Treating as a string: {}'.format( annotation_end ) )
    except KeyError as e:
        log.warning( 'Could not access annotation end key. Skipping entry.' )
        return None , None , None
    log.debug( '{} ( {} - {} )'.format( annotation_type ,
                                        annotation_start ,
                                        annotation_end ) )
    return annotation_type , annotation_start , annotation_end
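## Illustrative sketch (not part of the original module): a minimal
## annotation entry and the tuple returned for it. The offsets and type
## name are hypothetical.
def _example_get_annotation_from_base_entry():
    annotation_entry = { 'type' : 'DateTime' ,
                         'begin_pos' : '87' ,
                         'end_pos' : '98' }
    ## Returns ( 'DateTime' , 87 , 98 ); positions are cast to int when possible
    return get_annotation_from_base_entry( annotation_entry ,
                                           'begin_pos' , 'end_pos' )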
##
##
##
def flatten_ss_dictionary( ss_dictionary ,
                           category = '(unknown)' ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    all_keys = list( ss_dictionary )
    if( len( all_keys ) == 0 ):
        log.debug( 'Zero {} keys in strict starts dictionary'.format( category ) )
    else:
        all_keys.sort( key = int )
        log.debug( '{} {} keys ranging from {} to {}'.format( len( all_keys ) ,
                                                              category ,
                                                              all_keys[ 0 ] ,
                                                              all_keys[ -1 ] ) )
    ##
    flat_entries = []
    for this_key in all_keys:
        for annot_index in range( len( ss_dictionary[ this_key ] ) ):
            flat_entries.append( ss_dictionary[ this_key ][ annot_index ] )
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return flat_entries
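## Illustrative sketch (not part of the original module): a strict-starts
## dictionary maps start offsets to lists of annotation entries, and
## flattening yields a single list ordered by start offset. The entries
## below are hypothetical.
def _example_flatten_ss_dictionary():
    ss_dictionary = { '87' : [ { 'type' : 'DateTime' , 'begin_pos' : '87' , 'end_pos' : '98' } ] ,
                      '12' : [ { 'type' : 'Age' , 'begin_pos' : '12' , 'end_pos' : '14' } ] }
    ## Returns the 'Age' entry first, then the 'DateTime' entry
    return flatten_ss_dictionary( ss_dictionary , category = 'reference' )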
def update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                             ref_type , test_type ):
    ##########################
    ## Before we touch a cell
    ## in the confusion matrix,
    ## we need to make sure that
    ## the full path to it exists
    if( fuzzy_flag not in confusion_matrix ):
        confusion_matrix[ fuzzy_flag ] = {}
    ##
    if( ref_type not in confusion_matrix[ fuzzy_flag ] ):
        confusion_matrix[ fuzzy_flag ][ ref_type ] = {}
    ##
    if( test_type not in confusion_matrix[ fuzzy_flag ][ ref_type ] ):
        confusion_matrix[ fuzzy_flag ][ ref_type ][ test_type ] = 0
    ##########################
    ## Update the correct cell
    ## of the confusion matrix
    confusion_matrix[ fuzzy_flag ][ ref_type ][ test_type ] += 1
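## Illustrative sketch (not part of the original module): the confusion
## matrix is a nested dict of counts keyed by fuzzy flag, reference type,
## and test type. The type names are hypothetical.
def _example_update_confusion_matrix():
    confusion_matrix = {}
    update_confusion_matrix( confusion_matrix , 'exact' , 'DateTime' , 'DateTime' )
    update_confusion_matrix( confusion_matrix , 'exact' , 'DateTime' , 'Age' )
    update_confusion_matrix( confusion_matrix , 'exact' , 'DateTime' , 'DateTime' )
    ## confusion_matrix is now
    ## { 'exact' : { 'DateTime' : { 'DateTime' : 2 , 'Age' : 1 } } }
    return confusion_matrix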
def update_score_card( condition , score_card , fuzzy_flag ,
                       filename , start_pos , end_pos , type ,
                       pivot_value = None ,
                       ref_annot = None , test_annot = None ,
                       scorable_attributes = None ,
                       scorable_engines = None ,
                       norm_synonyms = {} ):
    score_card[ fuzzy_flag ].loc[ score_card[ fuzzy_flag ].shape[ 0 ] ] = \
        [ filename , start_pos , end_pos , type , pivot_value , condition ]
    if( condition != 'TP' ):
        return
    ## TODO - add flag for an additional entry when ALL scorable_attributes are correct
    for ref_attribute , test_attribute in scorable_attributes:
        ## Skip entries for which the attribute wasn't extracted in
        ## either the ref or system annotation
        if( ref_attribute not in ref_annot or
            test_attribute not in test_annot ):
            continue
        ## TODO - add flag that treats TN and TP results both as TP
        if( ref_annot[ ref_attribute ] == test_annot[ test_attribute ] ):
            if( ref_annot[ ref_attribute ] == 'true' ):
                score_card[ fuzzy_flag ].loc[ score_card[ fuzzy_flag ].shape[ 0 ] ] = \
                    [ filename , start_pos , end_pos , type , ref_attribute , 'TP' ]
            else:
                score_card[ fuzzy_flag ].loc[ score_card[ fuzzy_flag ].shape[ 0 ] ] = \
                    [ filename , start_pos , end_pos , type , ref_attribute , 'TN' ]
        elif( ref_annot[ ref_attribute ] == 'true' ):
            score_card[ fuzzy_flag ].loc[ score_card[ fuzzy_flag ].shape[ 0 ] ] = \
                [ filename , start_pos , end_pos , type , ref_attribute , 'FN' ]
        else:
            score_card[ fuzzy_flag ].loc[ score_card[ fuzzy_flag ].shape[ 0 ] ] = \
                [ filename , start_pos , end_pos , type , ref_attribute , 'FP' ]
    ## Loop over all scorable normalization engines in the score_card
    for ref_engine , test_engine in scorable_engines:
        ## Skip normalization engines that don't have a score
        ## card associated with them
        if( ref_engine not in score_card ):
            continue
        ## TODO - add flag that treats TN and TP results both as TP
        ## If neither the ref nor the system annotation have a normalization
        ## entry for this engine, keep going. We can also consider this entry
        ## a TN for the normalization engine in question.
        if( ref_engine not in ref_annot and
            test_engine not in test_annot ):
            score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                [ filename , start_pos , end_pos , type , None , 'TN' ]
        elif( test_engine not in test_annot ):
            ## If we don't have a normalized entry in the test,
            ## this is a FN
            score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                [ filename , start_pos , end_pos , type , ref_annot[ ref_engine ] , 'FN' ]
        elif( ref_engine not in ref_annot ):
            ## If we don't have a normalized entry in the reference,
            ## this is a FP
            score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                [ filename , start_pos , end_pos , type , test_annot[ test_engine ] , 'FP' ]
        elif( ref_annot[ ref_engine ] == test_annot[ test_engine ] ):
            score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                [ filename , start_pos , end_pos , type , ref_annot[ ref_engine ] , 'TP' ]
        else:
            equiv_match = False
            ref_concept = ref_annot[ ref_engine ]
            test_concept = test_annot[ test_engine ]
            for lhs in norm_synonyms:
                if( ref_concept in norm_synonyms[ lhs ] and
                    test_concept in norm_synonyms[ lhs ] ):
                    equiv_match = True
                    break
            if( equiv_match ):
                score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                    [ filename , start_pos , end_pos , type , ref_concept , 'TP' ]
            else:
                score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                    [ filename , start_pos , end_pos , type , ref_annot[ ref_engine ] , 'FN' ]
                score_card[ ref_engine ][ fuzzy_flag ].loc[ score_card[ ref_engine ][ fuzzy_flag ].shape[ 0 ] ] = \
                    [ filename , start_pos , end_pos , type , test_annot[ test_engine ] , 'FP' ]
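## Illustrative sketch (not part of the original module): a true-positive
## span match appends one row to the flag-level DataFrame and, when the
## annotations carry scorable attributes, one row per attribute. The
## filename, offsets, and attribute name are hypothetical.
def _example_update_score_card():
    score_card = new_score_card( fuzzy_flags = [ 'exact' ] )
    update_score_card( 'TP' , score_card , 'exact' ,
                       'note_0001.txt' , 87 , 98 , 'DateTime' ,
                       ref_annot = { 'negated' : 'true' } ,
                       test_annot = { 'negated' : 'true' } ,
                       scorable_attributes = [ ( 'negated' , 'negated' ) ] ,
                       scorable_engines = [] ,
                       norm_synonyms = {} )
    ## score_card[ 'exact' ] now holds two rows:  the span-level 'TP' and
    ## an attribute-level 'TP' with Pivot == 'negated'
    return score_card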
def exact_comparison_runner( reference_filename , confusion_matrix , score_card ,
                             reference_annot , test_entries ,
                             start_key , end_key ,
                             fuzzy_flag ,
                             scorable_attributes ,
                             scorable_engines ,
                             norm_synonyms ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## grab type and end position
    reference_type , reference_start , reference_end = \
        get_annotation_from_base_entry( reference_annot ,
                                        start_key , end_key )
    if( reference_type == None ):
        ## If we couldn't extract a type, consider this
        ## an invalid annotation
        return( False , test_entries )
    ## Loop through all the test annotations
    ## that haven't been matched yet
    test_leftovers = []
    matched_flag = False
    for test_annot in test_entries:
        ## TODO - nesting comparisons, multiple overlaps
        if( matched_flag ):
            test_leftovers.append( test_annot )
            continue
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            ## If we couldn't extract a type, consider this
            ## an invalid annotation
            continue
        if( reference_start == test_start and
            reference_end == test_end ):
            matched_flag = True
            update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                     reference_type , test_type )
            ## If the types match...
            if( reference_type == test_type ):
                ## ... and the end positions match, then we have a
                ## perfect match
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = scorable_engines ,
                                   norm_synonyms = norm_synonyms )
            else:
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
                update_score_card( 'FP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   test_start , test_end ,
                                   test_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
        else:
            test_leftovers.append( test_annot )
    #########
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return( matched_flag , test_leftovers )
def start_comparison_runner( reference_filename , confusion_matrix , score_card ,
                             reference_annot , test_entries ,
                             start_key , end_key ,
                             fuzzy_flag ,
                             scorable_attributes ,
                             scorable_engines ,
                             norm_synonyms ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## grab type and end position
    reference_type , reference_start , reference_end = \
        get_annotation_from_base_entry( reference_annot ,
                                        start_key , end_key )
    if( reference_type == None ):
        ## If we couldn't extract a type, consider this
        ## an invalid annotation
        return( False , test_entries )
    ## Loop through all the test annotations
    ## that haven't been matched yet
    test_leftovers = []
    matched_flag = False
    for test_annot in test_entries:
        ## TODO - nesting comparisons, multiple overlaps
        if( matched_flag ):
            test_leftovers.append( test_annot )
            continue
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            ## If we couldn't extract a type, consider this
            ## an invalid annotation
            continue
        if( reference_start == test_start or
            ## TODO - the SOF guard isn't needed here.
            ##        Need to research the best approach
            ##        or data representation when we have
            ##        the equivalent overrun prior to the
            ##        start of a file as after the
            ##        SOF (START OF FILE) indicator
            ( reference_start != 'SOF' and
              test_start != 'SOF' and
              reference_start != 'EOF' and
              test_start != 'EOF' and
              reference_start >= test_start - 1 and
              reference_start <= test_start + 1 ) ):
            matched_flag = True
            update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                     reference_type , test_type )
            ## If the types match...
            if( reference_type == test_type ):
                ## ... and the end positions match, then we have a
                ## perfect match
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = scorable_engines ,
                                   norm_synonyms = norm_synonyms )
            else:
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
                update_score_card( 'FP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   test_start , test_end ,
                                   test_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
        else:
            test_leftovers.append( test_annot )
    #########
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return( matched_flag , test_leftovers )
def end_comparison_runner( reference_filename , confusion_matrix , score_card ,
                           reference_annot , test_entries ,
                           start_key , end_key ,
                           fuzzy_flag ,
                           scorable_attributes ,
                           scorable_engines ,
                           norm_synonyms ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## grab type and end position
    reference_type , reference_start , reference_end = \
        get_annotation_from_base_entry( reference_annot ,
                                        start_key , end_key )
    if( reference_type == None ):
        ## If we couldn't extract a type, consider this
        ## an invalid annotation
        return( False , test_entries )
    ## Loop through all the test annotations
    ## that haven't been matched yet
    test_leftovers = []
    matched_flag = False
    for test_annot in test_entries:
        ## TODO - nesting comparisons, multiple overlaps
        if( matched_flag ):
            test_leftovers.append( test_annot )
            continue
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            ## If we couldn't extract a type, consider this
            ## an invalid annotation
            continue
        if( reference_end == test_end or
            ( reference_end != 'SOF' and
              test_end != 'SOF' and
              reference_end != 'EOF' and
              test_end != 'EOF' and
              reference_end >= test_end - 1 and
              reference_end <= test_end + 1 ) ):
            matched_flag = True
            update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                     reference_type , test_type )
            ## If the types match...
            if( reference_type == test_type ):
                ## ... and the end positions match, then we have a
                ## perfect match
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = scorable_engines ,
                                   norm_synonyms = norm_synonyms )
            else:
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
                update_score_card( 'FP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   test_start , test_end ,
                                   test_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
        else:
            test_leftovers.append( test_annot )
    #########
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return( matched_flag , test_leftovers )
def fully_contained_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                       reference_annot , test_entries ,
                                       start_key , end_key ,
                                       fuzzy_flag ,
                                       scorable_attributes ,
                                       scorable_engines ,
                                       norm_synonyms ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## grab type and end position
    reference_type , reference_start , reference_end = \
        get_annotation_from_base_entry( reference_annot ,
                                        start_key , end_key )
    if( reference_type == None ):
        ## If we couldn't extract a type, consider this
        ## an invalid annotation
        return( False , test_entries )
    ## Loop through all the test annotations
    ## that haven't been matched yet
    test_leftovers = []
    matched_flag = False
    for test_annot in test_entries:
        ## TODO - nesting comparisons, multiple overlaps
        if( matched_flag ):
            test_leftovers.append( test_annot )
            continue
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            ## If we couldn't extract a type, consider this
            ## an invalid annotation
            continue
        if( ( test_start == 'SOF' or
              ( reference_start != 'SOF' and
                test_start <= reference_start ) ) and
            ( test_end == 'EOF' or
              ( reference_end != 'EOF' and
                reference_end <= test_end ) ) ):
            matched_flag = True
            update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                     reference_type , test_type )
            ## If the types match...
            if( reference_type == test_type ):
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = scorable_engines ,
                                   norm_synonyms = norm_synonyms )
            else:
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
                update_score_card( 'FP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   test_start , test_end ,
                                   test_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
        else:
            test_leftovers.append( test_annot )
    #########
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return( matched_flag , test_leftovers )
def partial_comparison_runner( reference_filename , confusion_matrix , score_card ,
                               reference_annot , test_entries ,
                               start_key , end_key ,
                               fuzzy_flag ,
                               scorable_attributes ,
                               scorable_engines ,
                               norm_synonyms ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## grab type and end position
    reference_type , reference_start , reference_end = \
        get_annotation_from_base_entry( reference_annot ,
                                        start_key , end_key )
    if( reference_type == None ):
        ## If we couldn't extract a type, consider this
        ## an invalid annotation
        return( False , test_entries )
    ## Loop through all the test annotations
    ## that haven't been matched yet
    test_leftovers = []
    matched_flag = False
    for test_annot in test_entries:
        ## TODO - nesting comparisons, multiple overlaps
        if( matched_flag ):
            test_leftovers.append( test_annot )
            continue
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            ## If we couldn't extract a type, consider this
            ## an invalid annotation
            continue
        if( ( ( reference_start == 'SOF' or
                reference_start <= test_start ) and
              ( reference_end == 'EOF' or
                reference_end > test_start ) ) or
            ( ( reference_start == 'SOF' or
                reference_start < test_end ) and
              ( reference_end == 'EOF' or
                reference_end >= test_end ) ) ):
            matched_flag = True
            update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                     reference_type , test_type )
            ## If the types match...
            if( reference_type == test_type ):
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = scorable_engines ,
                                   norm_synonyms = norm_synonyms )
            else:
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
                update_score_card( 'FP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   test_start , test_end ,
                                   test_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot )
        else:
            test_leftovers.append( test_annot )
    #########
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return( matched_flag , test_leftovers )
def reference_annot_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                       reference_annot , test_entries ,
                                       start_key , end_key ,
                                       fuzzy_flag ,
                                       scorable_attributes ,
                                       scorable_engines ,
                                       norm_synonyms ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## Start offset matching is special and gets run alone
    if( fuzzy_flag == 'start' ):
        reference_matched , test_leftovers = \
            start_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                     reference_annot , test_entries ,
                                     start_key , end_key ,
                                     fuzzy_flag ,
                                     scorable_attributes ,
                                     scorable_engines ,
                                     norm_synonyms )
        return( reference_matched , test_leftovers )
    ## End offset matching is special and gets run alone
    if( fuzzy_flag == 'end' ):
        reference_matched , test_leftovers = \
            end_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                   reference_annot , test_entries ,
                                   start_key , end_key ,
                                   fuzzy_flag ,
                                   scorable_attributes ,
                                   scorable_engines ,
                                   norm_synonyms )
        return( reference_matched , test_leftovers )
    ## The other three types of matching are compatible and can be
    ## run together
    reference_matched , test_leftovers = \
        exact_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                 reference_annot , test_entries ,
                                 start_key , end_key ,
                                 fuzzy_flag ,
                                 scorable_attributes ,
                                 scorable_engines ,
                                 norm_synonyms )
    if( fuzzy_flag == 'exact' or reference_matched ):
        return( reference_matched , test_leftovers )
    reference_matched , test_leftovers = \
        fully_contained_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                           reference_annot , test_leftovers ,
                                           start_key , end_key ,
                                           fuzzy_flag ,
                                           scorable_attributes ,
                                           scorable_engines ,
                                           norm_synonyms )
    if( fuzzy_flag == 'fully-contained' or reference_matched ):
        return( reference_matched , test_leftovers )
    reference_matched , test_leftovers = \
        partial_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                   reference_annot , test_leftovers ,
                                   start_key , end_key ,
                                   fuzzy_flag ,
                                   scorable_attributes ,
                                   scorable_engines ,
                                   norm_synonyms )
    return( reference_matched , test_leftovers )
def document_level_annot_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                            reference_annot , test_entries ,
                                            fuzzy_flag ,
                                            scorable_attributes ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ##
    reference_type = reference_annot[ 'type' ]
    reference_pivot_value = reference_annot[ 'pivot_value' ]
    annot_parity = reference_annot[ 'parity' ]
    if( reference_type == None ):
        ## If we couldn't extract a type, consider this
        ## an invalid annotation
        return( False , test_entries )
    ## Loop through all the test annotations
    ## that haven't been matched yet
    test_leftovers = []
    matched_flag = False
    for test_annot in test_entries:
        ## TODO - nesting comparisons, multiple overlaps
        test_type = test_annot[ 'type' ]
        if( matched_flag ):
            if( reference_type == test_type ):
                if( annot_parity == 'Unique' ):
                    log.warning( 'Multiple system annotations found for annotation type: {}'.format( test_type ) )
                ## TODO - We're going to skip over 'Any', 'First', and 'Last' annotations for now
            else:
                test_leftovers.append( test_annot )
            continue
        ##
        test_pivot_value = test_annot[ 'pivot_value' ]
        if( test_type == None ):
            ## If we couldn't extract a type, consider this
            ## an invalid annotation
            continue
        if( reference_type == test_type ):
            this_type = '{} = "{}"'.format( reference_type ,
                                            reference_pivot_value )
            that_type = '{} = "{}"'.format( test_type ,
                                            test_pivot_value )
            ## If the pivot_values match...
            if( reference_pivot_value == test_pivot_value ):
                matched_flag = True
                update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                         this_type , that_type )
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename , -1 , -1 ,
                                   this_type ,
                                   pivot_value = reference_pivot_value ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = [] )
            elif( annot_parity == 'Any' ):
                ## Not matching with the 'Any' parity flag means that it could
                ## potentially match another annotation later so we won't be
                ## worried about scoring it yet.
                test_leftovers.append( test_annot )
            else:
                matched_flag = True
                update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                         this_type , that_type )
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename , -1 , -1 ,
                                   this_type ,
                                   pivot_value = reference_pivot_value )
                update_score_card( 'FP' , score_card , fuzzy_flag ,
                                   reference_filename , -1 , -1 ,
                                   that_type ,
                                   pivot_value = test_pivot_value )
        else:
            test_leftovers.append( test_annot )
    return( matched_flag , test_leftovers )
def evaluate_positions( reference_filename ,
                        confusion_matrix ,
                        score_card ,
                        reference_ss ,
                        test_ss ,
                        fuzzy_flag = 'exact' ,
                        use_mapped_chars = False ,
                        scorable_attributes = [] ,
                        scorable_engines = [] ,
                        norm_synonyms = {} ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    if( use_mapped_chars ):
        start_key = 'begin_pos_mapped'
        end_key = 'end_pos_mapped'
    else:
        start_key = 'begin_pos'
        end_key = 'end_pos'
    ##
    log.debug( 'Anchoring positions at \'{}\' and \'{}\''.format( start_key ,
                                                                  end_key ) )
    reference_entries = flatten_ss_dictionary( reference_ss , 'reference' )
    test_entries = flatten_ss_dictionary( test_ss , 'test' )
    ## In case there are no reference_entries, initialize test_leftovers
    ## as the full list of test_entries
    test_leftovers = test_entries
    ##
    reference_entries_doc_level = []
    test_entries_doc_level = []
    ##
    for reference_annot in reference_entries:
        ## grab type and end position
        reference_type , reference_start , reference_end = \
            get_annotation_from_base_entry( reference_annot ,
                                            start_key , end_key )
        if( reference_start == -1 ):
            ## A start_key of -1 means that this is a document-level
            ## annotation and should be scored elsewhere
            reference_entries_doc_level.append( reference_annot )
            continue
        reference_matched , test_leftovers = \
            reference_annot_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                               reference_annot , test_entries ,
                                               start_key , end_key ,
                                               fuzzy_flag ,
                                               scorable_attributes ,
                                               scorable_engines ,
                                               norm_synonyms )
        test_entries = test_leftovers
        if( not reference_matched ):
            if( reference_type != None ):
                update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                         reference_type , '*FN*' )
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = None )
    ## any remaining entries in the test set are FPs
    for test_annot in test_leftovers:
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            continue
        if( test_start == -1 ):
            ## A start_key of -1 means that this is a document-level
            ## annotation and should be scored elsewhere
            test_entries_doc_level.append( test_annot )
            continue
        update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                 '*FP*' , test_type )
        update_score_card( 'FP' , score_card , fuzzy_flag ,
                           reference_filename ,
                           test_start , test_end ,
                           test_type ,
                           None ,
                           test_annot )
    ##
    ## When there are document level annotations, we loop over the entries
    ## again to score them using a different algorithm
    test_entries = test_entries_doc_level
    test_leftovers = test_entries
    for reference_annot in reference_entries_doc_level:
        reference_matched , test_leftovers = \
            document_level_annot_comparison_runner( reference_filename , confusion_matrix , score_card ,
                                                    reference_annot , test_entries ,
                                                    fuzzy_flag ,
                                                    scorable_attributes )
        test_entries = test_leftovers
        if( not reference_matched ):
            ## grab type and pivot value
            reference_type = reference_annot[ 'type' ]
            reference_pivot = reference_annot[ 'pivot_value' ]
            this_type = '{} = "{}"'.format( reference_type ,
                                            reference_pivot )
            if( reference_type != None ):
                update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                         reference_type , '*FN*' )
                update_score_card( 'FN' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   pivot_value = reference_pivot ,
                                   ref_annot = reference_annot ,
                                   test_annot = None )
    ## any remaining entries in the test set are FPs
    for test_annot in test_leftovers:
        ##
        test_type = test_annot[ 'type' ]
        test_pivot = test_annot[ 'pivot_value' ]
        if( test_type == None ):
            continue
        that_type = '{} = "{}"'.format( test_type ,
                                        test_pivot )
        update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                 '*FP*' , that_type )
        update_score_card( 'FP' , score_card , fuzzy_flag ,
                           reference_filename , -1 , -1 ,
                           that_type ,
                           pivot_value = test_pivot ,
                           ref_annot = None ,
                           test_annot = test_annot )
    ##
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
def evaluate_doc_properties( reference_filename ,
                             confusion_matrix ,
                             score_card ,
                             reference_ss ,
                             test_ss ,
                             patterns ,
                             fuzzy_flag = 'doc-property' ,
                             scorable_attributes = [] ,
                             scorable_engines = [] ,
                             norm_synonyms = {} ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ##
    log.debug( 'Anchoring positions at the document level' )
    start_key = 'begin_pos'
    end_key = 'end_pos'
    reference_entries = flatten_ss_dictionary( reference_ss , 'reference' )
    test_entries = flatten_ss_dictionary( test_ss , 'test' )
    ## In case there are no reference_entries, initialize test_leftovers
    ## as the full list of test_entries
    test_leftovers = test_entries
    ####
    tn_types = set()
    for pattern in patterns:
        tn_types.add( pattern[ 'type' ] )
    ####
    for reference_annot in reference_entries:
        ## grab type and end position
        reference_type , reference_start , reference_end = \
            get_annotation_from_base_entry( reference_annot ,
                                            start_key , end_key )
        if( reference_type in tn_types ):
            tn_types.remove( reference_type )
        test_leftovers = []
        matched_flag = False
        for test_annot in test_entries:
            ## TODO - nesting comparisons, multiple overlaps
            if( matched_flag ):
                test_leftovers.append( test_annot )
                continue
            ## grab type and end position
            test_type , test_start , test_end = \
                get_annotation_from_base_entry( test_annot ,
                                                start_key , end_key )
            if( test_type == None ):
                ## If we couldn't extract a type, consider this
                ## an invalid annotation
                continue
            if( reference_type == test_type ):
                matched_flag = True
                update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                         reference_type , test_type )
                update_score_card( 'TP' , score_card , fuzzy_flag ,
                                   reference_filename ,
                                   reference_start , reference_end ,
                                   reference_type ,
                                   ref_annot = reference_annot ,
                                   test_annot = test_annot ,
                                   scorable_attributes = scorable_attributes ,
                                   scorable_engines = scorable_engines ,
                                   norm_synonyms = norm_synonyms )
        if( not matched_flag ):
            update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                     reference_type , '*FN*' )
            update_score_card( 'FN' , score_card , fuzzy_flag ,
                               reference_filename ,
                               reference_start , reference_end ,
                               reference_type ,
                               ref_annot = reference_annot ,
                               test_annot = None )
        ####
        test_entries = test_leftovers
    ## any remaining entries in the test set are FPs
    for test_annot in test_leftovers:
        ## grab type and end position
        test_type , test_start , test_end = \
            get_annotation_from_base_entry( test_annot ,
                                            start_key , end_key )
        if( test_type == None ):
            continue
        if( test_type in tn_types ):
            tn_types.remove( test_type )
        update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                 '*FP*' , test_type )
        update_score_card( 'FP' , score_card , fuzzy_flag ,
                           reference_filename ,
                           test_start , test_end ,
                           test_type ,
                           None ,
                           test_annot )
    ## any remaining entries in the type system, we consider those TNs
    for tn_type in tn_types:
        update_confusion_matrix( confusion_matrix , fuzzy_flag ,
                                 tn_type , '*TN*' )
        update_score_card( 'TN' , score_card , fuzzy_flag ,
                           reference_filename , -1 , -1 ,
                           tn_type ,
                           ref_annot = None ,
                           test_annot = None )
    ##
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
##
## All functions related to printing and calculating scoring metrics
##
def accuracy( tp , fp , tn , fn ):
    if( tp + fp + tn + fn > 0 ):
        return ( tp + tn ) / float( tp + fp + tn + fn )
    else:
        return None
def precision( tp , fp ):
    if( fp + tp > 0 ):
        return tp / float( fp + tp )
    else:
        return None
def recall( tp , fn ):
    if( fn + tp > 0 ):
        return tp / float( fn + tp )
    else:
        return None
def specificity( tn , fp , empty_value = None ):
    ##print( '{} + {}'.format( tn , fp ) )
    if( tn + fp > 0 ):
        return tn / float( tn + fp )
    else:
        return empty_value
def f_score( p , r , beta = 1 ):
    if( p != None and r != None and p + r > 0 ):
        return ( 1 + ( beta**2 ) ) * ( p * r ) / \
            ( ( ( beta**2 ) * p ) + r )
    else:
        return None
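## Illustrative sketch (not part of the original module): computing the
## summary metrics above from raw counts. With TP = 8, FP = 2, and FN = 4,
## precision is 0.8, recall is about 0.667, and F1 is about 0.727.
def _example_basic_metrics():
    tp , fp , tn , fn = 8 , 2 , 0 , 4
    p = precision( tp = tp , fp = fp )          ## 8 / 10 = 0.8
    r = recall( tp = tp , fn = fn )             ## 8 / 12 = 0.666...
    f1 = f_score( p = p , r = r , beta = 1 )    ## 2 * p * r / ( p + r ) = 0.727...
    return p , r , f1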
def add_missing_fields( score_summary ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    score_types = score_summary.keys()
    if( 'TP' not in score_types ):
        score_summary[ 'TP' ] = 0.0
    if( 'FP' not in score_types ):
        score_summary[ 'FP' ] = 0.0
    if( 'TN' not in score_types ):
        score_summary[ 'TN' ] = 0.0
    if( 'FN' not in score_types ):
        score_summary[ 'FN' ] = 0.0
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
def norm_summary( score_summary , args ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## Source for definitions:
    ## -- https://en.wikipedia.org/wiki/Precision_and_recall#Definition_.28classification_context.29
    ##
    ## First, we want to make sure that all score types are represented
    ## in the summary series.
    add_missing_fields( score_summary )
    ## True Positive Rate (TPR),
    ## Sensitivity,
    ## Recall,
    ## Probability of Detection
    if( 'Recall' in args.metrics_list or
        len( args.f_beta_values ) > 0 ):
        score_summary[ 'Recall' ] = recall( tp = score_summary[ 'TP' ] ,
                                            fn = score_summary[ 'FN' ] )
    if( 'Sensitivity' in args.metrics_list ):
        score_summary[ 'Sensitivity' ] = recall( tp = score_summary[ 'TP' ] ,
                                                 fn = score_summary[ 'FN' ] )
    ## Positive Predictive Value (PPV),
    ## Precision
    if( 'Precision' in args.metrics_list or
        len( args.f_beta_values ) > 0 ):
        score_summary[ 'Precision' ] = precision( tp = score_summary[ 'TP' ] ,
                                                  fp = score_summary[ 'FP' ] )
    ## True Negative Rate (TNR),
    ## Specificity (SPC)
    if( 'Specificity' in args.metrics_list ):
        score_summary[ 'Specificity' ] = specificity( tn = score_summary[ 'TN' ] ,
                                                      fp = score_summary[ 'FP' ] )
    ## Accuracy
    if( 'Accuracy' in args.metrics_list ):
        score_summary[ 'Accuracy' ] = accuracy( tp = score_summary[ 'TP' ] ,
                                                fp = score_summary[ 'FP' ] ,
                                                tn = score_summary[ 'TN' ] ,
                                                fn = score_summary[ 'FN' ] )
    ##
    for beta in args.f_beta_values:
        score_summary[ 'F{}'.format( beta ) ] = f_score( p = score_summary[ 'Precision' ] ,
                                                         r = score_summary[ 'Recall' ] ,
                                                         beta = float( beta ) )
    ##
    metrics = []
    for metric in args.metrics_list:
        score = score_summary[ metric ]
        if( score is None ):
            score = args.empty_value
        metrics.append( score )
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return metrics
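## Illustrative sketch (not part of the original module): norm_summary()
## expects an args object carrying the requested metrics, F-score betas,
## and the placeholder for undefined scores, as produced by the tool's
## command-line parser. The Namespace below is a hypothetical stand-in.
def _example_norm_summary():
    from argparse import Namespace
    args = Namespace( metrics_list = [ 'TP' , 'FP' , 'Precision' , 'Recall' , 'F1' ] ,
                      f_beta_values = [ '1' ] ,
                      empty_value = '' )
    score_summary = { 'TP' : 8.0 , 'FP' : 2.0 , 'FN' : 4.0 }
    ## Returns [ 8.0 , 2.0 , 0.8 , 0.666... , 0.727... ]
    return norm_summary( score_summary , args )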
def recursive_deep_key_value_pair( dictionary , path , key , value ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    if( len( path ) == 0 ):
        dictionary[ key ] = value
    else:
        pop_path = path[ 0 ]
        if( pop_path not in dictionary ):
            dictionary[ pop_path ] = {}
        dictionary[ pop_path ] = recursive_deep_key_value_pair( dictionary[ pop_path ] ,
                                                                path[ 1: ] ,
                                                                key ,
                                                                value )
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return dictionary
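## Illustrative sketch (not part of the original module): the path list
## creates any missing intermediate dictionaries before setting the leaf
## key. The path and key names below are hypothetical.
def _example_recursive_deep_key_value_pair():
    d = recursive_deep_key_value_pair( {} ,
                                       [ 'exact' , 'micro-average' ] ,
                                       'F1' , 0.72 )
    ## d == { 'exact' : { 'micro-average' : { 'F1' : 0.72 } } }
    return d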
def update_output_dictionary( out_file ,
                              metric_type ,
                              metrics_keys ,
                              metrics_values ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    if( os.path.exists( out_file ) ):
        try:
            with open( out_file , 'r' ) as fp:
                file_dictionary = json.load( fp )
        except ValueError as e:
            log.error( 'I can\'t update the output dictionary \'{}\' because I had a problem loading it into memory: {}'.format( out_file , e ) )
            log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
            return
    else:
        file_dictionary = {}
    for key , value in zip( metrics_keys , metrics_values ):
        file_dictionary = recursive_deep_key_value_pair( file_dictionary ,
                                                         metric_type ,
                                                         key ,
                                                         value )
    with open( out_file , 'w' ) as fp:
        json.dump( file_dictionary , fp ,
                   sort_keys = True ,
                   indent = 4 )
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
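## Illustrative sketch (not part of the original module): writing a set of
## metric values for one fuzzy flag to a JSON file. The file name and path
## components are hypothetical.
def _example_update_output_dictionary():
    update_output_dictionary( 'metrics.json' ,
                              [ 'exact' , 'micro-average' ] ,
                              [ 'Precision' , 'Recall' ] ,
                              [ 0.8 , 0.67 ] )
    ## metrics.json now contains
    ## { "exact" : { "micro-average" : { "Precision" : 0.8 , "Recall" : 0.67 } } }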
def update_csv_output( csv_out_filename , delimiter ,
                       row_content ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    with open( csv_out_filename , 'a' ) as fp:
        fp.write( '{}\n'.format( delimiter.join( row_content ) ) )
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
def output_metrics( class_data ,
                    fuzzy_flag ,
                    metrics ,
                    delimiter_prefix , delimiter ,
                    stdout_flag ,
                    csv_out_filename ,
                    pretty_print_flag ):
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    if( len( class_data ) == 1 ):
        row_name = class_data[ 0 ]
    elif( len( class_data ) == 2 ):
        row_name = class_data[ 1 ]
    elif( len( class_data ) == 4 ):
        row_name = '{} x {}'.format( class_data[ 1 ] ,
                                     class_data[ 3 ] )
    ##
    row_content = delimiter.join( '{}'.format( m ) for m in metrics )
    ##
    if( csv_out_filename ):
        full_row = [ fuzzy_flag ]
        for n in range( 0 , 4 ):
            if( n >= len( class_data ) ):
                full_row.append( '' )
            else:
                full_row.append( class_data[ n ] )
        full_row.append( row_content )
        update_csv_output( csv_out_filename , delimiter ,
                           full_row )
    if( stdout_flag ):
        if( not pretty_print_flag ):
            print( '{}{}{}{}'.format( delimiter_prefix ,
                                      row_name ,
                                      delimiter ,
                                      row_content ) )
        else:
            pretty_row = '{0}{1:30s}'.format( delimiter_prefix ,
                                              row_name )
            for i in range( 0 , len( metrics ) ):
                if( metrics[ i ] is None or
                    metrics[ i ] == '' ):
                    pretty_row = '{}{}{:9s}'.format( pretty_row ,
                                                     delimiter ,
                                                     '' )
                elif( metrics[ i ] == 0 ):
                    pretty_row = '{}{}{:9d}'.format( pretty_row ,
                                                     delimiter ,
                                                     0 )
                elif( metrics[ i ] == int( metrics[ i ] ) ):
                    pretty_row = '{}{}{:9d}'.format( pretty_row ,
                                                     delimiter ,
                                                     int( metrics[ i ] ) )
                else:
                    pretty_row = '{}{}{:9.4f}'.format( pretty_row ,
                                                       delimiter ,
                                                       metrics[ i ] )
            print( pretty_row )
    #########
    log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
def get_unique_types( config ):
    unique_types = set()
    for pattern in config:
        if( 'pivot_attr' in pattern ):
            ## TODO - pull this from the config file
            for pivot_value in [ 'met' , 'not met' ]: ##pattern[ 'pivot_values' ]:
                this_type = '{} = "{}"'.format( pattern[ 'type' ] ,
                                                pivot_value )
                unique_types.add( this_type )
        else:
            unique_types.add( pattern[ 'type' ] )
    ##
    return( unique_types )
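## Illustrative sketch (not part of the original module): a minimal config
## with one span-level pattern and one pivot-based, document-level pattern.
## The pattern contents are hypothetical.
def _example_get_unique_types():
    config = [ { 'type' : 'DateTime' } ,
               { 'type' : 'Hypertension' , 'pivot_attr' : 'met' } ]
    ## Returns { 'DateTime' , 'Hypertension = "met"' , 'Hypertension = "not met"' }
    return get_unique_types( config )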
## TODO - load this from an external file