#!/usr/bin/env python3
"""
Make a 943 in "X12" format without using the Javascript Middleware
developed by Tech4Biz.

A 943 is a replenishment shipping notice: we notify our 3PL that we have sent
them resupplies of our inventory.
"""
import datetime
import dataclasses
import typing
import pathlib
import pprint

import records
import yamamotoyama  # type:ignore

# X3 destination site -> EDI receiver id used in the ISA/GS envelope.
SITE_MAPPING = {
    'WNJ': 'SOURCELOGISTICS',
    'WCA': 'SOURCELOGISTICS',
    # Testing value is SHANDEXTEST with spaces to 15 characters.
    # ShipmentHeader.x12 pads ISA08 to 15 characters itself, so the ISA
    # envelope does not depend on the exact padding carried here.
    'WON': 'SHANDEX ',
}

# X3 delivery mode (MDL_0) -> code written in the W27 segment.
SHIPPING_CODE_MAPPING = {
    '': 'LT',  # Default to LTL
    'AIR': 'AP',  # Air package carrier
    'DEL': 'LT',  # LTL and the default if mode is not entered
    'GRN': 'D',  # Parcel post
    'OUR': 'SR',  # Supplier truck
    'P/U': 'CE',  # Pickup
    'WCALL': 'CE',  # Pickup
}

THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SHANDEX = THIS_DIRECTORY / "outgoing"


def main():
    """
    Do it!

    Fetch every shipment that still needs a 943 and write one file for each.
    """
    with yamamotoyama.get_connection() as database:
        # Materialise the list first so the selection query has finished
        # before write_943 starts updating the same tables.
        shipments = list(get_shipments(database))
        for shipment in shipments:
            write_943(database, shipment)


class X12:
    """
    X12 format parent class.
    """

    # Characters removed from every element.  "*" is the element separator
    # and "~" the segment terminator used by this file, so neither may
    # appear inside data; the others were already stripped historically.
    _STRIPPED_CHARACTERS = str.maketrans("", "", "*/&#,~")

    @staticmethod
    def line(items: typing.List[str]) -> str:
        """
        Return X12 EDI line with * and ~

        Each item is stripped of the characters in _STRIPPED_CHARACTERS,
        the items are joined with "*" and the segment is closed with "~".
        """
        table = X12._STRIPPED_CHARACTERS
        return "*".join([item.translate(table) for item in items]) + "~"

    def x12(self) -> str:
        """
        X12 format.  Subclasses must override this.
        """
        raise NotImplementedError

    @staticmethod
    def control_number() -> int:
        """
        Next EDI serial number

        The last number issued is kept in a ".remember" file next to this
        script.  A missing or unreadable file restarts the sequence, so the
        first number returned is 1.  Every call increments the stored value.
        """
        filepath = pathlib.Path(__file__).with_suffix(".remember")
        encoding = "utf-8"
        newline = "\n"
        try:
            with filepath.open(
                "r", encoding=encoding, newline=newline
            ) as remember_file:
                number = int(remember_file.readline().rstrip("\n"))
        except (OSError, ValueError):
            number = 0
        number += 1
        with filepath.open("w", encoding=encoding, newline=newline) as remember_file:
            remember_file.write(f"{number}\n")
        return number


