#!/usr/bin/env python3 #coding: utf-8 # # generate JP-PINT CSV from yaml file # # This software was designed by SAMBUICHI,Nobuyuki (Sambuichi Professional Engineers Office) # and it was written by SAMBUICHI,Nobuyuki (Sambuichi Professional Engineers Office). # # ==== MIT License ==== # # (c) 2022 SAMBUICHI Nobuyuki (Sambuichi Professional Engineers Office) # # Permission is hereby granted,free of charge,to any person obtaining a copy # of this software and associated documentation files (the "Software"),to deal # in the Software without restriction,including without limitation the rights # to use,copy,modify,merge,publish,distribute,sublicense,and/or sell # copies of the Software,and to permit persons to whom the Software is # furnished to do so,subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS",WITHOUT WARRANTY OF ANY KIND,EXPRESS OR # IMPLIED,INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,DAMAGES OR OTHER # LIABILITY,WHETHER IN AN ACTION OF CONTRACT,TORT OR OTHERWISE,ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import csv from collections import OrderedDict from datetime import datetime, timedelta, timezone import re import json import sys import os import yaml import argparse fieldnames = [ 'SemSort','PINT_ID','Section','Card','Aligned','AlignedCard','Level','BT','BT_ja','DT','Desc','Desc_ja','Explanation','Explanation_ja','Example','SynSort','element','UBLdatatype','XPath','selectors','Codelist','UBLCard','CAR' ] def file_path(pathname): if '/' == pathname[0:1]: return pathname else: dir = os.path.dirname(__file__) new_path = os.path.join(dir,pathname) return new_path if __name__ == '__main__': JST = timezone(timedelta(hours=+9), 'JST') now = re.sub('-','',datetime.now(JST).isoformat(timespec='minutes')[:10])[2:] # Create the parser parser = argparse.ArgumentParser(prog='jp-pint_syntax', usage='%(prog)s [options] semfile synfile', description='Generate CSV files from yaml files') # Add the arguments parser.add_argument('semFile',metavar='semfile',type=str,help='Semantic model yaml file') parser.add_argument('synFile',metavar='synfile',type=str,help='Syntax binding yaml file') parser.add_argument('-v','--verbose',action='store_true') parser.add_argument('-d','--debug',action='store_true') args = parser.parse_args() semantic_yaml_file = file_path(args.semFile) if args.semFile else file_path('data/_092semantic-model.yaml') pre, ext = os.path.splitext(semantic_yaml_file) fname = pre.split('/').pop() semantic_csv_file = file_path(f'data/{now}{fname}.csv') syntax_yaml_file = file_path(args.synFile) if args.synFile else file_path('data/_092syntax-binding.yaml') pre, ext = os.path.splitext(syntax_yaml_file) fname = pre.split('/').pop() syntax_csv_file = file_path(f'data/{now}{fname}.csv') pint_csv_file = file_path(f'data/{now}PINT.csv') verbose = args.verbose # Check if infile exists if not os.path.isfile(semantic_yaml_file): print('file not found') sys.exit() if verbose: print('** START ** ',__file__) # read semantic model yaml file and update accordingly sem_sort = 1000 def parse_semantic(term, level): global semantic_model global sem_sort id = term['id'] if 'id' in term else '' id = id.lower() name = term['name'] if 'name' in term else '' definition = term['definition'].replace("\n", "\\n") if 'definition' in term else '' section = term['section'] if 'section' in term else '' cardinality = term['cardinality'] if 'cardinality' in term else '' business_term = { 'sem_sort':sem_sort, 'id':id, 'level': level, 'name':name, 'definition':definition, 'section': section, 'cardinality':cardinality } sem_sort += 10 if 'value' in term: for k,v in term['value'].items(): business_term[k] = v semantic_model[id] = business_term if 'children' in term: children = term['children'] if isinstance(children,dict): parse_semantic(children, level+1) elif isinstance(children,list): for child in children: parse_semantic(child, level+1) semantic_model = {} with open(semantic_yaml_file, 'r') as yml: semantic_yaml = yaml.safe_load(yml) _yaml = { 'id':'ibg-00', 'level': 0, 'name':'Invoice', 'definition':'A Invoice.', 'section':'Shared', 'cardinality':'1..1', 'children':semantic_yaml['content'] } parse_semantic(_yaml, 0) keys = set() for k,v in semantic_model.items(): for key in v.keys(): if not key in keys: keys.add(key) fieldnames = list(keys) with open(semantic_csv_file,'w',newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for k,v in semantic_model.items(): writer.writerow(v) # read syntax binding yaml file and update accordingly syn_sort = 1000 def parse_syntax(term, level): global syntax_binding global levels global syn_sort while level > len(levels)-1: levels.append('') while len(levels)-1 > level: levels.pop() weight = term['weight'] if 'weight' in term else '' id = term['id'] if 'id' in term else '' id = id.lower() element = term['element'] if 'element' in term else '' selector = term['selector'] if 'selector' in term else '' if selector: levels[level] = f'{element}[{selector}]' else: levels[level] = element xpath = '/' + '/'.join(levels) reference = term['reference'] if 'reference' in term else '' if isinstance(reference,str): reference = reference.lower() else: pass # print(f'{id} Wrong reference: {reference}') name = term['name'] if 'name' in term else '' definition = term['definition'].replace("\n", "\\n") if 'definition' in term else '' section = term['section'] if 'section' in term else '' cardinality = term['cardinality'] if 'cardinality' in term else '' business_term = { 'syn_sort':syn_sort, 'weight':weight, 'id':id, 'level': level, 'xpath':xpath, 'element':element, 'selector':selector, 'reference':reference, 'name':name, 'definition':definition, 'section': section, 'cardinality':cardinality } syn_sort += 10 if 'value' in term: for k,v in term['value'].items(): business_term[k] = v syntax_binding[id] = business_term if 'children' in term: children = term['children'] if isinstance(children,dict): parse_syntax(children, level+1) elif isinstance(children,list): for child in children: parse_syntax(child, level+1) syntax_binding = {} with open(syntax_yaml_file, 'r') as yml: syntax_yaml = yaml.safe_load(yml) _yaml = syntax_yaml['content'] levels = [''] parse_syntax(_yaml, 0) keys = set() for k,v in syntax_binding.items(): for key in v.keys(): if not key in keys: keys.add(key) fieldnames = list(keys) with open(syntax_csv_file,'w',newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for k,v in syntax_binding.items(): writer.writerow(v) fieldnames = [ 'syn_sort','section','level','cardinality','xpath','element','selector','reference','name','datatype','definition','id','weight', 'sem_sort','sem_section','sem_id','sem_level','sem_cardinality','sem_name','sem_datatype','sem_definition', ] records = [] for k,v in syntax_binding.items(): ref = v['reference'] try: sem = semantic_model[ref] if ref and isinstance(ref,str) and re.match(r'^(i|I)',ref) and ref in semantic_model else {} except: print(ref) record = {} # syntax record['syn_sort'] = v['syn_sort'] if 'syn_sort' in v else '' record['xpath'] = v['xpath'] if 'xpath' in v else '' record['level'] = v['level'] if 'level' in v else '' record['element'] = v['element'] if 'element' in v else '' record['selector'] = v['selector'] if 'selector' in v else '' record['reference'] = v['reference'] if 'reference' in v else '' record['section'] = v['section'] if 'section' in v else '' record['cardinality'] = v['cardinality'] if 'cardinality' in v else '' record['name'] = v['name'] if 'name' in v else '' record['datatype'] = v['datatype'] if 'datatype' in v else '' record['definition'] = v['definition'] if 'definition' in v else '' record['id'] = v['id'] if 'id' in v else '' record['weight'] = v['weight'] if 'weight' in v else '' # semantics record['sem_sort'] = sem['sem_sort'] if ref and len(v['reference']) > 0 and 'sem_sort' in sem else '' record['sem_section'] = sem['section'] if ref and len(v['reference']) > 0 and 'section' in sem else '' record['sem_id'] = sem['id'] if ref and len(v['reference']) > 0 and 'id' in sem else '' record['sem_level'] = sem['level'] if ref and len(v['reference']) > 0 and 'level' in sem else '' record['sem_cardinality'] = sem['cardinality'] if ref and len(v['reference']) > 0 and 'cardinality' in sem else '' record['sem_name'] = sem['name'] if ref and len(v['reference']) > 0 and 'name' in sem else '' record['sem_datatype'] = sem['datatype'] if ref and len(v['reference']) > 0 and 'datatype' in sem else '' record['sem_definition'] = sem['definition'] if ref and len(v['reference']) > 0 and 'definition' in sem else '' records.append(record) with open(pint_csv_file,'w',newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for d in records: writer.writerow(d) if verbose: print(f'** END ** {semantic_yaml_file} {syntax_yaml_file}')