"""Source code for text_extraction."""
import sys
import logging as log
import os
import json
import xml.etree.ElementTree as ET
import re
#############################################
## Minor helper functions to help simplify
## the apparent logic of extracting out
## annotations
#############################################
def create_annotation_entry( begin_pos = -1 , begin_pos_mapped = None ,
                             end_pos = -1 , end_pos_mapped = None ,
                             raw_text = None ,
                             pivot_attr = None , pivot_value = None ,
                             parity = None ,
                             tag_name = None ):
    """Create a dictionary describing a single annotation.

    The four core keys (``begin_pos``, ``end_pos``, ``raw_text`` and
    ``type``) are always present.  The remaining keys are added only
    when the corresponding argument is not ``None``, so span-based and
    span-less (pivot/parity) annotations each stay compact and callers
    can test for optional keys with ``in``.

    :param begin_pos: raw start position of the annotation (-1 if unused)
    :param begin_pos_mapped: start position after skip-character remapping
    :param end_pos: raw end position of the annotation (-1 if unused)
    :param end_pos_mapped: end position after skip-character remapping
    :param raw_text: text covered by the annotation
    :param pivot_attr: attribute name used for span-less annotations
    :param pivot_value: value of the pivot attribute
    :param parity: parity marker for span-less annotations
    :param tag_name: annotation type, stored under the ``type`` key
    :returns: a dict suitable for storing in a ``strict_starts`` mapping
    """
    new_entry = dict( begin_pos = begin_pos ,
                      end_pos = end_pos ,
                      raw_text = raw_text ,
                      type = tag_name )
    ## Use identity comparison with None (PEP 8) rather than ``!=``.
    if( begin_pos_mapped is not None ):
        new_entry[ 'begin_pos_mapped' ] = begin_pos_mapped
    ##
    if( end_pos_mapped is not None ):
        new_entry[ 'end_pos_mapped' ] = end_pos_mapped
    ##
    if( pivot_attr is not None ):
        new_entry[ 'pivot_attr' ] = pivot_attr
    if( pivot_value is not None ):
        new_entry[ 'pivot_value' ] = pivot_value
    if( parity is not None ):
        new_entry[ 'parity' ] = parity
    return new_entry
def map_position( offset_mapping , position , direction ):
    """Convert a character position to the closest non-skipped position.

    Use the offset mapping dictionary to convert a position to the
    closest valid character position.  We include a direction for the
    mapping because it is important to consider the closest position
    to the right or left of a position when mapping the start or
    end position, respectively.

    :param offset_mapping: a dictionary mapping character positions to
                           ``None`` if the character is in the skip
                           list or to an int, otherwise
    :param position: current character position (string key)
    :param direction: 1, if moving right; -1 if moving left
    :returns: character position if all skipped characters were removed
              from the document and positions re-assigned;
              'EOF'/'SOF' when the walk falls off the mapping;
              ``None`` when the mapping is empty or direction is 0
    """
    ## An empty mapping means no skip-character filtering was requested.
    if( not offset_mapping ):
        return None
    try:
        ## Walk in the given direction until we land on a kept character.
        while( offset_mapping[ position ] is None ):
            position = str( int( position ) + direction )
        return offset_mapping[ position ]
    except KeyError:
        ## We walked off the mapping entirely.  NOTE(review): a leftward
        ## walk (direction -1, used for end positions) reports 'EOF' and
        ## a rightward walk (direction 1, used for begin positions)
        ## reports 'SOF' -- this matches the original behavior; confirm
        ## the intended sentinel semantics with callers.
        if( direction < 0 ):
            return 'EOF'
        elif( direction > 0 ):
            return 'SOF'
        else:
            return None