def write_943(database: records.Connection, shipment: str):
    """
    Write out a 943 to a file

    The shipment is flagged as sent in the database first, then the EDI
    document is written to
    "outgoing/<site>-<shipment>-<timestamp>-943.edi".

    database: open connection to the X3 database.
    shipment: delivery number (SDHNUM_0) to send.

    Raises KeyError when the destination site or delivery mode of the
    shipment has no entry in SITE_MAPPING / SHIPPING_CODE_MAPPING.
    """
    now = datetime.datetime.now()
    site = str(get_shipment_destination(database, shipment))
    datestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")

    # 2024-09-25 never sent multiple 943s, mark them as sent before processing
    # NOTE(review): the original indentation was lost; both updates are
    # assumed to belong inside the transaction - confirm.
    with database.transaction() as _:
        # The delivery is identified by SDHNUM_0, the column every other
        # query in this file matches :shipment against.  Filtering on
        # SOHNUM_0 (the order number) compared a delivery number with an
        # order-number column.
        database.query(
            """
            update [PROD].[SDELIVERY]
            set [XX4S_943RDY_0] = 1
            where [SDHNUM_0] = :shipment
            """,
            shipment=shipment,
        )
        order = get_order_for_shipment(database, shipment)
        database.query(
            """
            update [PROD].[SORDER]
            set [XX4S_UDF2_0] = :sent_message
            where [SOHNUM_0] = :order
            """,
            order=order,
            sent_message=f"943 Sent {datetime.date.today().isoformat()}",
        )

    output_path = X12_SHANDEX / f"{site}-{shipment}-{datestamp_string}-943.edi"
    with output_path.open("w", encoding="utf-8", newline="\n") as x12_file:
        output = x12_file.write
        header = None
        detail_count = 0
        package_count = 0
        for shipment_database_row in get_shipment(database, shipment):
            if header is None:
                # Build the header exactly once.  Constructing it draws a
                # new control number, and the footer must repeat the same
                # numbers that were written in the ISA/GS/ST segments.
                x12_ship_to = SITE_MAPPING[shipment_database_row.BPCORD_0]
                x12_mdl = SHIPPING_CODE_MAPPING[shipment_database_row.MDL_0]
                header = ShipmentHeader(shipment_database_row)
                output(header.x12(x12_ship_to, x12_mdl))
            detail = ShipmentDetail(shipment_database_row)
            output(detail.x12())
            detail_count += 1
            package_count += detail.qtystu_0
        # No rows means no header was written, so no footer either.
        if header is not None:
            output(header.footer(package_count, detail_count))


def get_shipment_destination(database: records.Connection, shipment: str) -> str:
    """
    Get the destination site

    Returns BPCORD_0 of the delivery whose SDHNUM_0 equals *shipment*.
    """
    return (
        database.query(
            """
            select
                [SDH].[BPCORD_0]
            from [PROD].[SDELIVERY] as [SDH]
            where
                [SDH].[SDHNUM_0] = :shipment
            """,
            shipment=shipment,
        )
        .first()
        .BPCORD_0
    )


def get_order_for_shipment(database: records.Connection, shipment: str) -> str:
    """
    What is the order for this shipment?

    Returns SOHNUM_0 of the delivery whose SDHNUM_0 equals *shipment*.
    """
    return (
        database.query(
            """
            select
                [SDH].[SOHNUM_0]
            from [PROD].[SDELIVERY] as [SDH]
            where
                [SDH].[SDHNUM_0] = :shipment
            """,
            shipment=shipment,
        )
        .first()
        .SOHNUM_0
    )


def get_shipments(database: records.Connection) -> typing.Iterator[str]:
    """
    What have we shipped? Fetch from X3.

    Yields the SDHNUM_0 of every delivery selected by the query below:
    deliveries to 'WON' from the listed stock sites, with a delivery mode
    and carrier, whose order is not yet marked "943..." (or whose
    XX4S_943RDY_0 flag is 2).
    """
    for shipment_result in database.query(
        """
        select
            [SDH].[SDHNUM_0]
        from [PROD].[SDELIVERY] as [SDH]
        join [PROD].[SORDER] as [SOH]
            on [SOH].[SOHNUM_0] = [SDH].[SOHNUM_0]
        where
            (
                [SDH].[STOFCY_0] in ('PMW','WCA','WNJ')
                and [SDH].[BPCORD_0] in ('WON')
                and nullif([SDH].[MDL_0],'') is not null
                and nullif([SDH].[BPTNUM_0],'') is not null
            )
            and [SDH].[SHIDAT_0] >= {d'2023-10-09'}
            and (
                [SOH].[XX4S_UDF2_0] not like '943%'
                or [SDH].[XX4S_943RDY_0] = 2
            )
            and (
                [SDH].[CFMFLG_0] = 2
                or [SDH].[YLICPLATE_0] <> ''
                or [SDH].[XX4S_943RDY_0] = 2
            )
        """
    ):
        yield shipment_result.SDHNUM_0


