#!/usr/bin/env python3 """ Consume a generic 944 file from 3PLs, and translate into a Sage X3 readable file - import template ZPTHI. For Shandex we also need to reply with a 997 """ # pylint: disable=too-many-instance-attributes import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import pprint import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore THIS_DIRECTORY = pathlib.Path(__file__).parent X12_DIRECTORY = THIS_DIRECTORY / "incoming" IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports" EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing" SHANDEX_944_FILENAME_RE = re.compile( r"\A 944_YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S ) def main(): """ Do it! """ for edi_filename in X12_DIRECTORY.iterdir(): #TODO uncomment and review if SHANDEX_944_FILENAME_RE.match(edi_filename.name): process_file(edi_filename) # file moved to 997 processing folder to be sent later shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name) combine_zpthis() def combine_zpthis(): """ Collect all ZPTHI imports into a single file for easy import. """ archive_directory = IMPORTS_DIRECTORY / "archive" archive_directory.mkdir(exist_ok=True) with (IMPORTS_DIRECTORY / "ZPTHI.dat").open( "w", encoding="utf-8", newline="\n" ) as combined_import_file: for individual_import_filename in IMPORTS_DIRECTORY.glob( "ZPTHI_*.dat" ): with individual_import_filename.open( "r", encoding="utf-8", newline="\n" ) as individual_import_file: for line in individual_import_file: combined_import_file.write(line) shutil.move( individual_import_filename, archive_directory / individual_import_filename.name, ) def tokens_from_edi_file( edi_filename: pathlib.Path, ) -> typing.Iterator[typing.List[str]]: """ Read tokens from EDI file """ with edi_filename.open(encoding="utf-8", newline="") as edi_file: for record in edi_file.read().split("~"): fields = record.split("*") if fields[0] in { #TODO see if there are more fields used in vendor EDI "ISA", "ST", "N2", "N3", "N4", "LX", }: continue yield fields def process_file(edi_filename: pathlib.Path): """ Convert a specific EDI file into an import file. """ warehouse_receipt = Receipt() for fields in tokens_from_edi_file(edi_filename): if fields[0] == "W17": _, _, rcpdat, _, sohnum, sdhnum = fields[:6] warehouse_receipt.sdhnum = sdhnum warehouse_receipt.header.rcpdat = datetime.datetime.strptime( rcpdat, "%Y%m%d" ).date() # 20230922 if fields[0] == "N9" and fields[1] == "PO": pohnum = fields[2] if fields[0] == "W07": # W07*1023*CA**PN*C08249*LT*07032026A***UK*10077652082491 # N9*LI*1000 _, qty_str, uom, _, _, itmref, _, lot = fields[:8] subdetail = ReceiptSubDetail( qtypcu=int(qty_str), lot=lot, ) if fields[0] == 'N9' and fields[1] == 'LI': # N9*LI*1000 line = fields[2] warehouse_receipt.append( ReceiptDetail( sdhnum=warehouse_receipt.sdhnum, itmref=itmref, qtyuom=int(qty_str), poplin=int(line), uom=uom ), subdetail, ) time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( IMPORTS_DIRECTORY / f"ZPTHI_{warehouse_receipt.sdhnum}_{time_stamp}.dat" ) as import_file: warehouse_receipt.output(import_file) @dataclasses.dataclass class ReceiptSubDetail: """ Information that goes onto a receipt sub-detail line, taken from ZPTHI template. """ sta: str = "A" pcu: str = "" qtypcu: int = 0 loc: str = "" lot: str = "" bpslot: str = "" sernum: str = "" def stojous(self, shipment, item) -> typing.List[str]: """ Convert grouped lot quantities into individual STOJOU records to fit on receipt """ with yamamotoyama.get_connection('test') as database: #todo remove 'test' details = ( database.query( """ select 'S' [Code], 'A' [STA_0], [STJ].[PCU_0], cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar) [QTYPCU_0], [STJ].[LOT_0], '' [BPSLOT_0], '' [SERNUM_0] from [FY23TEST].[STOJOU] [STJ] --TODO change to PROD where [STJ].[VCRNUM_0] = :sdhnum and [STJ].[ITMREF_0] = :itmref and [STJ].[LOT_0] = :lot and [STJ].[TRSTYP_0] = 4 """, sdhnum=shipment, itmref=item, lot=self.lot, ) .all() ) return details def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ return yamamotoyama.x3_imports.convert_to_strings( [ "S", self.sta, self.pcu, self.qtypcu, self.lot, self.bpslot, self.sernum, ] ) @dataclasses.dataclass class ReceiptDetail: """ Information that goes on a receipt detail line, taken from ZPTHI template. """ sdhnum: str = "" poplin: int = 0 itmref: str = "" itmdes: str = "" uom: str = "" qtyuom: int = 0 pjt: str = "" star65: str = "" star91: str = "" star92: str = "" subdetails: typing.List[ReceiptSubDetail] = dataclasses.field( default_factory=list ) def append(self, subdetail: ReceiptSubDetail): """ Add subdetail """ subdetail.pcu = self.uom self.subdetails.append(subdetail) def check_subdetail_qty(self): """ Check for shortages by totaling up subdetail quantities. """ total_cases = 0 for subdetail in self.subdetails: total_cases -= subdetail.qtypcu return abs(total_cases) def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ def fix_uom(uom): x3_uom = '' if uom == 'CA': x3_uom = 'CS' else: x3_uom = uom return x3_uom self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "L", self.sdhnum, self.poplin, self.itmref, fix_uom(self.uom), self.qty, self.star65, self.star91, self.star92, ] ) def __eq__(self, item: typing.Any) -> bool: """ Test for equality """ if isinstance(item, str): return self.itmref == item if isinstance(item, ReceiptDetail): return self.itmref == item.itmref return False # def fill(self):#not needed for receipts # """ # Set soplin & itmdes from itmref & sohnum # """ # def get() -> records.Record: # with yamamotoyama.get_connection() as database: # how_many = ( # database.query( # """ # select # count(*) as [how_many] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # """, # sohnum=self.sohnum, # itmref=self.itmref, # ) # .first() # .how_many # ) # if how_many == 1: # return database.query( # """ # select top 1 # [SOP].[SOPLIN_0] # ,[SOP].[ITMDES1_0] # ,[SOP].[SAU_0] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # order by # [SOP].[SOPLIN_0] # """, # sohnum=self.sohnum, # itmref=self.itmref, # ).first() result = get() self.soplin = result.SOPLIN_0 self.itmdes = result.ITMDES1_0 self.sau = result.SAU_0 @dataclasses.dataclass class ReceiptHeader: """ Information that goes on a receipt header, taken from ZPTHI template. """ stofcy: str = "" bpcord: str = "" prhfcy: str = "" rcpdat: datetime.date = datetime.date(1753, 1, 1) pthnum: str = "" bpsnum: str = "" cur: str = "USD" star71 = "" star72 = "" star81 = "" star82 = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "E", self.bpcord, self.rcpdat.strftime("%Y%m%d"), self.pthnum, self.stofcy, self.cur, self.star71, self.star72, self.star81, self.star82, ] ) class ReceiptDetailList: """ List of receipt details """ _details: typing.List[ReceiptDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, receipt_detail: ReceiptDetail, receipt_subdetail: ReceiptSubDetail, ): """ Append """ itmref = receipt_detail.itmref if itmref in self._item_set: for detail in self._details: if detail == itmref: detail.subdetails.append(receipt_subdetail) return self._item_set.add(itmref) #receipt_detail.fill() receipt_detail.append(receipt_subdetail) self._details.append(receipt_detail) def __iter__(self): return iter(self._details) class Receipt: """ Warehouse receipt, both header & details """ header: ReceiptHeader details: ReceiptDetailList _sdhnum: str def __init__(self): self.header = ReceiptHeader() self._sdhnum = "" self.details = ReceiptDetailList() def append( self, receipt_detail: ReceiptDetail, receipt_subdetail: ReceiptSubDetail, ): """ Add detail information. """ self.details.append(receipt_detail, receipt_subdetail) @property def sdhnum(self): """ shipment number """ return self._sdhnum @sdhnum.setter def sdhnum(self, value: str): if self._sdhnum != value: self._sdhnum = value if value: self._fill_info_from_shipment() def _get_shipment_from_x3(self) -> records.Record: """ Fetch shipment from X3 database. """ with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test' return db_connection.query( """ select [SDH].[STOFCY_0], [SDH].[SDHNUM_0], [SDH].[SALFCY_0], [SDH].[BPCORD_0], [SDH].[CUR_0], [SDH].[SOHNUM_0] from [FY23TEST].[SDELIVERY] [SDH]--TODO change back to [PROD] where [SDH].[SDHNUM_0] = :shipment """, shipment=self.sdhnum, ).first() def _fill_info_from_shipment(self): """ When we learn the SOHNUM, we can copy information from the sales order. """ result = self._get_shipment_from_x3() self.header.stofcy = result.STOFCY_0 self.header.sdhnum = result.SDHNUM_0 self.header.salfcy = result.SALFCY_0 self.header.bpcord = result.BPCORD_0 self.header.cur = result.CUR_0 self.header.sohnum = result.SOHNUM_0 def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) for subdetail in detail.subdetails: shipment = detail.sdhnum item = detail.itmref for record in subdetail.stojous(shipment, item): record_list = [ record['Code'], record['STA_0'], record['PCU_0'], record['QTYPCU_0'], record['LOT_0'], record['BPSLOT_0'], record['SERNUM_0'] ] #pprint.pprint(record_list) output(record_list) if __name__ == "__main__": main()