#!/usr/bin/env python3 """ Consume a 867 file from Shandex, and translate into a Sage X3 readable file-ZSHIP867. New changes, need to bring in under ship to customer whenever possible. Build a mapping file of known customers by matching their address to x3 codes need to not import a shipment if a customer mapping doesn't exist. """ # pylint: disable=too-many-instance-attributes import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import pprint import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore THIS_DIRECTORY = pathlib.Path(__file__).parent X12_DIRECTORY = THIS_DIRECTORY / "incoming" IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports" EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing" SOURCE_867_FILENAME_RE = re.compile( r"\A 867_STASH-YAMAMOTOYAMA_ .* [.]edi \Z", re.X | re.M | re.S ) # Not needed, Shandex stores everything how they want so we need to look up in X3 # UOM_MAPPING = { # "CA" : "CS", # "EC" : "EA" #} #NAME_ADDRESS_CITY_TERRITORY_POSTAL : X3 Customer Code X3_CUSTOMER_MAPPING = { 'AVRI1000_AVRIQC' : 'AVRI0001', 'BULK1000_BULKAU' : 'BULK0001', 'COOP2000_190148' : 'FEDE0006', 'COOP2000_190149' : 'FEDE0005', 'COOP2000_607S' : 'FEDE0003', 'COOP2000_CAL' : 'FEDE0007', 'HORI1000_HORIBC' : 'HORI0001', 'LOND1000_190005' : 'LOND0001', 'NATI1000_28' : 'LOBL0002', 'NATI1000_34' : 'LOBL0006', 'NATI1000_D022' : 'LOBL0001', 'ONTA1100_ONTAON' : 'ONTA0002', 'OVER1000_5111' : 'OVER0002', 'OVER1000_A24' : 'OVER0004', 'PARA1100_PARABC' : 'PARA0004', 'PSCN1000_PSCBC' : 'PSCN0002', 'PURE1000_PUREON' : 'PURE0004', 'PURI1000_PURION' : 'PURI0002', 'SATA1000_SATAQC' : 'SATA0002', 'UNFI1000_UNFIBC' : 'UNFI0011', 'UNFI1000_UNFION' : 'UNFI0004', 'SAMP1000_0000' : 'YARI0001', 'PURI1000_PURIQC' : 'PURI0005', 'JIVA1000_JIVABC' : 'JIVA0002', 'WELL1000_WELL' : 'WELL0002', 'WELL1000_WELCAL' : 'WELL0003', 'AMAZ1200_YYC4' : 'AMAZ0210', 'AMAZ1200_YVR2' : 'AMAZ0014', 'AMAZ1200_YYZ4' : 'AMAZ0049', 'AMAZ1200_YXU1' : 'AMAZ0189', 'AMAZ1200_YOW3' : 'AMAZ0176', 'PARA1100_0000' : 'PARA0004', 'AMAZ1200_YVR4' : 'AMAZ0099', 'AMAZ1200_YHM1' : 'AMAZ0169', 'AMAZ1200_YYZ7' :'AMAZ0100', 'PURI1000_PURIBC' : 'PURI0003', 'PURI1000_PURIAB' : 'PURI0004', 'AMAZ1200_YEG2' : 'AMAZ0179', } def main(): """ Do it! """ for edi_filename in X12_DIRECTORY.iterdir(): if SOURCE_867_FILENAME_RE.match(edi_filename.name): process_file(edi_filename) shutil.copy(edi_filename, EDI_997_DIRECTORY / edi_filename.name) shutil.move(edi_filename, THIS_DIRECTORY / "processed_867s" / edi_filename.name) #They go in here so we can use them in the dashboard script combine_zship867s() def missing_customer_alert(customer_key): msg = MIMEMultipart() msg['Subject'] = 'Shandex 867 - Missing X3 Customer' msg['Precedence'] = 'bulk' msg['From'] = 'x3report@stashtea.com' msg['To'] = 'technical-contact@stashtea.com' emailtext = f'Missing value: {customer_key}' msg.attach(MIMEText(emailtext, 'plain')) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.Iterator[typing.List[str]]: """ Read tokens from EDI file """ with edi_filename.open(encoding="utf-8", newline="") as edi_file: for record in edi_file.read().split("~"): fields = record.split("*") if fields[0] in { "ISA", "GS", "ST", "BPT", }: continue yield fields def get_product_from_gtin(gtin): #pprint.pprint(gtin) with yamamotoyama.get_connection() as database: result = database.query( """ select [ITM].[ITMREF_0], [ITM].[ITMDES1_0], [ITM].[EANCOD_0], [ITM].[ZCASEUPC_0], [ITM].[STU_0] from PROD.ITMMASTER ITM join PROD.ITMFACILIT ITF on ITM.ITMREF_0 = ITF.ITMREF_0 and ITF.STOFCY_0 = 'WON' where replace([ITM].[ZCASEUPC_0],' ','') = :zcaseupc """, zcaseupc=gtin, ).first() if result is None: result = database.query( """ select [ITM].[ITMREF_0], [ITM].[ITMDES1_0], [ITM].[EANCOD_0], [ITM].[ZCASEUPC_0], [ITM].[STU_0] from [PROD].[ITMMASTER] [ITM] join [PROD].[ITMFACILIT] [ITF] on [ITM].[ITMREF_0] = [ITF].[ITMREF_0] and [ITF].[STOFCY_0] = 'WON' where replace([ITM].[EANCOD_0],' ','') = :zcaseupc """, zcaseupc=gtin, ).first() return result def process_file(edi_filename: pathlib.Path): """ Convert a specific EDI file into an import file. """ shipping_date = '' previous_picking_number = '' po_number = '' cust_po_number = '' warehouse_shipment = WarehouseShipment() for fields in tokens_from_edi_file(edi_filename): if fields[0] == "DTM": shipping_date = fields[2] if fields[0] == "PTD" and len(fields) > 2:#There is one PTD in the header that is not used picking_number = fields[5] warehouse_shipment.header.ylicplate = f'{previous_picking_number}' if po_number != '': warehouse_shipment.header.yclippership = cust_po_number warehouse_shipment.header.ylicplate = f'{po_number}' if picking_number != previous_picking_number and previous_picking_number != '': if warehouse_shipment.header.bpdnam != 'Shandex Group': #pprint.pprint(warehouse_shipment.header.ylicplate) warehouse_shipment.header.shidat = datetime.datetime.strptime( shipping_date, "%Y%m%d") time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( IMPORTS_DIRECTORY / f"ZSHIP867_{warehouse_shipment.header.ylicplate}_{time_stamp}.dat" ) as import_file: warehouse_shipment.output(import_file) warehouse_shipment = WarehouseShipment() po_number = '' warehouse_shipment.header.ylicplate = f'{picking_number}' previous_picking_number = picking_number if fields[0] =='REF' and fields[1] == 'PO': cust_po_number = fields[2] if fields[0] =='REF' and fields[1] == 'IL': po_number = fields[2] if fields[0] == "N1" and fields[1] == 'ST': ship_to_customer = fields[2] shandex_code_part1 = fields[4] warehouse_shipment.header.bpdnam = ship_to_customer if fields[0] == "N1" and fields[1] == 'BY': shandex_code_part2 = fields[4] if fields[0] == "N3": ship_to_address = fields[1] warehouse_shipment.header.bpdaddlig = ship_to_address if fields[0] == "N4": ship_to_city = fields[1] ship_to_province = fields[2] ship_to_zip = fields[3] warehouse_shipment.header.bpdposcod = ship_to_zip warehouse_shipment.header.bpdcty = ship_to_city warehouse_shipment.header.bpdsat = ship_to_province customer_key = warehouse_shipment.create_customer_key(shandex_code_part2, shandex_code_part1) if customer_key == 'SAMP1000_0000': #flag sample orders better warehouse_shipment.header.bpdnam = 'SMP: ' + warehouse_shipment.header.bpdnam if customer_key not in X3_CUSTOMER_MAPPING.keys(): pprint.pprint(customer_key + ' not found.') missing_customer_alert(customer_key) raise NotImplementedError else: warehouse_shipment.header.bpcord = X3_CUSTOMER_MAPPING[customer_key] if fields[0] == "QTY": #QTY*39*10*CA _, _, qty_str, uom = fields[:4] #warehouse_shipment.sohnum = sohnum if fields[0] == "LIN": #LIN**VN*10077652082224*LT*09032026C# _, _, _, gtin, _, lot = fields[:6] if fields[0] == "AMT": #AMT*LP*53.90 _, _, price = fields[:3] lookup_values = get_product_from_gtin(gtin) itmref = lookup_values['ITMREF_0'] itmdes = lookup_values['ITMDES1_0'] sau = lookup_values['STU_0'] #pprint.pprint(itmdes) subdetail = WarehouseShipmentSubDetail( qtypcu=-1 * int(qty_str), lot=lot, ) warehouse_shipment.append( WarehouseShipmentDetail( #sohnum=warehouse_shipment.sohnum, itmref=itmref, itmdes=itmdes, qty=int(qty_str), gropri=price, sau=sau ), subdetail, ) #pprint.pprint(warehouse_shipment.header.ylicplate) warehouse_shipment.header.shidat = datetime.datetime.strptime( shipping_date, "%Y%m%d") time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( IMPORTS_DIRECTORY / f"ZSHIP867_{picking_number}_{time_stamp}.dat" ) as import_file: warehouse_shipment.output(import_file) @dataclasses.dataclass class WarehouseShipmentSubDetail: """ Information that goes onto a shipment sub-detail line, taken from ZSHIP867 template. """ sta: str = "A" pcu: str = "" qtypcu: int = 0 loc: str = "" lot: str = "" sernum: str = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ return yamamotoyama.x3_imports.convert_to_strings( [ "S", self.sta, self.pcu, self.qtypcu, self.loc, self.lot, self.sernum, ] ) @dataclasses.dataclass class WarehouseShipmentDetail: """ Information that goes on a shipment detail line, taken from ZSHIP867 template. """ sohnum: str = "" soplin: int = 0 itmref: str = "" itmdes: str = "" sau: str = "" qty: int = 0 gropri: decimal.Decimal = decimal.Decimal() star91: str = "" star92: str = "" subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field( default_factory=list ) def append(self, subdetail: WarehouseShipmentSubDetail): """ Add subdetail """ subdetail.pcu = self.sau self.subdetails.append(subdetail) def check_subdetail_qty(self): """ Check for shortages by totaling up subdetail quantities. """ total_cases = 0 for subdetail in self.subdetails: total_cases += subdetail.qtypcu return abs(total_cases) def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "L", self.sohnum, self.soplin, self.itmref, self.itmdes, self.sau, self.qty, self.gropri, self.star91, self.star92, ] ) def __eq__(self, item: typing.Any) -> bool: """ Test for equality """ if isinstance(item, str): return self.itmref == item if isinstance(item, WarehouseShipmentDetail): return self.itmref == item.itmref return False def fill(self): """ Set soplin & itmdes from itmref & sohnum """ def get() -> records.Record: with yamamotoyama.get_connection() as database: how_many = ( database.query( """ select count(*) as [how_many] from [PROD].[SORDERP] as [SOP] where [SOP].[SOHNUM_0] = :sohnum and [SOP].[ITMREF_0] = :itmref """, sohnum=self.sohnum, itmref=self.itmref, ) .first() .how_many ) if how_many == 1: return database.query( """ select top 1 [SOP].[SOPLIN_0] ,[SOP].[ITMDES1_0] ,[SOP].[SAU_0] from [PROD].[SORDERP] as [SOP] where [SOP].[SOHNUM_0] = :sohnum and [SOP].[ITMREF_0] = :itmref order by [SOP].[SOPLIN_0] """, sohnum=self.sohnum, itmref=self.itmref, ).first() else: emailtext = str(self.sohnum +' '+str(self.itmref)) msg.attach(MIMEText(emailtext, 'plain')) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "H", self.salfcy, self.stofcy, self.sdhnum, self.bpcinv, self.bpcord, self.bpaadd, self.cur, self.shidat.strftime("%Y%m%d"), self.cfmflg, self.pjt, self.bptnum, self.ylicplate, self.yclippership, self.invdtaamt_2, self.invdtaamt_3, self.invdtaamt_4, self.invdtaamt_5, self.invdtaamt_6, self.invdtaamt_7, self.invdtaamt_8, self.invdtaamt_9, self.die, self.die_1, self.die_2, self.die_3, self.die_4, self.die_5, self.die_6, self.die_7, self.die_8, self.die_9, self.die_10, self.die_11, self.die_12, self.die_13, self.die_14, self.die_15, self.die_16, self.die_17, self.die_18, self.die_19, self.cce, self.cce_1, self.cce_2, self.cce_3, self.cce_4, self.cce_5, self.cce_6, self.cce_7, self.cce_8, self.cce_9, self.cce_10, self.cce_11, self.cce_12, self.cce_13, self.cce_14, self.cce_15, self.cce_16, self.cce_17, self.cce_18, self.cce_19, self.bpdnam, self.bpdaddlig, self.bpdaddlig_1, self.bpdaddlig_2, self.bpdposcod, self.bpdcty, self.bpdsat, self.bpdcry, self.bpdcrynam, self.sdhtyp, self.growei, self.pacnbr, self.star71, self.star72, self.star81, self.star82, ] ) class WarehouseShipmentDetailList: """ List of shipment details """ _details: typing.List[WarehouseShipmentDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, shipment_detail: WarehouseShipmentDetail, shipment_subdetail: WarehouseShipmentSubDetail, ): """ Append """ itmref = shipment_detail.itmref # if itmref in self._item_set: # for detail in self._details: # if detail == itmref: # detail.subdetails.append(shipment_subdetail) # return self._item_set.add(itmref) #shipment_detail.fill() shipment_detail.append(shipment_subdetail) self._details.append(shipment_detail) def __iter__(self): return iter(self._details) class WarehouseShipment: """ Warehosue shipment, both header & details """ header: WarehouseShipmentHeader details: WarehouseShipmentDetailList _sohnum: str def __init__(self): self.header = WarehouseShipmentHeader() self._sohnum = "" self.details = WarehouseShipmentDetailList() def append( self, shipment_detail: WarehouseShipmentDetail, shipment_subdetail: WarehouseShipmentSubDetail, ): """ Add detail information. """ self.details.append(shipment_detail, shipment_subdetail) @property def sohnum(self): """ Sales order number """ return self._sohnum @sohnum.setter def sohnum(self, value: str): if self._sohnum != value: self._sohnum = value if value: self._fill_info_from_so() def create_customer_key(self, part1, part2): key = (part1 + '_' + part2).replace(' ', '_') return key def _get_so_from_x3(self) -> records.Record: """ Fetch sales order from X3 database. """ with yamamotoyama.get_connection() as db_connection: return db_connection.query( """ select [SOH].[SALFCY_0] ,[SOH].[STOFCY_0] ,[SOH].[BPCORD_0] ,[SOH].[BPAADD_0] ,[SOH].[CUR_0] ,[SOH].[INVDTAAMT_2] ,[SOH].[INVDTAAMT_3] ,[SOH].[INVDTAAMT_4] ,[SOH].[INVDTAAMT_5] ,[SOH].[INVDTAAMT_6] ,[SOH].[INVDTAAMT_7] ,[SOH].[INVDTAAMT_8] ,[SOH].[INVDTAAMT_9] ,[SOH].[DIE_0] ,[SOH].[DIE_1] ,[SOH].[DIE_2] ,[SOH].[DIE_3] ,[SOH].[DIE_4] ,[SOH].[DIE_5] ,[SOH].[DIE_6] ,[SOH].[DIE_7] ,[SOH].[DIE_8] ,[SOH].[DIE_9] ,[SOH].[DIE_10] ,[SOH].[DIE_11] ,[SOH].[DIE_12] ,[SOH].[DIE_13] ,[SOH].[DIE_14] ,[SOH].[DIE_15] ,[SOH].[DIE_16] ,[SOH].[DIE_17] ,[SOH].[DIE_18] ,[SOH].[DIE_19] ,[SOH].[CCE_0] ,[SOH].[CCE_1] ,[SOH].[CCE_2] ,[SOH].[CCE_3] ,[SOH].[CCE_4] ,[SOH].[CCE_5] ,[SOH].[CCE_6] ,[SOH].[CCE_7] ,[SOH].[CCE_8] ,[SOH].[CCE_9] ,[SOH].[CCE_10] ,[SOH].[CCE_11] ,[SOH].[CCE_12] ,[SOH].[CCE_13] ,[SOH].[CCE_14] ,[SOH].[CCE_15] ,[SOH].[CCE_16] ,[SOH].[CCE_17] ,[SOH].[CCE_18] ,[SOH].[CCE_19] ,[SOH].[BPDNAM_0] ,[SOH].[BPDADDLIG_0] ,[SOH].[BPDADDLIG_1] ,[SOH].[BPDADDLIG_2] ,[SOH].[BPDPOSCOD_0] ,[SOH].[BPDCTY_0] ,[SOH].[BPDSAT_0] ,[SOH].[BPDCRY_0] ,[SOH].[BPDCRYNAM_0] from [PROD].[SORDER] as [SOH] where [SOH].[SOHNUM_0] = :order """, order=self.sohnum, ).first() def _copy_accounting_codes(self, result: records.Record): """ Fill in all the accounting codes """ self.header.die = result.DIE_0 self.header.die_1 = result.DIE_1 self.header.die_2 = result.DIE_2 self.header.die_3 = result.DIE_3 self.header.die_4 = result.DIE_4 self.header.die_5 = result.DIE_5 self.header.die_6 = result.DIE_6 self.header.die_7 = result.DIE_7 self.header.die_8 = result.DIE_8 self.header.die_9 = result.DIE_9 self.header.die_10 = result.DIE_10 self.header.die_11 = result.DIE_11 self.header.die_12 = result.DIE_12 self.header.die_13 = result.DIE_13 self.header.die_14 = result.DIE_14 self.header.die_15 = result.DIE_15 self.header.die_16 = result.DIE_16 self.header.die_17 = result.DIE_17 self.header.die_18 = result.DIE_18 self.header.die_19 = result.DIE_19 self.header.cce = result.CCE_0 self.header.cce_1 = result.CCE_1 self.header.cce_2 = result.CCE_2 self.header.cce_3 = result.CCE_3 self.header.cce_4 = result.CCE_4 self.header.cce_5 = result.CCE_5 self.header.cce_6 = result.CCE_6 self.header.cce_7 = result.CCE_7 self.header.cce_8 = result.CCE_8 self.header.cce_9 = result.CCE_9 self.header.cce_10 = result.CCE_10 self.header.cce_11 = result.CCE_11 self.header.cce_12 = result.CCE_12 self.header.cce_13 = result.CCE_13 self.header.cce_14 = result.CCE_14 self.header.cce_15 = result.CCE_15 self.header.cce_16 = result.CCE_16 self.header.cce_17 = result.CCE_17 self.header.cce_18 = result.CCE_18 self.header.cce_19 = result.CCE_19 def _fill_info_from_so(self): """ When we learn the SOHNUM, we can copy information from the sales order. """ result = self._get_so_from_x3() self.header.salfcy = result.SALFCY_0 self.header.stofcy = result.STOFCY_0 self.header.bpcord = result.BPCORD_0 self.header.bpaadd = result.BPAADD_0 self.header.cur = result.CUR_0 self.header.invdtaamt_2 = result.INVDTAAMT_2 self.header.invdtaamt_3 = result.INVDTAAMT_3 self.header.invdtaamt_4 = result.INVDTAAMT_4 self.header.invdtaamt_5 = result.INVDTAAMT_5 self.header.invdtaamt_6 = result.INVDTAAMT_6 self.header.invdtaamt_7 = result.INVDTAAMT_7 self.header.invdtaamt_8 = result.INVDTAAMT_8 self.header.invdtaamt_9 = result.INVDTAAMT_9 self._copy_accounting_codes(result) self.header.bpdnam = result.BPDNAM_0 self.header.bpdaddlig = result.BPDADDLIG_0 self.header.bpdaddlig_1 = result.BPDADDLIG_1 self.header.bpdaddlig_2 = result.BPDADDLIG_2 self.header.bpdposcod = result.BPDPOSCOD_0 self.header.bpdcty = result.BPDCTY_0 self.header.bpdsat = result.BPDSAT_0 self.header.bpdcry = result.BPDCRY_0 self.header.bpdcrynam = result.BPDCRYNAM_0 def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) for subdetail in detail.subdetails: output(subdetail.convert_to_strings()) if __name__ == "__main__": main()