#############################################
##
#############################################
def extract_annotations_xml( ingest_file ,
                             offset_mapping ,
                             annotation_path ,
                             tag_name ,
                             namespaces = {} ,
                             begin_attribute = None ,
                             end_attribute = None ,
                             text_attribute = None ,
                             optional_attributes = [] ,
                             normalization_engines = [] ):
    """Extract annotations matching an XPath pattern from an XML file.

    :param ingest_file: path of the XML file to parse
    :param offset_mapping: character position mapping used to compute
                           the *_mapped positions (may be empty)
    :param annotation_path: XPath used to select annotation nodes
    :param tag_name: type label stored on every extracted annotation
    :param namespaces: namespace mapping passed through to findall()
    :param begin_attribute: node attribute holding the start offset
    :param end_attribute: node attribute holding the end offset
    :param text_attribute: node attribute holding the covered text, or
                           ``None`` to use the node's text content
    :param optional_attributes: node attributes copied onto every entry
    :param normalization_engines: attribute names copied onto an entry
                                  only when present on the node
    :returns: dict mapping start position -> list of annotation entries
              (empty on XPath errors)
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    found_annots = {}
    strict_starts = {}
    ##
    tree = ET.parse( ingest_file )
    root = tree.getroot()
    ##
    try:
        found_annots = root.findall( annotation_path , namespaces )
    except SyntaxError as e:
        log.warning( 'I had a problem parsing the XML file. Are you sure your XPath is correct and matches your namespace?\n\tSkipping file ({}) and XPath ({})\n\tReported Error: {}'.format( ingest_file , annotation_path , e ) )
        log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return strict_starts
    ##
    log.debug( 'Found {} annotation(s) matching the pattern \'{}\''.format(
        len( found_annots ) , annotation_path ) )
    for annot in found_annots:
        ## Fix: re-initialize the span variables for every annotation.
        ## The original left them undefined when begin_attribute or
        ## end_attribute was None, raising NameError on the first
        ## annotation and silently reusing the previous annotation's
        ## span on later iterations.
        begin_pos = None
        begin_pos_mapped = None
        end_pos = None
        end_pos_mapped = None
        if( begin_attribute is not None ):
            try:
                begin_pos = annot.get( begin_attribute )
                begin_pos_mapped = map_position( offset_mapping , begin_pos , 1 )
            except NameError as e:
                log.error( 'NameError: {}'.format( e ) )
        if( end_attribute is not None ):
            ## TODO - add flag to distinguish between conditions
            ##        when the end_pos marks the last character
            ##        vs. when the end_pos is the position after
            ##        the last character
            try:
                end_pos = annot.get( end_attribute )
                end_pos_mapped = map_position( offset_mapping , end_pos , -1 )
            except NameError as e:
                log.error( 'NameError: {}'.format( e ) )
        if( text_attribute is None ):
            raw_text = annot.text
        else:
            raw_text = annot.get( text_attribute )
        new_entry = create_annotation_entry( begin_pos = begin_pos ,
                                             begin_pos_mapped = begin_pos_mapped ,
                                             end_pos = end_pos ,
                                             end_pos_mapped = end_pos_mapped ,
                                             raw_text = raw_text ,
                                             tag_name = tag_name )
        ## TODO - do we need to shield this in case an optional attribute
        ##        doesn't exist in the annotation or does Python (and
        ##        later etude engine code) handle a null correctly/safely?
        for optional_attr in optional_attributes:
            new_entry[ optional_attr ] = annot.get( optional_attr )
        ## Normalization engine attributes are only copied when present
        ## on this particular node.
        for normalization_engine in normalization_engines:
            if( normalization_engine in annot.attrib ):
                new_entry[ normalization_engine ] = annot.get( normalization_engine )
        ##
        if( begin_pos in strict_starts ):
            strict_starts[ begin_pos ].append( new_entry )
        else:
            strict_starts[ begin_pos ] = [ new_entry ]
    ##
    return strict_starts
def extract_annotations_xml_spanless( ingest_file ,
                                      annotation_path ,
                                      tag_name ,
                                      pivot_attribute ,
                                      parity ,
                                      namespaces = {} ,
                                      text_attribute = None ,
                                      optional_attributes = [] ):
    """Extract document-level (span-less) annotations from an XML file.

    Every node matched by ``annotation_path`` yields an entry keyed on
    the value of ``pivot_attribute``.  All span-less entries share the
    sentinel start key ``-1`` in the returned dictionary.

    :param ingest_file: path of the XML file to parse
    :param annotation_path: XPath used to select annotation nodes
    :param tag_name: type label stored on every extracted annotation
    :param pivot_attribute: node attribute used to distinguish entries
    :param parity: parity marker copied onto every entry
    :param namespaces: namespace mapping passed through to findall()
    :param text_attribute: unused here; kept for interface parity
    :param optional_attributes: node attributes copied onto every entry
    :returns: dict with at most the key ``-1`` -> list of entries
              (empty on XPath errors)
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    matched_nodes = {}
    strict_starts = {}
    ##
    doc_root = ET.parse( ingest_file ).getroot()
    ##
    try:
        matched_nodes = doc_root.findall( annotation_path , namespaces )
    except SyntaxError as e:
        log.warning( 'I had a problem parsing the XML file. Are you sure your XPath is correct and matches your namespace?\n\tSkipping file ({}) and XPath ({})\n\tReported Error: {}'.format( ingest_file , annotation_path , e ) )
        log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return strict_starts
    ##
    log.debug( 'Found {} annotation(s) matching the pattern \'{}\''.format(
        len( matched_nodes ) , annotation_path ) )
    for node in matched_nodes:
        entry = create_annotation_entry( pivot_attr = pivot_attribute ,
                                         pivot_value = node.get( pivot_attribute ) ,
                                         parity = parity ,
                                         tag_name = tag_name )
        ##
        for extra_attr in optional_attributes:
            entry[ extra_attr ] = node.get( extra_attr )
        ##
        if( -1 not in strict_starts ):
            strict_starts[ -1 ] = [ entry ]
        else:
            ## TODO - current logic allows multiple instances of the same type
            ##        if they differ on their pivot_value. This is good for topic
            ##        tagging or similar annotations but is bad for most instances
            ##        of publication date or author tagging.
            for prior_entry in strict_starts[ -1 ]:
                if( entry[ 'pivot_value' ] != prior_entry[ 'pivot_value' ] ):
                    strict_starts[ -1 ].append( entry )
                    break
    ##
    return strict_starts