def get_shipment(
    database: records.Connection, shipment: str
) -> typing.Iterator[records.Record]:
    """
    Get shipment information from X3 database.

    Yields the rows of the zyumiddleware_shipment_shandex view for the
    delivery *shipment*.  Each row carries both the header columns and
    the columns of one detail line.
    """
    yield from database.query(
        """
        select
            [SDHNUM_0]
            ,[STOFCY_0]
            ,[STOFCY]
            ,[SDHCAT_0]
            ,[SALFCY_0]
            ,[SALFCY]
            ,[SOHNUM_0]
            ,[CUSORDREF_0]
            ,[BPCORD_0]
            ,[BPDNAM_0]
            ,[BPDADDLIG_0]
            ,[BPDADDLIG_1]
            ,[BPDADDLIG_2]
            ,[CTY_0]
            ,[SAT_0]
            ,[POSCOD_0]
            ,[CRY_0]
            ,[CRYNAM_0]
            ,[BPCINV_0]
            ,[BPINAM_0]
            ,[BPTNUM_0]
            ,[BPTNAM_0]
            ,[MDL_0]
            ,[SCAC_0]
            ,[YLICPLATE_0]
            ,[DLVDAT_0]
            ,[CREDAT_0]
            ,[UPDDAT_0]
            ,[CCE_0]
            ,[SOPLIN_0]
            ,[ITMREF_0]
            ,[ITMDES1_0]
            ,[ZCASEUPC_0]
            ,[EANCOD_0]
            ,[STU_0]
            ,[QTYSTU_0]
            ,[LotQty]
            ,[LOT_0]
            ,[NETWEI_0]
            ,[GROWEI_0]
            ,[CFMFLG_0]
            ,[SHIDAT_0]
        from [PROD].[zyumiddleware_shipment_shandex] as [SDH]
        where
            [SDH].[SDHNUM_0] = :shipment
        """,
        shipment=shipment,
    )


