shandex_edi_2024/edi_997_outgoing.py

164 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
Create a 997 and load it into the proper outgoing folder
A 997 is a functional acknowledgment: we receive the EDI transmission control number and reply back with it
"""
import dataclasses
import datetime
import io
import pathlib
import pprint
import shutil
import typing

import records # type: ignore
import yamamotoyama # type: ignore
# Folder containing this script; every other path below is built from it.
THIS_DIRECTORY = pathlib.Path(__file__).parent
# Folder the generated 997 files are written into.
X12_DIRECTORY = THIS_DIRECTORY.joinpath("edi-testing")  # TODO change back to "outgoing"
# Folder that main() scans for received documents to acknowledge.
ACK_DIRECTORY = THIS_DIRECTORY.joinpath("997_processing")
# TODO remove this
TEST_FILE = THIS_DIRECTORY.joinpath(
    "edi-testing",
    "944_YAMAMOTOYAMA_765aaebb-06c4-4eea-8d2a-7dddf2fd9ec2.edi",
)
# Transaction set type read from the ST segment -> code written after "AK1*".
AK1_MAPPING = {
    "944": "RE",
    "945": "SW",
}
def main():
    """
    Do it!

    Write a 997 for every file waiting in ACK_DIRECTORY, then move the
    acknowledged file into the archive folder so it is not acknowledged twice.
    """
    # NOTE(review): X12_DIRECTORY already carries a TODO to become "outgoing";
    # once that happens this path reads outgoing/outgoing/archive -- confirm
    # the intended archive location.
    archive_directory = X12_DIRECTORY / "outgoing" / "archive"
    # Create the archive up front so a missing folder cannot fail the move
    # after the 997 has already been written.
    archive_directory.mkdir(parents=True, exist_ok=True)
    # Snapshot the listing first: files are moved out of ACK_DIRECTORY inside
    # the loop, and iterdir() gives no guarantees when the directory changes
    # while it is being iterated.
    for edi_filename in sorted(ACK_DIRECTORY.iterdir()):
        if not edi_filename.is_file():
            continue
        # write_997 is the function in this module that builds the
        # acknowledgment (there is no process_file defined here).
        write_997(edi_filename)
        # acknowledged file is archived once its 997 has been written
        shutil.move(str(edi_filename), str(archive_directory / edi_filename.name))
def write_997(edi_filename: pathlib.Path):
    """
    Write out a 997 to a file

    Reads the ISA, GS and ST segments of the received document
    ``edi_filename`` and writes the acknowledgment into X12_DIRECTORY as
    ``{GS02}-{GS06}-{timestamp}-997.edi``.

    Raises:
        ValueError: the received document has no ISA, GS or ST segment.
        KeyError: the ST type has no entry in AK1_MAPPING.
        IndexError: an ISA, GS or ST segment has fewer elements than are read.
    """
    # NOTE(review): the names below follow the original code, but
    # group_control_number is read from ISA13 and
    # transaction_set_control_number from GS06. X12 defines AK102 as the
    # received GS06 and AK202 as the received ST02 -- confirm with the trading
    # partner whether the values sent back should change.
    group_control_number = ''
    transaction_set_control_number = ''
    company = ''
    company_longname = ''
    edi_type = ''
    segments_seen = set()
    with open(edi_filename, 'r') as edi_file:
        for line in edi_file:
            for segment in line.split("~"):
                fields = segment.split("*")
                segment_id = fields[0]
                if segment_id == 'ISA':
                    group_control_number = fields[13]
                    company_longname = fields[6]
                elif segment_id == 'GS':
                    company = fields[2]
                    transaction_set_control_number = fields[6]
                elif segment_id == 'ST':
                    edi_type = fields[1]
                else:
                    continue
                segments_seen.add(segment_id)

    missing = [name for name in ("ISA", "GS", "ST") if name not in segments_seen]
    if missing:
        raise ValueError(
            f"{edi_filename}: cannot acknowledge, missing segment(s) {', '.join(missing)}"
        )
    # Reject unsupported types before a control number is consumed or an
    # output file is created; same exception the header lookup would raise.
    if edi_type not in AK1_MAPPING:
        raise KeyError(edi_type)

    # Build the whole document first so a failure never leaves an empty or
    # half-written file in the outgoing folder.
    raw_control_number = control_number()
    document = ''.join([
        write_997_header(raw_control_number, company_longname, company, edi_type, group_control_number),
        write_997_lines(edi_type, transaction_set_control_number),
        write_997_footer(raw_control_number, group_control_number),
    ])

    datestamp_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    output_path = X12_DIRECTORY / f"{company}-{transaction_set_control_number}-{datestamp_string}-997.edi"
    with output_path.open("w", encoding="utf-8", newline="\n") as x12_file:
        x12_file.write(document)
def write_997_header(raw_control_number, company_longname, company, edi_type, group_control_number):
    """
    Build the ISA, GS, ST and AK1 segments that open a 997.

    Args:
        raw_control_number: integer serial; zero-padded to 9 digits for the
            ISA control number and to 4 digits for the GS control number.
        company_longname: receiver id placed in the ISA segment (padded to
            15 characters).
        company: receiver code placed in the GS segment.
        edi_type: transaction set type being acknowledged; key of AK1_MAPPING.
        group_control_number: control number echoed back in the AK1 segment.

    Returns:
        The four segments as one string, each terminated by "~".

    Raises:
        KeyError: edi_type has no entry in AK1_MAPPING.
    """
    now = datetime.datetime.now()
    interchange_date = now.strftime("%y%m%d")
    group_date = now.strftime("%Y%m%d")
    time = now.strftime("%H%M")
    group_number = f"{raw_control_number:04}"
    interchange_control_number = f"{raw_control_number:09}"  # Format to 9 characters
    functional_id = AK1_MAPPING[edi_type]

    # The ISA segment is fixed width: the two information fields are 10
    # characters and both ids are 15. Pad explicitly instead of relying on
    # the exact number of spaces typed into a literal.
    isa_segment = "*".join([
        "ISA",
        "00", f"{'':<10}",
        "00", f"{'':<10}",
        "ZZ", f"{'YAMAMOTOYAMA':<15}",
        "ZZ", f"{company_longname:<15}",
        interchange_date,
        time,
        "U",
        "00401",
        interchange_control_number,
        "0",
        "P",
        ">",
    ])
    gs_segment = "*".join([
        "GS",
        "FA",
        "YAMAMOTOYAMA",
        company,
        group_date,
        time,
        group_number,
        "X",
        "004010",
    ])
    segments = [
        isa_segment,
        gs_segment,
        "ST*997*0001",
        f"AK1*{functional_id}*{group_control_number}",
    ]
    return ''.join(f"{segment}~" for segment in segments)
def write_997_lines(edi_type, transaction_set_control_number):
    """
    Build the AK2, AK5 and AK9 segments of a 997.

    Args:
        edi_type: transaction set type being acknowledged (e.g. "944").
        transaction_set_control_number: control number echoed in AK2.

    Returns:
        The three segments as one string, each terminated by "~". The
        document is always reported as accepted ("A"), with one transaction
        set included, received and accepted.
    """
    # One list entry per segment; the terminator is appended uniformly so a
    # forgotten comma can no longer silently merge two neighbouring strings.
    segments = [
        f"AK2*{edi_type}*{transaction_set_control_number}",
        "AK5*A",
        "AK9*A*1*1*1",
    ]
    return ''.join(f"{segment}~" for segment in segments)
def write_997_footer(raw_control_number, group_control_number):
    """
    Build the SE, GE and IEA segments that close a 997.

    Args:
        raw_control_number: integer serial; zero-padded to 4 digits for GE and
            to 9 digits for IEA, the same formatting the header applies.
        group_control_number: accepted for signature compatibility; not used.

    Returns:
        The three trailer segments as one string, each terminated by "~".
    """
    group_number = f"{raw_control_number:04}"
    interchange_control_number = f"{raw_control_number:09}"  # Format to 9 characters
    segments = [
        "SE*6*0001",
        f"GE*1*{group_number}",
        # IEA needs its terminator like every other segment.
        f"IEA*1*{interchange_control_number}",
    ]
    return ''.join(f"{segment}~" for segment in segments)
def control_number() -> int:
    """
    Next EDI serial number

    The last number handed out is kept in a ".remember" file named after this
    script. A missing, unreadable or non-numeric file restarts the count, so
    the first number returned is 1.
    """
    counter_path = pathlib.Path(__file__).with_suffix(".remember")
    file_options = {"encoding": "utf-8", "newline": "\n"}
    try:
        with counter_path.open("r", **file_options) as counter_file:
            first_line = counter_file.readline()
            previous = int(first_line.rstrip("\n"))
    except (OSError, ValueError):
        # nothing usable stored yet
        previous = 0
    current = previous + 1
    # persist before returning so the next call continues from here
    with counter_path.open("w", **file_options) as counter_file:
        counter_file.write(f"{current}\n")
    return current
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()