def extract_brat_text_bound_annotation( ingest_file ,
                                        annot_line ,
                                        offset_mapping ,
                                        tag_name ,
                                        optional_attributes = [] ):
    """Parse a single brat text-bound ('T') annotation line.

    ## Continuous:
    ## T1  Organization 0 43  International Business Machines Corporation
    ## TODO - Discontinuous:
    ## T1  Location 0 5;16 23  North America

    :param ingest_file: source file name, used only in warnings
    :param annot_line: one line of the .ann file (already rstripped)
    :param offset_mapping: character position mapping for *_mapped keys
    :param tag_name: only lines of this type are kept
    :param optional_attributes: attribute names initialized to 'false'
                                on the entry (flipped later by 'A' lines)
    :returns: an annotation entry dict, or ``None`` when the line does
              not parse or belongs to a different type
    """
    parsed = re.match( r'^(T[0-9]+)\s+(\w+)\s+([0-9]+)\s+([0-9]+)\s+(.*)' ,
                       annot_line )
    if( not parsed ):
        log.warning( 'I had a problem parsing a brat text-bound annotation line ({}):{}'.format( ingest_file ,
                                                                                                 annot_line ) )
        return None
    if( parsed.group( 2 ) != tag_name ):
        ## Skip this line because we don't care about this type
        return None
    span_begin = parsed.group( 3 )
    span_end = parsed.group( 4 )
    new_entry = create_annotation_entry( begin_pos = span_begin ,
                                         begin_pos_mapped = map_position( offset_mapping ,
                                                                          span_begin , 1 ) ,
                                         end_pos = span_end ,
                                         end_pos_mapped = map_position( offset_mapping ,
                                                                        span_end , -1 ) ,
                                         raw_text = parsed.group( 5 ) ,
                                         tag_name = tag_name )
    new_entry[ 'match_index' ] = parsed.group( 1 )
    ## Optional attributes default to 'false' until a matching brat
    ## attribute line sets them.
    for optional_attr in optional_attributes:
        new_entry[ optional_attr ] = 'false'
    return new_entry
def extract_brat_relation( ingest_file ,
                           annot_line ,
                           tag_name ,
                           optional_attributes = [] ):
    """Placeholder for parsing brat relation ('R') lines.

    Relation lines look like::

        T3  Organization 33 41  Ericsson
        T4  Country 75 81       Sweden
        R1  Origin Arg1:T3 Arg2:T4

    Parsing is not implemented yet, so every line is ignored.

    :returns: ``None``, always
    """
    return None
def extract_brat_equivalence( ingest_file ,
                              annot_line ,
                              optional_attributes = [] ):
    """Placeholder for parsing brat equivalence ('*') lines.

    Equivalence lines look like::

        T1  Organization 0 43  International Business Machines Corporation
        T2  Organization 45 48 IBM
        T3  Organization 52 60 Big Blue
        *   Equiv T1 T2 T3

    Parsing is not implemented yet, so every line is ignored.

    :returns: ``None``, always
    """
    return None
def extract_brat_event( ingest_file ,
                        annot_line ,
                        tag_name ,
                        optional_attributes = [] ):
    """Placeholder for parsing brat event ('E') lines.

    Event lines look like::

        T1  Organization 0 4  Sony
        T2  MERGE-ORG 14 27   joint venture
        T3  Organization 33 41  Ericsson
        E1  MERGE-ORG:T2 Org1:T1 Org2:T3

    Parsing is not implemented yet, so every line is ignored.

    :returns: ``None``, always
    """
    return None
def extract_brat_attribute( ingest_file ,
                            annot_line ,
                            optional_attributes = [] ):
    """Parse a single brat attribute ('A' or 'M') line.

    ## A1  Negated T34
    ## TODO - support multi-valued attributes
    ## A2  Confidence E2 L1

    Binary attributes are implicitly 'true' when present on a line.

    :param ingest_file: source file name, used only in warnings
    :param annot_line: one line of the .ann file (already rstripped)
    :param optional_attributes: recognized attribute names; ``key`` in
                                the result is ``None`` for others
    :returns: list of [ target index , attribute name , key , 'true' ] ,
              or ``None`` when the line does not parse
    """
    parsed = re.match( r'^([AM][0-9]+)\s+(\w+)\s+([TREAMN\*][0-9]+)$' ,
                       annot_line )
    if( not parsed ):
        log.warning( 'I had a problem parsing a brat attribute line ({}):{}'.format( ingest_file ,
                                                                                     annot_line ) )
        return None
    attribute = parsed.group( 2 )
    target_index = parsed.group( 3 )
    key = attribute if attribute in optional_attributes else None
    return( [ target_index , attribute , key , 'true' ] )
