#!/usr/bin/env python3 """ Consume a 867 file from Shandex, and translate into a Sage X3 readable file-ZSHIP867. """ # pylint: disable=too-many-instance-attributes import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import pprint import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore THIS_DIRECTORY = pathlib.Path(__file__).parent X12_DIRECTORY = THIS_DIRECTORY / "incoming" IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports" EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing" SOURCE_867_FILENAME_RE = re.compile( r"\A 867_YAMAMOTOYAMA_ .* [.]edi \Z", re.X | re.M | re.S ) UOM_MAPPING = { "CA" : "CS", "EC" : "EA" } def main(): """ Do it! """ for edi_filename in X12_DIRECTORY.iterdir(): if SOURCE_867_FILENAME_RE.match(edi_filename.name): process_file(edi_filename) shutil.copy(edi_filename, EDI_997_DIRECTORY / edi_filename.name) shutil.move(edi_filename, THIS_DIRECTORY / "processed_867s" / edi_filename.name) #They go in here so we can use them in the dashboard script combine_zship867s() def combine_zship867s(): """ Collect all ZSHIP867 imports into a single file for easy import. """ archive_directory = IMPORTS_DIRECTORY / "archive" archive_directory.mkdir(exist_ok=True) with (IMPORTS_DIRECTORY / "ZSHIP867.dat").open( "w", encoding="utf-8", newline="\n" ) as combined_import_file: for individual_import_filename in IMPORTS_DIRECTORY.glob( "ZSHIP867_*.dat" ): with individual_import_filename.open( "r", encoding="utf-8", newline="\n" ) as individual_import_file: for line in individual_import_file: combined_import_file.write(line) shutil.move( individual_import_filename, archive_directory / individual_import_filename.name, ) def tokens_from_edi_file( edi_filename: pathlib.Path, ) -> typing.Iterator[typing.List[str]]: """ Read tokens from EDI file """ with edi_filename.open(encoding="utf-8", newline="") as edi_file: for record in edi_file.read().split("~"): fields = record.split("*") if fields[0] in { "ISA", "ST", "BPT", "N1", "PTD", "REF", }: continue yield fields def get_product_from_gtin(gtin): with yamamotoyama.get_connection() as database: result = database.query( """ select ITM.ITMREF_0, ITM.ITMDES1_0 from PROD.ITMMASTER ITM join PROD.ITMFACILIT ITF on ITM.ITMREF_0 = ITF.ITMREF_0 and ITF.STOFCY_0 = 'WON' where ZCASEUPC_0 = :zcaseupc """, zcaseupc=gtin, ).first() return result def process_file(edi_filename: pathlib.Path): """ Convert a specific EDI file into an import file. """ warehouse_shipment = WarehouseShipment() for fields in tokens_from_edi_file(edi_filename): if fields[0] == "GS": control_number = fields[5] warehouse_shipment.header.ylicplate = f'{control_number}' if fields[0] == "DTM": date_field = fields[2] warehouse_shipment.header.shidat = datetime.datetime.strptime( date_field, "%Y%m%d") if fields[0] == "QTY": #QTY*39*10*CA _, _, qty_str, uom = fields[:4] #warehouse_shipment.sohnum = sohnum if fields[0] == "LIN": #LIN**VN*10077652082224*LT*09032026C# _, _, _, gtin, _, lot = fields[:6] if fields[0] == "UIT": #UIT*CA*0.00 _, _, price = fields[:3] sau = UOM_MAPPING[uom] lookup_values = get_product_from_gtin(gtin) itmref = lookup_values['ITMREF_0'] itmdes = lookup_values['ITMDES1_0'] subdetail = WarehouseShipmentSubDetail( qtypcu=-1 * int(qty_str), lot=lot, ) warehouse_shipment.append( WarehouseShipmentDetail( #sohnum=warehouse_shipment.sohnum, itmref=itmref, itmdes=itmdes, qty=int(qty_str), gropri=price, sau=sau ), subdetail, ) time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( IMPORTS_DIRECTORY / f"ZSHIP867_{control_number}_{time_stamp}.dat" ) as import_file: warehouse_shipment.output(import_file) @dataclasses.dataclass class WarehouseShipmentSubDetail: """ Information that goes onto a shipment sub-detail line, taken from ZSHIP867 template. """ sta: str = "A" pcu: str = "" qtypcu: int = 0 loc: str = "" lot: str = "" sernum: str = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ return yamamotoyama.x3_imports.convert_to_strings( [ "S", self.sta, self.pcu, self.qtypcu, self.loc, self.lot, self.sernum, ] ) @dataclasses.dataclass class WarehouseShipmentDetail: """ Information that goes on a shipment detail line, taken from ZSHIP867 template. """ sohnum: str = "" soplin: int = 0 itmref: str = "" itmdes: str = "" sau: str = "" qty: int = 0 gropri: decimal.Decimal = decimal.Decimal() star91: str = "" star92: str = "" subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field( default_factory=list ) def append(self, subdetail: WarehouseShipmentSubDetail): """ Add subdetail """ subdetail.pcu = self.sau self.subdetails.append(subdetail) def check_subdetail_qty(self): """ Check for shortages by totaling up subdetail quantities. """ total_cases = 0 for subdetail in self.subdetails: total_cases += subdetail.qtypcu return abs(total_cases) def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "L", self.sohnum, self.soplin, self.itmref, self.itmdes, self.sau, self.qty, self.gropri, self.star91, self.star92, ] ) def __eq__(self, item: typing.Any) -> bool: """ Test for equality """ if isinstance(item, str): return self.itmref == item if isinstance(item, WarehouseShipmentDetail): return self.itmref == item.itmref return False def fill(self): """ Set soplin & itmdes from itmref & sohnum """ def get() -> records.Record: with yamamotoyama.get_connection() as database: how_many = ( database.query( """ select count(*) as [how_many] from [PROD].[SORDERP] as [SOP] where [SOP].[SOHNUM_0] = :sohnum and [SOP].[ITMREF_0] = :itmref """, sohnum=self.sohnum, itmref=self.itmref, ) .first() .how_many ) if how_many == 1: return database.query( """ select top 1 [SOP].[SOPLIN_0] ,[SOP].[ITMDES1_0] ,[SOP].[SAU_0] from [PROD].[SORDERP] as [SOP] where [SOP].[SOHNUM_0] = :sohnum and [SOP].[ITMREF_0] = :itmref order by [SOP].[SOPLIN_0] """, sohnum=self.sohnum, itmref=self.itmref, ).first() else: emailtext = str(self.sohnum +' '+str(self.itmref)) msg.attach(MIMEText(emailtext, 'plain')) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "H", self.salfcy, self.stofcy, self.sdhnum, self.bpcord, self.bpaadd, self.cur, self.shidat.strftime("%Y%m%d"), self.cfmflg, self.pjt, self.bptnum, self.ylicplate, self.invdtaamt_2, self.invdtaamt_3, self.invdtaamt_4, self.invdtaamt_5, self.invdtaamt_6, self.invdtaamt_7, self.invdtaamt_8, self.invdtaamt_9, self.die, self.die_1, self.die_2, self.die_3, self.die_4, self.die_5, self.die_6, self.die_7, self.die_8, self.die_9, self.die_10, self.die_11, self.die_12, self.die_13, self.die_14, self.die_15, self.die_16, self.die_17, self.die_18, self.die_19, self.cce, self.cce_1, self.cce_2, self.cce_3, self.cce_4, self.cce_5, self.cce_6, self.cce_7, self.cce_8, self.cce_9, self.cce_10, self.cce_11, self.cce_12, self.cce_13, self.cce_14, self.cce_15, self.cce_16, self.cce_17, self.cce_18, self.cce_19, self.bpdnam, self.bpdaddlig, self.bpdaddlig_1, self.bpdaddlig_2, self.bpdposcod, self.bpdcty, self.bpdsat, self.bpdcry, self.bpdcrynam, self.sdhtyp, self.growei, self.pacnbr, self.star71, self.star72, self.star81, self.star82, ] ) class WarehouseShipmentDetailList: """ List of shipment details """ _details: typing.List[WarehouseShipmentDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, shipment_detail: WarehouseShipmentDetail, shipment_subdetail: WarehouseShipmentSubDetail, ): """ Append """ itmref = shipment_detail.itmref # if itmref in self._item_set: # for detail in self._details: # if detail == itmref: # detail.subdetails.append(shipment_subdetail) # return self._item_set.add(itmref) #shipment_detail.fill() shipment_detail.append(shipment_subdetail) self._details.append(shipment_detail) def __iter__(self): return iter(self._details) class WarehouseShipment: """ Warehosue shipment, both header & details """ header: WarehouseShipmentHeader details: WarehouseShipmentDetailList _sohnum: str def __init__(self): self.header = WarehouseShipmentHeader() self._sohnum = "" self.details = WarehouseShipmentDetailList() def append( self, shipment_detail: WarehouseShipmentDetail, shipment_subdetail: WarehouseShipmentSubDetail, ): """ Add detail information. """ self.details.append(shipment_detail, shipment_subdetail) @property def sohnum(self): """ Sales order number """ return self._sohnum @sohnum.setter def sohnum(self, value: str): if self._sohnum != value: self._sohnum = value if value: self._fill_info_from_so() def _get_so_from_x3(self) -> records.Record: """ Fetch sales order from X3 database. """ with yamamotoyama.get_connection() as db_connection: return db_connection.query( """ select [SOH].[SALFCY_0] ,[SOH].[STOFCY_0] ,[SOH].[BPCORD_0] ,[SOH].[BPAADD_0] ,[SOH].[CUR_0] ,[SOH].[INVDTAAMT_2] ,[SOH].[INVDTAAMT_3] ,[SOH].[INVDTAAMT_4] ,[SOH].[INVDTAAMT_5] ,[SOH].[INVDTAAMT_6] ,[SOH].[INVDTAAMT_7] ,[SOH].[INVDTAAMT_8] ,[SOH].[INVDTAAMT_9] ,[SOH].[DIE_0] ,[SOH].[DIE_1] ,[SOH].[DIE_2] ,[SOH].[DIE_3] ,[SOH].[DIE_4] ,[SOH].[DIE_5] ,[SOH].[DIE_6] ,[SOH].[DIE_7] ,[SOH].[DIE_8] ,[SOH].[DIE_9] ,[SOH].[DIE_10] ,[SOH].[DIE_11] ,[SOH].[DIE_12] ,[SOH].[DIE_13] ,[SOH].[DIE_14] ,[SOH].[DIE_15] ,[SOH].[DIE_16] ,[SOH].[DIE_17] ,[SOH].[DIE_18] ,[SOH].[DIE_19] ,[SOH].[CCE_0] ,[SOH].[CCE_1] ,[SOH].[CCE_2] ,[SOH].[CCE_3] ,[SOH].[CCE_4] ,[SOH].[CCE_5] ,[SOH].[CCE_6] ,[SOH].[CCE_7] ,[SOH].[CCE_8] ,[SOH].[CCE_9] ,[SOH].[CCE_10] ,[SOH].[CCE_11] ,[SOH].[CCE_12] ,[SOH].[CCE_13] ,[SOH].[CCE_14] ,[SOH].[CCE_15] ,[SOH].[CCE_16] ,[SOH].[CCE_17] ,[SOH].[CCE_18] ,[SOH].[CCE_19] ,[SOH].[BPDNAM_0] ,[SOH].[BPDADDLIG_0] ,[SOH].[BPDADDLIG_1] ,[SOH].[BPDADDLIG_2] ,[SOH].[BPDPOSCOD_0] ,[SOH].[BPDCTY_0] ,[SOH].[BPDSAT_0] ,[SOH].[BPDCRY_0] ,[SOH].[BPDCRYNAM_0] from [PROD].[SORDER] as [SOH] where [SOH].[SOHNUM_0] = :order """, order=self.sohnum, ).first() def _copy_accounting_codes(self, result: records.Record): """ Fill in all the accounting codes """ self.header.die = result.DIE_0 self.header.die_1 = result.DIE_1 self.header.die_2 = result.DIE_2 self.header.die_3 = result.DIE_3 self.header.die_4 = result.DIE_4 self.header.die_5 = result.DIE_5 self.header.die_6 = result.DIE_6 self.header.die_7 = result.DIE_7 self.header.die_8 = result.DIE_8 self.header.die_9 = result.DIE_9 self.header.die_10 = result.DIE_10 self.header.die_11 = result.DIE_11 self.header.die_12 = result.DIE_12 self.header.die_13 = result.DIE_13 self.header.die_14 = result.DIE_14 self.header.die_15 = result.DIE_15 self.header.die_16 = result.DIE_16 self.header.die_17 = result.DIE_17 self.header.die_18 = result.DIE_18 self.header.die_19 = result.DIE_19 self.header.cce = result.CCE_0 self.header.cce_1 = result.CCE_1 self.header.cce_2 = result.CCE_2 self.header.cce_3 = result.CCE_3 self.header.cce_4 = result.CCE_4 self.header.cce_5 = result.CCE_5 self.header.cce_6 = result.CCE_6 self.header.cce_7 = result.CCE_7 self.header.cce_8 = result.CCE_8 self.header.cce_9 = result.CCE_9 self.header.cce_10 = result.CCE_10 self.header.cce_11 = result.CCE_11 self.header.cce_12 = result.CCE_12 self.header.cce_13 = result.CCE_13 self.header.cce_14 = result.CCE_14 self.header.cce_15 = result.CCE_15 self.header.cce_16 = result.CCE_16 self.header.cce_17 = result.CCE_17 self.header.cce_18 = result.CCE_18 self.header.cce_19 = result.CCE_19 def _fill_info_from_so(self): """ When we learn the SOHNUM, we can copy information from the sales order. """ result = self._get_so_from_x3() self.header.salfcy = result.SALFCY_0 self.header.stofcy = result.STOFCY_0 self.header.bpcord = result.BPCORD_0 self.header.bpaadd = result.BPAADD_0 self.header.cur = result.CUR_0 self.header.invdtaamt_2 = result.INVDTAAMT_2 self.header.invdtaamt_3 = result.INVDTAAMT_3 self.header.invdtaamt_4 = result.INVDTAAMT_4 self.header.invdtaamt_5 = result.INVDTAAMT_5 self.header.invdtaamt_6 = result.INVDTAAMT_6 self.header.invdtaamt_7 = result.INVDTAAMT_7 self.header.invdtaamt_8 = result.INVDTAAMT_8 self.header.invdtaamt_9 = result.INVDTAAMT_9 self._copy_accounting_codes(result) self.header.bpdnam = result.BPDNAM_0 self.header.bpdaddlig = result.BPDADDLIG_0 self.header.bpdaddlig_1 = result.BPDADDLIG_1 self.header.bpdaddlig_2 = result.BPDADDLIG_2 self.header.bpdposcod = result.BPDPOSCOD_0 self.header.bpdcty = result.BPDCTY_0 self.header.bpdsat = result.BPDSAT_0 self.header.bpdcry = result.BPDCRY_0 self.header.bpdcrynam = result.BPDCRYNAM_0 def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) for subdetail in detail.subdetails: output(subdetail.convert_to_strings()) if __name__ == "__main__": main()