#!/usr/bin/env python3 """ Consume a generic 944 file from 3PLs, and translate into a Sage X3 readable file - import template ZPTHI. For Shandex we also need to reply with a 997 """ # pylint: disable=too-many-instance-attributes import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import pprint import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore THIS_DIRECTORY = pathlib.Path(__file__).parent X12_DIRECTORY = THIS_DIRECTORY / "incoming" IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports" EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing" SHANDEX_944_FILENAME_RE = re.compile( r"\A 944_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S ) def main(): """ Do it! """ for edi_filename in X12_DIRECTORY.iterdir(): if SHANDEX_944_FILENAME_RE.match(edi_filename.name): process_file(edi_filename) # file moved to 997 processing folder to be sent later shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name) combine_zpthis() def new_944_alert(sdhnum, pohnum, rcpdat): msg = MIMEMultipart() msg['Subject'] = 'New Receipt from Shandex' msg['Precedence'] = 'bulk' msg['From'] = 'x3report@stashtea.com' msg['To'] = 'isenn@yamamotoyama.com' msg['CC'] = 'bleeson@stashtea.com' emailtext = f'Delivery: {sdhnum}\nPO: {pohnum}\nDate: {rcpdat}' msg.attach(MIMEText(emailtext, 'plain')) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.Iterator[typing.List[str]]: """ Read tokens from EDI file """ with edi_filename.open(encoding="utf-8", newline="") as edi_file: for record in edi_file.read().split("~"): fields = record.split("*") if fields[0] in { "ISA", "ST", "N2", "N3", "N4", "LX", }: continue yield fields def find_shipment_line(sdhnum, itmref): with yamamotoyama.get_connection() as database: result = database.query( """ select SDDLIN_0 from PROD.SDELIVERYD where SDHNUM_0 = :sdhnum and ITMREF_0 = :itmref """, sdhnum=sdhnum, itmref=itmref ).first()['SDDLIN_0'] return result def check_shipment_status(delivery): with yamamotoyama.get_connection() as database: result = database.query( """ select SDH.SDHNUM_0, CFMFLG_0 from PROD.SDELIVERY SDH where SDH.SDHNUM_0 = :sdhnum """, sdhnum=delivery ).first()['CFMFLG_0'] if result == 2: return True return False def process_file(edi_filename: pathlib.Path): """ Convert a specific EDI file into an import file. """ warehouse_receipt = Receipt() pohnum = '' for fields in tokens_from_edi_file(edi_filename): if fields[0] == "W17": _, _, rcpdat, _, sohnum, sdhnum = fields[:6] warehouse_receipt.sdhnum = sdhnum validated = check_shipment_status(sdhnum) if not validated: validation_alert(sdhnum) break warehouse_receipt.header.rcpdat = datetime.datetime.strptime( rcpdat, "%Y%m%d" ).date() # 20230922 if fields[0] == "N9" and fields[1] == "PO": pohnum = fields[2] if fields[0] == "W07": # W07*1023*CA**PN*C08249*LT*07032026A***UK*10077652082491 # N9*LI*1000 _, qty_str, uom, _, _, itmref, _, lot = fields[:8] subdetail = ReceiptSubDetail( qtypcu=int(qty_str), lot=lot, ) if fields[0] == 'N9' and fields[1] == 'LI': # N9*LI*1000 #line = fields[2] #This line isn't the line number from X3, it needs to be looked up line = find_shipment_line(warehouse_receipt.sdhnum, itmref) warehouse_receipt.append( ReceiptDetail( sdhnum=warehouse_receipt.sdhnum, itmref=itmref, qtyuom=int(qty_str), poplin=int(line), uom=uom ), subdetail, ) time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") new_944_alert(sdhnum, pohnum, warehouse_receipt.header.rcpdat) with yamamotoyama.x3_imports.open_import_file( IMPORTS_DIRECTORY / f"ZPTHI_{warehouse_receipt.sdhnum}_{time_stamp}.dat" ) as import_file: warehouse_receipt.output(import_file) @dataclasses.dataclass class ReceiptSubDetail: """ Information that goes onto a receipt sub-detail line, taken from ZPTHI template. """ sta: str = "A" pcu: str = "" qtypcu: int = 0 loc: str = "" lot: str = "" bpslot: str = "" sernum: str = "" def stojous(self, shipment, item) -> typing.List[str]: """ Convert grouped lot quantities into individual STOJOU records to fit on receipt """ with yamamotoyama.get_connection() as database: details = ( database.query( """ select 'S' [Code], 'A' [STA_0], [STJ].[PCU_0], cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar) [QTYPCU_0], [STJ].[LOT_0], '' [BPSLOT_0], '' [SERNUM_0] from [PROD].[STOJOU] [STJ] where [STJ].[VCRNUM_0] = :sdhnum and [STJ].[ITMREF_0] = :itmref and [STJ].[LOT_0] = :lot and [STJ].[TRSTYP_0] = 4 """, sdhnum=shipment, itmref=item, lot=self.lot, ) .all() ) return details def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ return yamamotoyama.x3_imports.convert_to_strings( [ "S", self.sta, self.pcu, self.qtypcu, self.lot, self.bpslot, self.sernum, ] ) @dataclasses.dataclass class ReceiptDetail: """ Information that goes on a receipt detail line, taken from ZPTHI template. """ sdhnum: str = "" poplin: int = 0 itmref: str = "" itmdes: str = "" uom: str = "" qtyuom: int = 0 pjt: str = "" star65: str = "" star91: str = "" star92: str = "" subdetails: typing.List[ReceiptSubDetail] = dataclasses.field( default_factory=list ) def append(self, subdetail: ReceiptSubDetail): """ Add subdetail """ subdetail.pcu = self.uom self.subdetails.append(subdetail) def check_subdetail_qty(self): """ Check for shortages by totaling up subdetail quantities. """ total_cases = 0 for subdetail in self.subdetails: total_cases -= subdetail.qtypcu return abs(total_cases) def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ def fix_uom(uom): x3_uom = '' if uom == 'CA': x3_uom = 'CS' else: x3_uom = uom return x3_uom self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "L", self.sdhnum, self.poplin, self.itmref, fix_uom(self.uom), self.qty, self.star65, self.star91, self.star92, ] ) def __eq__(self, item: typing.Any) -> bool: """ Test for equality """ if isinstance(item, str): return self.itmref == item if isinstance(item, ReceiptDetail): return self.itmref == item.itmref return False # def fill(self):#not needed for receipts # """ # Set soplin & itmdes from itmref & sohnum # """ # def get() -> records.Record: # with yamamotoyama.get_connection() as database: # how_many = ( # database.query( # """ # select # count(*) as [how_many] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # """, # sohnum=self.sohnum, # itmref=self.itmref, # ) # .first() # .how_many # ) # if how_many == 1: # return database.query( # """ # select top 1 # [SOP].[SOPLIN_0] # ,[SOP].[ITMDES1_0] # ,[SOP].[SAU_0] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # order by # [SOP].[SOPLIN_0] # """, # sohnum=self.sohnum, # itmref=self.itmref, # ).first() result = get() self.soplin = result.SOPLIN_0 self.itmdes = result.ITMDES1_0 self.sau = result.SAU_0 @dataclasses.dataclass class ReceiptHeader: """ Information that goes on a receipt header, taken from ZPTHI template. """ stofcy: str = "" bpcord: str = "" prhfcy: str = "" rcpdat: datetime.date = datetime.date(1753, 1, 1) pthnum: str = "" bpsnum: str = "" cur: str = "USD" star71 = "" star72 = "" star81 = "" star82 = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "E", self.bpcord, self.rcpdat.strftime("%Y%m%d"), self.pthnum, self.stofcy, self.cur, self.star71, self.star72, self.star81, self.star82, ] ) class ReceiptDetailList: """ List of receipt details """ _details: typing.List[ReceiptDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, receipt_detail: ReceiptDetail, receipt_subdetail: ReceiptSubDetail, ): """ Append """ itmref = receipt_detail.itmref if itmref in self._item_set: for detail in self._details: if detail == itmref: detail.subdetails.append(receipt_subdetail) return self._item_set.add(itmref) #receipt_detail.fill() receipt_detail.append(receipt_subdetail) self._details.append(receipt_detail) def __iter__(self): return iter(self._details) class Receipt: """ Warehouse receipt, both header & details """ header: ReceiptHeader details: ReceiptDetailList _sdhnum: str def __init__(self): self.header = ReceiptHeader() self._sdhnum = "" self.details = ReceiptDetailList() def append( self, receipt_detail: ReceiptDetail, receipt_subdetail: ReceiptSubDetail, ): """ Add detail information. """ self.details.append(receipt_detail, receipt_subdetail) @property def sdhnum(self): """ shipment number """ return self._sdhnum @sdhnum.setter def sdhnum(self, value: str): if self._sdhnum != value: self._sdhnum = value if value: self._fill_info_from_shipment() def _get_shipment_from_x3(self) -> records.Record: """ Fetch shipment from X3 database. """ with yamamotoyama.get_connection() as db_connection: return db_connection.query( """ select [SDH].[STOFCY_0], [SDH].[SDHNUM_0], [SDH].[SALFCY_0], [SDH].[BPCORD_0], [SDH].[CUR_0], [SDH].[SOHNUM_0] from [PROD].[SDELIVERY] [SDH] where [SDH].[SDHNUM_0] = :shipment """, shipment=self.sdhnum, ).first() def _fill_info_from_shipment(self): """ When we learn the SOHNUM, we can copy information from the sales order. """ result = self._get_shipment_from_x3() self.header.stofcy = result.STOFCY_0 self.header.sdhnum = result.SDHNUM_0 self.header.salfcy = result.SALFCY_0 self.header.bpcord = result.BPCORD_0 self.header.cur = result.CUR_0 self.header.sohnum = result.SOHNUM_0 def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) for subdetail in detail.subdetails: shipment = detail.sdhnum item = detail.itmref for record in subdetail.stojous(shipment, item): record_list = [ record['Code'], record['STA_0'], record['PCU_0'], record['QTYPCU_0'], record['LOT_0'], record['BPSLOT_0'], record['SERNUM_0'] ] #pprint.pprint(record_list) output(record_list) if __name__ == "__main__": main()