def extract_brat_normalization( ingest_file ,
                                annot_line ,
                                normalization_engines = [] ):
    """Parse a single brat normalization ('N') line.

    ## N1  Reference T1  Wikipedia:534366  Barack Obama

    :param ingest_file: source file name, used only in warnings
    :param annot_line: one line of the .ann file (already rstripped)
    :param normalization_engines: recognized engine names; lines for
                                  other engines are silently dropped
    :returns: list of [ target index , engine , id , normalized value ] ,
              or ``None`` when the line does not parse or the engine is
              not recognized
    """
    parsed = re.match( r'^(N[0-9]+)\s+Reference\s+([TREAMN\*][0-9]+)\s+([^:]+):([^\s]+)\s+(.+)$' ,
                       annot_line )
    if( not parsed ):
        log.warning( 'I had a problem parsing a brat normalization line ({}):{}'.format( ingest_file ,
                                                                                         annot_line ) )
        return None
    engine = parsed.group( 3 )
    if( engine not in normalization_engines ):
        ## Lines for unrecognized engines are dropped without warning.
        return( None )
    return( [ parsed.group( 2 ) ,
              engine , parsed.group( 4 ) ,
              parsed.group( 5 ) ] )
def extract_annotations_brat_standoff( ingest_file ,
                                       offset_mapping ,
                                       type_prefix ,
                                       tag_name ,
                                       optional_attributes = [] ,
                                       normalization_engines = [] ):
    """Extract annotations from a brat standoff (.ann) file.

    Text-bound ('T') lines become annotation entries; attribute
    ('A'/'M') and normalization ('N') lines decorate the entry they
    reference.  Relation ('R'), equivalence ('*') and event ('E') lines
    are routed to their (currently stubbed) parsers.

    :param ingest_file: path of the .ann file to read
    :param offset_mapping: character position mapping for *_mapped keys
    :param type_prefix: unused here; kept for interface parity
    :param tag_name: only text-bound lines of this type are kept
    :param optional_attributes: attribute names recognized on 'A'/'M' lines
    :param normalization_engines: engine names recognized on 'N' lines
    :returns: dict mapping start position -> list of annotation entries
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    annots_by_index = dict()
    ##
    try:
        with open( ingest_file , 'r' ) as fp:
            for line in fp:
                line = line.rstrip()
                ## Fix: skip blank lines, which would otherwise raise an
                ## IndexError when peeking at the first character.
                if( line == '' ):
                    continue
                brat_annotation_type = line[ 0 ]
                if( brat_annotation_type == 'T' ):
                    ## T1    Organization 0 43    International Business Machines Corporation
                    new_entry = extract_brat_text_bound_annotation( ingest_file ,
                                                                    line ,
                                                                    offset_mapping ,
                                                                    tag_name ,
                                                                    optional_attributes )
                    ## A non-None entry means we were able to parse the line
                    if( new_entry is not None ):
                        annots_by_index[ new_entry[ 'match_index' ] ] = new_entry
                    ## TODO - support discontinous spans:
                    ##   T1    Location 0 5;16 23    North America
                    ##   T2    Location 10 23
                elif( brat_annotation_type == 'A' or
                      brat_annotation_type == 'M' ):
                    ## A1    Negated T34
                    new_attribute_value = extract_brat_attribute( ingest_file ,
                                                                  line ,
                                                                  optional_attributes )
                    ## Fix: extract_brat_attribute() returns None on a
                    ## parse failure, so guard before indexing into it.
                    if( new_attribute_value is not None and
                        new_attribute_value[ 0 ] is not None and
                        new_attribute_value[ 0 ] in annots_by_index and
                        new_attribute_value[ 2 ] is not None ):
                        annots_by_index[ new_attribute_value[ 0 ] ][ new_attribute_value[ 2 ] ] = new_attribute_value[ 3 ]
                elif( brat_annotation_type == 'R' ):
                    ## R1    Origin Arg1:T3 Arg2:T4
                    new_entry = extract_brat_relation( ingest_file ,
                                                       line ,
                                                       tag_name ,
                                                       optional_attributes )
                elif( brat_annotation_type == '*' ):
                    ## * Equiv T1 T2 T3
                    ## Fix: dispatch to the equivalence parser.  The
                    ## original called extract_brat_relation() with
                    ## optional_attributes misaligned into its tag_name
                    ## parameter.
                    new_entry = extract_brat_equivalence( ingest_file ,
                                                          line ,
                                                          optional_attributes )
                elif( brat_annotation_type == 'E' ):
                    ## E1    MERGE-ORG:T2 Org1:T1 Org2:T3
                    ## Fix: dispatch to the event parser rather than the
                    ## relation parser.
                    new_entry = extract_brat_event( ingest_file ,
                                                    line ,
                                                    tag_name ,
                                                    optional_attributes )
                elif( brat_annotation_type == 'N' ):
                    ## N1    Reference T1    Wikipedia:534366    Barack Obama
                    new_normalization = extract_brat_normalization( ingest_file ,
                                                                    line ,
                                                                    normalization_engines )
                    if( new_normalization is not None and
                        new_normalization[ 0 ] is not None and
                        new_normalization[ 0 ] in annots_by_index and
                        new_normalization[ 1 ] is not None and
                        new_normalization[ 2 ] is not None ):
                        annots_by_index[ new_normalization[ 0 ] ][ new_normalization[ 1 ] ] = new_normalization[ 2 ]
                ##elif( brat_annotation_type == '#' ):
                ##    ## Do nothing. We don't support comments.
    except IOError as e:
        log.warning( 'I had a problem reading the standoff notation file ({}).\n\tReported Error: {}'.format( ingest_file ,
                                                                                                              e ) )
    log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    ## Re-key the collected entries by their (raw) start positions.
    strict_starts = {}
    for match_index in annots_by_index:
        new_entry = annots_by_index[ match_index ]
        begin_pos = new_entry[ 'begin_pos' ]
        if( begin_pos in strict_starts ):
            strict_starts[ begin_pos ].append( new_entry )
        else:
            strict_starts[ begin_pos ] = [ new_entry ]
    return strict_starts
def extract_annotations_plaintext( offset_mapping ,
                                   raw_content ,
                                   delimiter ,
                                   tag_name ):
    """Split plaintext content into annotations on a delimiter regex.

    Scans ``raw_content`` character by character and emits one
    annotation entry for every maximal run of characters between
    delimiter matches (consecutive delimiters are collapsed).  A final
    entry is emitted for any trailing run not terminated by a delimiter.

    :param offset_mapping: character position mapping used to compute
                           the *_mapped positions of each entry
    :param raw_content: document text to split
    :param delimiter: regex matched against single characters to find
                      split points
    :param tag_name: type label stored on every extracted annotation
    :returns: dict mapping start position (str) -> list of entries
    """
    strict_starts = {}
    ## init_offset is the scan cursor; last_offset marks the start of
    ## the current (not yet emitted) token.
    init_offset = 0
    last_offset = 0
    ##
    list_of_chars = list( raw_content )
    ##
    for char in list_of_chars:
        if( re.search( delimiter , char ) ):
            ## Skip when we see multiple of the same delimiter in a row
            if( init_offset == last_offset ):
                init_offset += 1
                last_offset = init_offset
                continue
            begin_pos = str( last_offset )
            begin_pos_mapped = map_position( offset_mapping , begin_pos , 1 )
            ## TODO - add flag to distinguish between conditions
            ##        when the end_pos marks the last character
            ##        vs. when the end_pos is the position after
            ##        the last character
            last_offset = init_offset + 1
            end_pos = str( init_offset )
            end_pos_mapped = map_position( offset_mapping , end_pos , -1 )
            ## end_pos is exclusive here: the slice stops just before
            ## the delimiter character itself.
            raw_text = ''.join( list_of_chars[ int( begin_pos ):int( end_pos ) ] )
            new_entry = create_annotation_entry( begin_pos = begin_pos ,
                                                 begin_pos_mapped = begin_pos_mapped ,
                                                 end_pos = end_pos ,
                                                 end_pos_mapped = end_pos_mapped ,
                                                 raw_text = raw_text ,
                                                 tag_name = tag_name )
            ##
            if( begin_pos in strict_starts ):
                strict_starts[ begin_pos ].append( new_entry )
            else:
                strict_starts[ begin_pos ] = [ new_entry ]
        init_offset += 1
    ## Emit the trailing token: any characters scanned after the last
    ## delimiter that never hit another delimiter.
    if( last_offset < init_offset ):
        begin_pos = str( last_offset )
        begin_pos_mapped = map_position( offset_mapping , begin_pos , 1 )
        ## TODO - add flag to distinguish between conditions
        ##        when the end_pos marks the last character
        ##        vs. when the end_pos is the position after
        ##        the last character
        last_offset = init_offset + 1
        end_pos = str( init_offset )
        end_pos_mapped = map_position( offset_mapping , end_pos , -1 )
        raw_text = ''.join( list_of_chars[ int( begin_pos ):int( end_pos ) ] )
        new_entry = create_annotation_entry( begin_pos = begin_pos ,
                                             begin_pos_mapped = begin_pos_mapped ,
                                             end_pos = end_pos ,
                                             end_pos_mapped = end_pos_mapped ,
                                             raw_text = raw_text ,
                                             tag_name = tag_name )
        ##
        if( begin_pos in strict_starts ):
            strict_starts[ begin_pos ].append( new_entry )
        else:
            strict_starts[ begin_pos ] = [ new_entry ]
    ##
    log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    ##
    return strict_starts
def write_annotations_to_disk( annotations , out_file ):
    """Serialize extracted annotations to disk as pretty-printed JSON.

    :param annotations: JSON-serializable structure to write
    :param out_file: output path, or ``None`` to skip writing entirely
    :returns: ``None``
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    ## Identity comparison with None (PEP 8) instead of ``==``.
    if( out_file is None ):
        log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return
    ##
    ## TODO - add directory existence check
    with open( out_file , 'w' ) as output:
        json.dump( annotations , output ,
                   sort_keys = True , indent = 4 )
    log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