@dataclasses.dataclass
class ShipmentHeader(X12):
    """
    Header

    Holds the header columns of one shipment row plus the control numbers
    and timestamps used in the ISA/GS/ST envelope and its matching footer.
    Creating an instance draws a new control number.
    """

    sdhnum: str
    stofcy: str
    sdhcat: str
    salfcy: str
    sohnum: str
    cusordref: str
    bpcord: str
    bpdnam: str
    bpdaddlig: str
    bpdaddlig_1: str
    bpdaddlig_2: str
    cty: str
    sat: str
    poscod: str
    cry: str
    crynam: str
    bpcinv: str
    bpinam: str
    bptnum: str
    bptnam: str
    scac: str
    ylicplate: str
    dlvdat: datetime.date
    credat: datetime.date
    upddat: datetime.date
    cce: str
    shidat: datetime.date
    short_control_number: str
    interchange_control_number: str
    header_segments: int
    footer_segments: int

    def __init__(self, database_row: records.Record):
        """
        Copy the header columns from *database_row* and draw a control number.
        """
        self.sdhnum = database_row.SDHNUM_0
        self.stofcy = database_row.STOFCY_0
        self.sdhcat = database_row.SDHCAT_0
        self.salfcy = database_row.SALFCY_0
        self.sohnum = database_row.SOHNUM_0
        self.cusordref = database_row.CUSORDREF_0
        self.bpcord = database_row.BPCORD_0
        self.bpdnam = database_row.BPDNAM_0
        self.bpdaddlig = database_row.BPDADDLIG_0
        self.bpdaddlig_1 = database_row.BPDADDLIG_1
        self.bpdaddlig_2 = database_row.BPDADDLIG_2
        self.cty = database_row.CTY_0
        self.sat = database_row.SAT_0
        self.poscod = database_row.POSCOD_0
        self.cry = database_row.CRY_0
        self.crynam = database_row.CRYNAM_0
        self.bpcinv = database_row.BPCINV_0
        self.bpinam = database_row.BPINAM_0
        self.bptnum = database_row.BPTNUM_0
        self.bptnam = database_row.BPTNAM_0
        self.scac = database_row.SCAC_0
        self.ylicplate = database_row.YLICPLATE_0
        self.dlvdat = database_row.DLVDAT_0
        self.credat = database_row.CREDAT_0
        self.upddat = database_row.UPDDAT_0
        self.cce = database_row.CCE_0
        self.shidat = database_row.SHIDAT_0
        raw_control_number = self.control_number()
        self.short_control_number = f"{raw_control_number:04}"
        self.interchange_control_number = (
            f"{raw_control_number:09}"  # Format to 9 characters
        )
        # A datetime (not a date) is required here: a date always formats
        # %H as "00".  %M is minutes; %m would be the month.
        self.now = datetime.datetime.now()
        self.date = self.now.strftime("%y%m%d")
        self.long_date = self.now.strftime("%Y%m%d")
        self.time = self.now.strftime("%H%M")
        # Segments from ST through W27 written by x12(), and W03 + SE
        # written by footer(); used for the SE segment count.
        self.header_segments = 10
        self.footer_segments = 2

    def x12(self, receiver_id: str, mdl: str) -> str:
        """
        Return the ISA/GS/ST envelope and the shipment header segments.

        receiver_id: EDI id of the receiving partner (a SITE_MAPPING value).
        mdl: shipping code for the W27 segment (a SHIPPING_CODE_MAPPING value).
        """
        receiver_id = str(receiver_id)
        # ISA elements are fixed width: the two blank elements are 10
        # characters and the sender/receiver ids 15, so pad explicitly
        # rather than relying on whitespace typed into a literal.
        interchange_header = (
            "*".join(
                [
                    "ISA",
                    "00",
                    " " * 10,
                    "00",
                    " " * 10,
                    "ZZ",
                    "YAMAMOTOYAMA".ljust(15),
                    "ZZ",
                    receiver_id.ljust(15),
                    self.date,
                    self.time,
                    "U",
                    "00401",
                    self.interchange_control_number,
                    "0",
                    "P",
                    ">",
                ]
            )
            + "~"
        )
        return "".join(
            [
                interchange_header,
                self.line(
                    [
                        "GS",
                        "OW",  # should this be AR? Shandex okayed "OW"
                        "YAMAMOTOYAMA",
                        receiver_id,
                        self.long_date,
                        self.time,
                        self.interchange_control_number,
                        "X",
                        "004010",
                    ]
                ),
                self.line(
                    [
                        "ST",
                        "943",
                        self.short_control_number,
                    ]
                ),
                self.line(
                    [
                        "W06",
                        "N",
                        self.sohnum,
                        self.shidat.strftime("%Y%m%d"),
                        self.sdhnum,
                        "",
                        "",
                        "",
                        "",
                        "",
                        "",
                        "AS",
                    ]
                ),
                "N1*RE*Pomona Wholesale*1*008930755~",
                "N4*Pomona*CA*91768*US~",
                self.line(
                    [
                        "N1",
                        "ST",  # Ship to
                        self.bpdnam,
                        "53",  # Building
                        self.bpcord,
                    ]
                ),
                self.line(
                    [
                        "N3",
                        self.bpdaddlig,
                    ]
                ),
                self.line(
                    [
                        "N4",
                        self.cty,
                        self.sat,
                        self.poscod,
                        self.cry,
                    ]
                ),
                self.line(
                    [
                        "N9",
                        "PO",
                        self.cusordref,
                    ]
                ),
                self.line(
                    [
                        "G62",
                        "17",
                        self.dlvdat.strftime("%Y%m%d"),
                    ]
                ),
                self.line(
                    [
                        "W27",
                        str(mdl),
                        self.scac,
                    ]
                ),
            ]
        )

    def footer(self, package_count: int, detail_count: int) -> str:
        """
        End footer

        package_count: total quantity over all detail lines (W03).
        detail_count: number of detail lines; each one writes 3 segments.
        """
        segment_count = (
            self.header_segments + (detail_count * 3) + self.footer_segments
        )
        return "".join(
            [
                self.line(
                    [
                        "W03",
                        str(int(package_count)),
                    ]
                ),
                self.line(
                    [
                        "SE",
                        str(segment_count),
                        self.short_control_number,
                    ]
                ),
                self.line(
                    [
                        "GE",
                        "1",
                        self.interchange_control_number,
                    ]
                ),
                self.line(
                    [
                        "IEA",
                        "1",
                        self.interchange_control_number,
                    ]
                ),
            ]
        )


