164 lines
4.7 KiB
Python
164 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Create a 997 and load it into the proper outgoing folder
|
|
|
|
A 997 is a functional acknowledgment: we receive the EDI transmission control number and reply back with it.
|
|
"""
|
|
import dataclasses
import datetime
import io
import pathlib
import pprint
import shutil
import typing

import records  # type: ignore
import yamamotoyama  # type: ignore
|
|
|
|
|
|
# Folder holding this script; the working directories below all sit beside it.
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY.joinpath("edi-testing")  # TODO change back to "outgoing"
ACK_DIRECTORY = THIS_DIRECTORY.joinpath("997_processing")

TEST_FILE = THIS_DIRECTORY.joinpath(
    "edi-testing",
    "944_YAMAMOTOYAMA_765aaebb-06c4-4eea-8d2a-7dddf2fd9ec2.edi",
)  # TODO remove this

# Transaction set type (taken from the ST segment) -> code placed in the AK1 segment.
AK1_MAPPING = {"944": "RE", "945": "SW"}
|
|
|
|
def main():
    """
    Acknowledge every file waiting in ACK_DIRECTORY.

    For each regular file in ACK_DIRECTORY a 997 is written with
    ``write_997`` and the file is then moved to
    ``X12_DIRECTORY / "outgoing" / "archive"`` under its original name.
    """
    # NOTE(review): the original loop called ``process_file``, which is not
    # defined or imported anywhere in this module. ``write_997`` is the only
    # function here that accepts an EDI file path, so it is used instead --
    # confirm this is the intended call.
    # NOTE(review): once X12_DIRECTORY is switched back to "outgoing" this
    # resolves to outgoing/outgoing/archive -- confirm the intended location.
    archive_directory = X12_DIRECTORY / "outgoing" / "archive"
    archive_directory.mkdir(parents=True, exist_ok=True)

    # Take a snapshot of the listing first: files are moved out of this
    # directory inside the loop, and sorting keeps the run order stable.
    for edi_filename in sorted(ACK_DIRECTORY.iterdir()):
        if not edi_filename.is_file():
            continue  # sub-directories cannot be acknowledged
        write_997(edi_filename)
        # The acknowledged file is archived so it is not picked up again.
        shutil.move(edi_filename, archive_directory / edi_filename.name)
|
|
|
|
|
|
def write_997(edi_filename: pathlib.Path):
    """
    Write out a 997 to a file

    The ISA, GS and ST segments of ``edi_filename`` are scanned for the
    values the acknowledgment needs; when a segment occurs more than once
    the last occurrence wins. The 997 is written to X12_DIRECTORY as
    ``{company}-{transaction_set_control_number}-{timestamp}-997.edi``.

    Raises ValueError when the file has no ISA segment, KeyError (from
    ``write_997_header``) when the ST type has no AK1_MAPPING entry and
    IndexError when an ISA/GS/ST segment has fewer elements than expected.
    """
    group_control_number = ''
    transaction_set_control_number = ''
    company = ''
    company_longname = None  # stays None until an ISA segment is seen
    edi_type = ''

    # NOTE(review): in X12 naming, ISA13 is the *interchange* control number
    # and GS06 the *group* control number; the variable names below are one
    # level off from that. The values are passed on exactly as before --
    # confirm they are what the trading partner expects in AK1/AK2.
    with open(edi_filename, 'r') as edi_file:
        for line in edi_file:
            for segment in line.split("~"):
                # strip() drops the line break left over after the last "~"
                fields = segment.strip().split("*")
                segment_id = fields[0]
                if segment_id == 'ISA':
                    group_control_number = fields[13]
                    company_longname = fields[6]
                elif segment_id == 'GS':
                    company = fields[2]
                    transaction_set_control_number = fields[6]
                elif segment_id == 'ST':
                    edi_type = fields[1]

    if company_longname is None:
        raise ValueError(f"{edi_filename}: no ISA segment found, cannot build a 997")

    # Build the whole document before touching the output directory so that
    # a failure (e.g. unsupported transaction type) leaves no empty file.
    raw_control_number = control_number()
    content = ''.join([
        write_997_header(raw_control_number, company_longname, company, edi_type, group_control_number),
        write_997_lines(edi_type, transaction_set_control_number),
        write_997_footer(raw_control_number, group_control_number),
    ])

    datestamp_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    x12_path = X12_DIRECTORY / f"{company}-{transaction_set_control_number}-{datestamp_string}-997.edi"
    with x12_path.open("w", encoding="utf-8", newline="\n") as x12_file:
        x12_file.write(content)
|
|
|
|
|
|
def write_997_header(raw_control_number,company_longname,company,edi_type,group_control_number):
    """
    Build the opening ISA, GS, ST and AK1 segments of a 997.

    raw_control_number   -- integer serial; written zero-padded to 9 digits
                            in ISA and to 4 digits in GS
    company_longname     -- receiver id placed in the ISA segment
    company              -- receiver code placed in the GS segment
    edi_type             -- transaction type being acknowledged; must be a
                            key of AK1_MAPPING (KeyError otherwise)
    group_control_number -- control number echoed back in AK1

    Returns the four segments as one string, each terminated by "~".
    """
    now = datetime.datetime.now()
    isa_date = now.strftime("%y%m%d")
    gs_date = now.strftime("%Y%m%d")
    clock = now.strftime("%H%M")
    functional_code = AK1_MAPPING[edi_type]

    # ISA is a fixed-width segment: the two blank qualifier fields are 10
    # characters wide and the sender/receiver ids 15. Padding explicitly
    # keeps the segment valid whatever whitespace the ids arrive with.
    isa_segment = "*".join([
        "ISA",
        "00", " " * 10,
        "00", " " * 10,
        "ZZ", "YAMAMOTOYAMA".ljust(15),
        "ZZ", company_longname.ljust(15),
        isa_date,
        clock,
        "U",
        "00401",
        f"{raw_control_number:09}",  # Format to 9 characters
        "0",
        "P",
        ">",
    ])
    gs_segment = "*".join([
        "GS",
        "FA",
        "YAMAMOTOYAMA",
        company,
        gs_date,
        clock,
        f"{raw_control_number:04}",
        "X",
        "004010",
    ])
    st_segment = "ST*997*0001"
    ak1_segment = "*".join(["AK1", functional_code, group_control_number])

    return "".join(
        f"{segment}~"
        for segment in (isa_segment, gs_segment, st_segment, ak1_segment)
    )
|
|
|
|
|
|
def write_997_lines(edi_type,transaction_set_control_number):
    """
    Build the AK2, AK5 and AK9 segments of a 997.

    edi_type and transaction_set_control_number are written into AK2; AK5
    and AK9 are fixed and always report acceptance ("A") of one transaction
    set. Returns the three segments as one string, each terminated by "~".
    """
    # One entry per segment: the original list was missing a comma after the
    # AK2 element and only produced the right text because adjacent string
    # literals are concatenated implicitly.
    segments = [
        f"AK2*{edi_type}*{transaction_set_control_number}",
        "AK5*A",
        "AK9*A*1*1*1",
    ]
    return "".join(f"{segment}~" for segment in segments)
|
|
|
|
def write_997_footer(raw_control_number,group_control_number):
    """
    Build the closing SE, GE and IEA segments of a 997.

    raw_control_number   -- integer serial; written zero-padded to 4 digits
                            in GE and to 9 digits in IEA, the same formats
                            write_997_header uses for GS and ISA
    group_control_number -- accepted for call compatibility; not used

    Returns the three segments as one string, each terminated by "~".
    """
    segments = [
        "SE*6*0001",
        f"GE*1*{raw_control_number:04}",
        f"IEA*1*{raw_control_number:09}",  # Format to 9 characters
    ]
    # Every segment, including the final IEA, gets the "~" terminator; the
    # original left the last one unterminated.
    return "".join(f"{segment}~" for segment in segments)
|
|
|
|
|
|
def control_number() -> int:
    """
    Next EDI serial number

    The last number issued is kept on the first line of a ".remember" file
    that shares this script's path and stem. If that file cannot be opened
    or its first line is not an integer, counting restarts from zero. The
    incremented value is written back to the file and returned.
    """
    counter_path = pathlib.Path(__file__).with_suffix(".remember")
    open_options = {"encoding": "utf-8", "newline": "\n"}

    try:
        with counter_path.open("r", **open_options) as counter_file:
            first_line = counter_file.readline()
            previous = int(first_line.rstrip("\n"))
    except (OSError, ValueError):
        # Missing/unreadable file or unparsable content: start over.
        previous = 0

    current = previous + 1
    with counter_path.open("w", **open_options) as counter_file:
        counter_file.write(f"{current}\n")
    return current
|
|
|
|
|
|
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|