def split_content( raw_text , offset_mapping , skip_chars ):
    """Populate an offset mapping from raw to skip-filtered positions.

    Characters matching ``skip_chars`` map to ``None``; every other
    character is assigned the next consecutive position in the filtered
    text.  Both keys and values are stored as strings.

    :param raw_text: document content to scan
    :param offset_mapping: dictionary filled in place (and returned)
    :param skip_chars: regex matched against each single character
    :returns: the (mutated) ``offset_mapping`` dictionary
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    filtered_offset = 0
    for raw_offset , char in enumerate( raw_text ):
        if( re.match( skip_chars , char ) ):
            offset_mapping[ '{}'.format( raw_offset ) ] = None
        else:
            offset_mapping[ '{}'.format( raw_offset ) ] = '{}'.format( filtered_offset )
            filtered_offset += 1
    log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return offset_mapping
def extract_chars( ingest_file ,
                   namespaces ,
                   document_data ,
                   skip_chars = None ):
    """Extract the raw document text from an XML file.

    The content comes either from the text content of the node matched
    by ``document_data[ 'cdata_xpath' ]`` or from the attribute named by
    ``document_data[ 'content_attribute' ]`` on the node matched by
    ``document_data[ 'tag_xpath' ]``.  Only the first matching node is
    used when several match.

    :param ingest_file: path of the XML file to parse
    :param namespaces: namespace mapping passed through to findall()
    :param document_data: configuration describing where the content lives
    :param skip_chars: regex of characters dropped from the offset
                       mapping, or ``None`` to keep every character
    :returns: tuple of ( raw text or ``None`` , offset mapping dict )
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    offset_mapping = {}
    ##
    use_cdata = 'cdata_xpath' in document_data
    use_attribute = ( not use_cdata ) and ( 'content_attribute' in document_data )
    if( use_cdata ):
        content_path = document_data[ 'cdata_xpath' ]
    elif( use_attribute ):
        content_path = document_data[ 'tag_xpath' ]
        attribute_name = document_data[ 'content_attribute' ]
    else:
        ## No content source configured at all.
        log.debug( "Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return None , offset_mapping
    ##
    root = ET.parse( ingest_file ).getroot()
    ##
    try:
        found_annots = root.findall( content_path , namespaces )
    except SyntaxError as e:
        log.warning( 'I had a problem parsing the XML file. Are you sure your XPath is correct and matches your namespace?\n\tSkipping file ({}) and XPath ({})\n\tReported Error: {}'.format( ingest_file , content_path , e ) )
        log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return None , offset_mapping
    ##
    raw_text = None
    log.debug( 'Found {} match(es) for the pattern \'{}\''.format( len( found_annots ) ,
                                                                   content_path ) )
    if( len( found_annots ) > 1 ):
        log.warning( 'Expected to only find a single pattern matching content XPath (\'{}\') but found {}. Using first match.'.format( content_path , len( found_annots ) ) )
    elif( len( found_annots ) == 0 ):
        log.warning( 'Expected to find exactly one match for content XPath (\'{}\') but found {}. Returning empty document content.'.format( content_path , len( found_annots ) ) )
        log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return None , offset_mapping
    for node in found_annots:
        if( use_cdata ):
            raw_text = node.text
            break
        elif( use_attribute ):
            try:
                raw_text = node.attrib[ attribute_name ]
                break
            except KeyError as e:
                ## Fall through to the next matched node when this one
                ## lacks the configured attribute.
                log.warning( 'KeyError: could not find attribute_name {} in the matched path \'{}\''.format( e , content_path ) )
                raw_text = None
    ##
    if( raw_text is not None and skip_chars is not None ):
        offset_mapping = split_content( raw_text ,
                                        offset_mapping ,
                                        skip_chars )
    log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return raw_text , offset_mapping
def extract_plaintext( ingest_file , skip_chars ):
    """Read a plaintext document and build its offset mapping.

    :param ingest_file: path of the plaintext file to read
    :param skip_chars: regex of characters dropped from the offset
                       mapping, or ``None`` to keep every character
    :returns: tuple of ( raw text , offset mapping dict )
    """
    offset_mapping = {}
    with open( ingest_file , 'r' ) as fp:
        raw_text = fp.read()
    if( raw_text is not None and skip_chars is not None ):
        offset_mapping = split_content( raw_text ,
                                        offset_mapping ,
                                        skip_chars )
    return raw_text , offset_mapping
def align_tokens_on_whitespace( dictionary ,
                                out_file ):
    """Write the document's whitespace-delimited tokens, one per line.

    Token boundaries are derived from ``dictionary[ 'offset_mapping' ]``:
    positions mapped to ``None`` are separator characters, and maximal
    runs of non-None positions form tokens taken from
    ``dictionary[ 'raw_content' ]``.

    Fixes relative to the original implementation:
    - the final token is now flushed even when the document does not
      end with a separator character (it was silently dropped before)
    - the output file is opened once instead of re-opened in append
      mode for every token

    :param dictionary: dict holding 'offset_mapping' and 'raw_content'
    :param out_file: output path, or ``None`` to skip writing
    :returns: ``None``
    """
    ## Start from a clean slate on every call.
    if( out_file is not None and
        os.path.exists( out_file ) ):
        os.remove( out_file )
    mapping = dictionary[ 'offset_mapping' ]
    content = dictionary[ 'raw_content' ]
    keys = sorted( mapping , key = int )
    tokens = []
    token_start = None
    for this_pos in keys:
        if( mapping[ this_pos ] is not None and
            token_start is None ):
            ## First kept character after a separator: a token begins.
            token_start = this_pos
        elif( mapping[ this_pos ] is None and
              token_start is not None ):
            ## Separator hit while inside a token: close it out.
            tokens.append( content[ int( token_start ):int( this_pos ) ] )
            token_start = None
    ## Flush the trailing token when the document ends mid-token.
    ## NOTE(review): assumes the mapping's keys cover the document
    ## contiguously, so the last key is the last character position.
    if( token_start is not None and len( keys ) > 0 ):
        tokens.append( content[ int( token_start ):int( keys[ -1 ] ) + 1 ] )
    if( out_file is not None and len( tokens ) > 0 ):
        with open( out_file , 'a' ) as fp:
            for token_text in tokens:
                fp.write( '{}\n'.format( token_text ) )
#############################################
##
#############################################
def extract_annotations( ingest_file ,
                         namespaces ,
                         document_data ,
                         patterns ,
                         skip_chars = None ,
                         out_file = None ):
    """Top-level driver: extract document content and all annotations.

    First pulls the raw document content (plaintext, brat-sidecar
    plaintext, or XML depending on ``document_data[ 'format' ]``), then
    runs every entry in ``patterns`` through the extractor matching its
    keys, merging the per-pattern results into one ``annotations``
    dictionary keyed on start position.  Optionally serializes the
    combined result to ``out_file`` as JSON.

    :param ingest_file: path of the document to process
    :param namespaces: XML namespace mapping for XPath-based extractors
    :param document_data: config describing the document format and
                          content location
    :param patterns: list of per-annotation-type extraction configs
    :param skip_chars: regex of characters to ignore when building the
                       offset mapping, or ``None`` to heed whitespace
    :param out_file: optional JSON dump path passed to
                     write_annotations_to_disk()
    :returns: tuple of ( offset mapping dict , annotations dict )
    """
    log.debug( "Entering '{}'".format( sys._getframe().f_code.co_name ) )
    raw_content = None
    annotations = {}
    offset_mapping = {}
    file_dictionary = {}
    if( bool( document_data ) ):
        if( 'format' in document_data and
            document_data[ 'format' ] == 'txt' ):
            ## Plain text document.
            ## NOTE(review): the bare except clauses below swallow every
            ## exception type and only log it.
            try:
                raw_content , offset_mapping = extract_plaintext( ingest_file ,
                                                                  skip_chars )
            except:
                e = sys.exc_info()[0]
                log.error( 'Uncaught exception in extract_plaintext: {}'.format( e ) )
        elif( 'format' in document_data and
              document_data[ 'format' ] == '.ann .txt' ):
            ## brat standoff: annotations live in the .ann file passed
            ## in, the text in a sibling .txt file.
            ## TODO use format to change filename according to pattern
            ##      document_data[ 'format' ]
            plaintext_alternate_file = re.sub( '.ann$' ,
                                               '.txt' ,
                                               ingest_file )
            try:
                raw_content , offset_mapping = extract_plaintext( plaintext_alternate_file ,
                                                                  skip_chars )
            except:
                e = sys.exc_info()[0]
                log.error( 'Uncaught exception in extract_plaintext: {}'.format( e ) )
        else:
            ## Anything else is treated as XML.
            try:
                raw_content , offset_mapping = extract_chars( ingest_file ,
                                                              namespaces ,
                                                              document_data ,
                                                              skip_chars )
            except ET.ParseError as e:
                log.warning( 'ParseError in file ({}): {}'.format( ingest_file , e ) )
                log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
                return offset_mapping , annotations
            except UnicodeEncodeError as e:
                print(( '{}'.format( e ) ))
            except:
                e = sys.exc_info()[0]
                log.error( 'Uncaught exception in extract_chars: {}'.format( e ) )
    ## Whitespace skipping requires the raw content to build the
    ## offset mapping from; bail out early if we could not get it.
    if( skip_chars and raw_content == None ):
        log.error( 'I could not find the raw content for this document but was asked to ignore its whitespace. Add document data to the config file for extracting raw content or use the --heed-whitespace flag.' )
        log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
        return offset_mapping , annotations
    ## Normalization engines are global for the config file
    ## rather than pattern-specific
    norm_eng = []
    if( 'normalization_engines' in document_data ):
        norm_eng = document_data[ 'normalization_engines' ]
    for pattern in patterns:
        new_annots = None
        ## Dispatch on the keys present in the pattern config.
        if( 'delimiter' in pattern ):
            new_annots = \
              extract_annotations_plaintext( offset_mapping = offset_mapping ,
                                             raw_content = raw_content ,
                                             delimiter = \
                                               pattern[ 'delimiter' ] ,
                                             tag_name = pattern[ 'type' ] )
        elif( 'type_prefix' in pattern ):
            ## NOTE(review): norm_eng is recomputed here identically to
            ## the assignment above the loop; redundant but harmless.
            norm_eng = []
            if( 'normalization_engines' in document_data ):
                norm_eng = document_data[ 'normalization_engines' ]
            new_annots = \
              extract_annotations_brat_standoff( ingest_file ,
                                                 offset_mapping = offset_mapping ,
                                                 type_prefix = \
                                                   pattern[ 'type_prefix' ] ,
                                                 tag_name = pattern[ 'type' ] ,
                                                 optional_attributes = \
                                                   pattern[ 'optional_attributes' ] ,
                                                 normalization_engines = norm_eng )
        elif( 'xpath' in pattern and
              'begin_attr' in pattern and
              'end_attr' in pattern ):
            new_annots = \
              extract_annotations_xml( ingest_file ,
                                       offset_mapping = offset_mapping ,
                                       namespaces = namespaces ,
                                       annotation_path = pattern[ 'xpath' ] ,
                                       tag_name = pattern[ 'type' ] ,
                                       begin_attribute = \
                                         pattern[ 'begin_attr' ] ,
                                       end_attribute = \
                                         pattern[ 'end_attr' ] ,
                                       optional_attributes = \
                                         pattern[ 'optional_attributes' ] ,
                                       normalization_engines = norm_eng )
        elif( 'xpath' in pattern and
              'pivot_attr' in pattern ):
            new_annots = \
              extract_annotations_xml_spanless( ingest_file ,
                                                namespaces = namespaces ,
                                                annotation_path = pattern[ 'xpath' ] ,
                                                tag_name = pattern[ 'type' ] ,
                                                pivot_attribute = \
                                                  pattern[ 'pivot_attr' ] ,
                                                parity = \
                                                  pattern[ 'parity' ] ,
                                                optional_attributes = \
                                                  pattern[ 'optional_attributes' ] )
        else:
            print( 'WARNING: Skipping pattern because it is missing essential elements:\n\n{}'.format( pattern ) )
        ##
        ## Merge this pattern's results into the combined dictionary,
        ## concatenating entry lists that share a start position.
        if( new_annots != None ):
            for new_annot_key in new_annots:
                if( new_annot_key in annotations ):
                    ## TODO - If multiple patterns are associated with the same type
                    ##        and we're evaluating annotations at the document level
                    ##        (or otherwise want at most one instance of an annotation
                    ##        type at a given position), then we need to de-dup some
                    ##        of the annotation entries before combining them here.
                    combined_annots = annotations[ new_annot_key ] + new_annots[ new_annot_key ]
                    annotations.update( { new_annot_key : combined_annots } )
                else:
                    annotations.update( { new_annot_key : new_annots[ new_annot_key ] } )
    ##
    file_dictionary = dict( raw_content = raw_content ,
                            offset_mapping = offset_mapping ,
                            annotations = annotations )
    ##
    ##
    try:
        write_annotations_to_disk( file_dictionary , out_file )
    except IOError as e:
        log.error( 'IOError caught in write_annotations_to_disk: {}'.format( e ) )
    except:
        e = sys.exc_info()[0]
        log.error( 'Uncaught exception in write_annotations_to_disk: {}'.format( e ) )
    log.debug( "-- Leaving '{}'".format( sys._getframe().f_code.co_name ) )
    return offset_mapping , annotations