@dataclasses.dataclass
class ShipmentDetail(X12):
    """
    Shipment detail.

    One detail line of a shipment, written as W04 + G69 + N9 segments.
    """

    soplin_0: str
    itmref_0: str
    itmdes1_0: str
    stu_0: str
    qtystu_0: int
    lot_0: str
    stofcy_0: str
    zcaseupc_0: str
    eancod_0: str
    gtin_or_upc_code: str
    gtin_or_upc_marker: str

    def __init__(self, db_record: records.Record):
        """
        Copy the detail columns from *db_record*.
        """
        self.soplin_0 = str(db_record.SOPLIN_0)
        self.itmref_0 = db_record.ITMREF_0
        self.itmdes1_0 = db_record.ITMDES1_0
        self.stu_0 = db_record.STU_0
        # The quantity sent is the per-lot quantity, not QTYSTU_0.
        self.qtystu_0 = db_record.LotQty
        self.lot_0 = db_record.LOT_0
        self.stofcy_0 = db_record.STOFCY_0
        # NOTE(review): this tests the stock site, while get_shipments only
        # selects stock sites PMW/WCA/WNJ with destination WON - confirm
        # whether BPCORD_0 was intended.  When it does fire, the unit is no
        # longer 'CS' below, so EANCOD_0 is used instead of ZCASEUPC_0.
        if self.stofcy_0 == 'WON' and self.stu_0 == 'CS':
            # Shandex requires CA for cases
            self.stu_0 = 'CA'
        self.zcaseupc_0 = db_record.ZCASEUPC_0
        self.eancod_0 = db_record.EANCOD_0
        self.gtin_or_upc_marker = 'UK'
        if self.stu_0 == 'CS':
            self.gtin_or_upc_code = self.zcaseupc_0
        else:
            self.gtin_or_upc_code = self.eancod_0

    def x12(self) -> str:
        """
        Format in X12
        """
        return "".join(
            [
                self.line(
                    [
                        "W04",
                        str(int(self.qtystu_0)),
                        self.stu_0,
                        "",
                        "VN",  # Vendor's (Seller's) Item Number
                        self.itmref_0,
                        "LT",  # Lot number
                        self.lot_0,
                        "",
                        "",
                        "",
                        "",
                        "",
                        "",
                        self.gtin_or_upc_marker,
                        self.gtin_or_upc_code.replace(' ', ''),  # W04-15
                    ]
                ),
                self.line(
                    [
                        "G69",
                        self.itmdes1_0,
                    ]
                ),
                self.line(
                    [
                        "N9",
                        "LI",
                        self.soplin_0,
                    ]
                ),
            ]
        )


if __name__ == "__main__":
    main()