#!/usr/bin/env python3 """ Consume a 945 file from Source Logistics, and translate into a Sage X3 readable file. """ # pylint: disable=too-many-instance-attributes import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore THIS_DIRECTORY = pathlib.Path(__file__).parent X12_DIRECTORY = THIS_DIRECTORY / "incoming" IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports" SOURCE_945_FILENAME_RE = re.compile( r"\A Yamamotoyama_945_ .* [.]edi \Z", re.X | re.M | re.S ) msg = MIMEMultipart() msg['Subject'] = '945 processing error: Possible duplicate order lines?' msg['Precedence'] = 'bulk' msg['From'] = 'x3report@stashtea.com' msg['To'] = 'bleeson@stashtea.com' def main(): """ Do it! """ for edi_filename in X12_DIRECTORY.iterdir(): if SOURCE_945_FILENAME_RE.match(edi_filename.name): process_file(edi_filename) shutil.move(edi_filename, X12_DIRECTORY / "archive" / edi_filename.name) def tokens_from_edi_file( edi_filename: pathlib.Path, ) -> typing.Iterator[typing.List[str]]: """ Read tokens from EDI file """ with edi_filename.open(encoding="utf-8", newline="") as edi_file: for record in edi_file.read().split("~"): fields = record.split("*") if fields[0] in { "ISA", "GS", "ST", "N1", "N2", "N3", "N4", "G62", "W27", "W10", "LX", "MAN", }: continue yield fields def process_file(edi_filename: pathlib.Path): """ Convert a specific EDI file into an import file. """ tracking_number_not_found = True warehouse_shipment = WarehouseShipment() warehouse_shipment.header.ylicplate = '' #if we don't find a tracking number, submit a blank for fields in tokens_from_edi_file(edi_filename): if fields[0] == "W06": _, _, sohnum, shidat_str = fields[:4] warehouse_shipment.sohnum = sohnum warehouse_shipment.header.shidat = datetime.datetime.strptime( shidat_str, "%Y%m%d" ).date() # 20230922 if fields[0] == "N9" and fields[1] == "2I" and len(fields) > 2 and tracking_number_not_found: warehouse_shipment.header.ylicplate = fields[2] tracking_number_not_found = False if fields[0] == "W12": # W12*CC*32*32*0*CA**VN*08279*01112025C~ _, _, qty_str, det_qty, _, _, _, _, itmref, lot = fields[:10] subdetail = WarehouseShipmentSubDetail( qtypcu=-1 * int(det_qty), lot=lot, ) warehouse_shipment.append( WarehouseShipmentDetail( sohnum=warehouse_shipment.sohnum, itmref=itmref, qty=int(qty_str), ), subdetail, ) if fields[0] == "W03": _, _, weight, _ = fields warehouse_shipment.header.growei = weight time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( IMPORTS_DIRECTORY / f"ZSHIP945S_{warehouse_shipment.sohnum}_{time_stamp}.dat" ) as import_file: warehouse_shipment.output(import_file) @dataclasses.dataclass class WarehouseShipmentSubDetail: """ Information that goes onto a shipment sub-detail line, taken from ZSHIP945 template. """ sta: str = "A" pcu: str = "" qtypcu: int = 0 loc: str = "" lot: str = "" sernum: str = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ return yamamotoyama.x3_imports.convert_to_strings( [ "S", self.sta, self.pcu, self.qtypcu, self.loc, self.lot, self.sernum, ] ) @dataclasses.dataclass class WarehouseShipmentDetail: """ Information that goes on a shipment detail line, taken from ZSHIP945 template. """ sohnum: str = "" soplin: int = 0 itmref: str = "" itmdes: str = "" sau: str = "" qty: int = 0 star91: str = "" star92: str = "" subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field( default_factory=list ) def append(self, subdetail: WarehouseShipmentSubDetail): """ Add subdetail """ subdetail.pcu = self.sau self.subdetails.append(subdetail) def check_subdetail_qty(self): """ Check for shortages by totaling up subdetail quantities. """ total_cases = 0 for subdetail in self.subdetails: total_cases += subdetail.qtypcu return abs(total_cases) def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "L", self.sohnum, self.soplin, self.itmref, self.itmdes, self.sau, self.qty, self.star91, self.star92, ] ) def __eq__(self, item: typing.Any) -> bool: """ Test for equality """ if isinstance(item, str): return self.itmref == item if isinstance(item, WarehouseShipmentDetail): return self.itmref == item.itmref return False def fill(self): """ Set soplin & itmdes from itmref & sohnum """ def get() -> records.Record: with yamamotoyama.get_connection() as database: how_many = ( database.query( """ select count(*) as [how_many] from [PROD].[SORDERP] as [SOP] where [SOP].[SOHNUM_0] = :sohnum and [SOP].[ITMREF_0] = :itmref """, sohnum=self.sohnum, itmref=self.itmref, ) .first() .how_many ) if how_many == 1: return database.query( """ select top 1 [SOP].[SOPLIN_0] ,[SOP].[ITMDES1_0] ,[SOP].[SAU_0] from [PROD].[SORDERP] as [SOP] where [SOP].[SOHNUM_0] = :sohnum and [SOP].[ITMREF_0] = :itmref order by [SOP].[SOPLIN_0] """, sohnum=self.sohnum, itmref=self.itmref, ).first() else: emailtext = str(self.sohnum +' '+str(self.itmref)) msg.attach(MIMEText(emailtext, 'plain')) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "H", self.salfcy, self.stofcy, self.sdhnum, self.bpcord, self.bpaadd, self.cur, self.shidat.strftime("%Y%m%d"), self.cfmflg, self.pjt, self.bptnum, self.ylicplate, self.invdtaamt_2, self.invdtaamt_3, self.invdtaamt_4, self.invdtaamt_5, self.invdtaamt_6, self.invdtaamt_7, self.invdtaamt_8, self.invdtaamt_9, self.die, self.die_1, self.die_2, self.die_3, self.die_4, self.die_5, self.die_6, self.die_7, self.die_8, self.die_9, self.die_10, self.die_11, self.die_12, self.die_13, self.die_14, self.die_15, self.die_16, self.die_17, self.die_18, self.die_19, self.cce, self.cce_1, self.cce_2, self.cce_3, self.cce_4, self.cce_5, self.cce_6, self.cce_7, self.cce_8, self.cce_9, self.cce_10, self.cce_11, self.cce_12, self.cce_13, self.cce_14, self.cce_15, self.cce_16, self.cce_17, self.cce_18, self.cce_19, self.bpdnam, self.bpdaddlig, self.bpdaddlig_1, self.bpdaddlig_2, self.bpdposcod, self.bpdcty, self.bpdsat, self.bpdcry, self.bpdcrynam, self.sdhtyp, self.growei, self.pacnbr, self.star71, self.star72, self.star81, self.star82, ] ) class WarehouseShipmentDetailList: """ List of shipment details """ _details: typing.List[WarehouseShipmentDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, shipment_detail: WarehouseShipmentDetail, shipment_subdetail: WarehouseShipmentSubDetail, ): """ Append """ itmref = shipment_detail.itmref if itmref in self._item_set: for detail in self._details: if detail == itmref: detail.subdetails.append(shipment_subdetail) return self._item_set.add(itmref) shipment_detail.fill() shipment_detail.append(shipment_subdetail) self._details.append(shipment_detail) def __iter__(self): return iter(self._details) class WarehouseShipment: """ Warehosue shipment, both header & details """ header: WarehouseShipmentHeader details: WarehouseShipmentDetailList _sohnum: str def __init__(self): self.header = WarehouseShipmentHeader() self._sohnum = "" self.details = WarehouseShipmentDetailList() def append( self, shipment_detail: WarehouseShipmentDetail, shipment_subdetail: WarehouseShipmentSubDetail, ): """ Add detail information. """ self.details.append(shipment_detail, shipment_subdetail) @property def sohnum(self): """ Sales order number """ return self._sohnum @sohnum.setter def sohnum(self, value: str): if self._sohnum != value: self._sohnum = value if value: self._fill_info_from_so() def _get_so_from_x3(self) -> records.Record: """ Fetch sales order from X3 database. """ with yamamotoyama.get_connection() as db_connection: return db_connection.query( """ select [SOH].[SALFCY_0] ,[SOH].[STOFCY_0] ,[SOH].[BPCORD_0] ,[SOH].[BPAADD_0] ,[SOH].[CUR_0] ,[SOH].[INVDTAAMT_2] ,[SOH].[INVDTAAMT_3] ,[SOH].[INVDTAAMT_4] ,[SOH].[INVDTAAMT_5] ,[SOH].[INVDTAAMT_6] ,[SOH].[INVDTAAMT_7] ,[SOH].[INVDTAAMT_8] ,[SOH].[INVDTAAMT_9] ,[SOH].[DIE_0] ,[SOH].[DIE_1] ,[SOH].[DIE_2] ,[SOH].[DIE_3] ,[SOH].[DIE_4] ,[SOH].[DIE_5] ,[SOH].[DIE_6] ,[SOH].[DIE_7] ,[SOH].[DIE_8] ,[SOH].[DIE_9] ,[SOH].[DIE_10] ,[SOH].[DIE_11] ,[SOH].[DIE_12] ,[SOH].[DIE_13] ,[SOH].[DIE_14] ,[SOH].[DIE_15] ,[SOH].[DIE_16] ,[SOH].[DIE_17] ,[SOH].[DIE_18] ,[SOH].[DIE_19] ,[SOH].[CCE_0] ,[SOH].[CCE_1] ,[SOH].[CCE_2] ,[SOH].[CCE_3] ,[SOH].[CCE_4] ,[SOH].[CCE_5] ,[SOH].[CCE_6] ,[SOH].[CCE_7] ,[SOH].[CCE_8] ,[SOH].[CCE_9] ,[SOH].[CCE_10] ,[SOH].[CCE_11] ,[SOH].[CCE_12] ,[SOH].[CCE_13] ,[SOH].[CCE_14] ,[SOH].[CCE_15] ,[SOH].[CCE_16] ,[SOH].[CCE_17] ,[SOH].[CCE_18] ,[SOH].[CCE_19] ,[SOH].[BPDNAM_0] ,[SOH].[BPDADDLIG_0] ,[SOH].[BPDADDLIG_1] ,[SOH].[BPDADDLIG_2] ,[SOH].[BPDPOSCOD_0] ,[SOH].[BPDCTY_0] ,[SOH].[BPDSAT_0] ,[SOH].[BPDCRY_0] ,[SOH].[BPDCRYNAM_0] from [PROD].[SORDER] as [SOH] where [SOH].[SOHNUM_0] = :order """, order=self.sohnum, ).first() def _copy_accounting_codes(self, result: records.Record): """ Fill in all the accounting codes """ self.header.die = result.DIE_0 self.header.die_1 = result.DIE_1 self.header.die_2 = result.DIE_2 self.header.die_3 = result.DIE_3 self.header.die_4 = result.DIE_4 self.header.die_5 = result.DIE_5 self.header.die_6 = result.DIE_6 self.header.die_7 = result.DIE_7 self.header.die_8 = result.DIE_8 self.header.die_9 = result.DIE_9 self.header.die_10 = result.DIE_10 self.header.die_11 = result.DIE_11 self.header.die_12 = result.DIE_12 self.header.die_13 = result.DIE_13 self.header.die_14 = result.DIE_14 self.header.die_15 = result.DIE_15 self.header.die_16 = result.DIE_16 self.header.die_17 = result.DIE_17 self.header.die_18 = result.DIE_18 self.header.die_19 = result.DIE_19 self.header.cce = result.CCE_0 self.header.cce_1 = result.CCE_1 self.header.cce_2 = result.CCE_2 self.header.cce_3 = result.CCE_3 self.header.cce_4 = result.CCE_4 self.header.cce_5 = result.CCE_5 self.header.cce_6 = result.CCE_6 self.header.cce_7 = result.CCE_7 self.header.cce_8 = result.CCE_8 self.header.cce_9 = result.CCE_9 self.header.cce_10 = result.CCE_10 self.header.cce_11 = result.CCE_11 self.header.cce_12 = result.CCE_12 self.header.cce_13 = result.CCE_13 self.header.cce_14 = result.CCE_14 self.header.cce_15 = result.CCE_15 self.header.cce_16 = result.CCE_16 self.header.cce_17 = result.CCE_17 self.header.cce_18 = result.CCE_18 self.header.cce_19 = result.CCE_19 def _fill_info_from_so(self): """ When we learn the SOHNUM, we can copy information from the sales order. """ result = self._get_so_from_x3() self.header.salfcy = result.SALFCY_0 self.header.stofcy = result.STOFCY_0 self.header.bpcord = result.BPCORD_0 self.header.bpaadd = result.BPAADD_0 self.header.cur = result.CUR_0 self.header.invdtaamt_2 = result.INVDTAAMT_2 self.header.invdtaamt_3 = result.INVDTAAMT_3 self.header.invdtaamt_4 = result.INVDTAAMT_4 self.header.invdtaamt_5 = result.INVDTAAMT_5 self.header.invdtaamt_6 = result.INVDTAAMT_6 self.header.invdtaamt_7 = result.INVDTAAMT_7 self.header.invdtaamt_8 = result.INVDTAAMT_8 self.header.invdtaamt_9 = result.INVDTAAMT_9 self._copy_accounting_codes(result) self.header.bpdnam = result.BPDNAM_0 self.header.bpdaddlig = result.BPDADDLIG_0 self.header.bpdaddlig_1 = result.BPDADDLIG_1 self.header.bpdaddlig_2 = result.BPDADDLIG_2 self.header.bpdposcod = result.BPDPOSCOD_0 self.header.bpdcty = result.BPDCTY_0 self.header.bpdsat = result.BPDSAT_0 self.header.bpdcry = result.BPDCRY_0 self.header.bpdcrynam = result.BPDCRYNAM_0 def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) for subdetail in detail.subdetails: output(subdetail.convert_to_strings()) if __name__ == "__main__": main()