shandex_edi_2024/edi_943.py

574 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Make a 943 in "X12" format without using the JavaScript middleware
developed by Tech4Biz.
A 943 is a replenishment shipping notice: we notify our 3PL that we have
sent them resupplies of our inventory.
"""
import datetime
import dataclasses
import typing
import pathlib
import pprint
import records
import yamamotoyama # type:ignore
# Maps the X3 ship-to code (BPCORD_0) to the trading-partner ID passed to
# ShipmentHeader.x12() as the receiver ID.
# Every value needs to be 15 characters; the short test ID is padded in code
# so the width does not depend on trailing spaces surviving in a literal.
SITE_MAPPING = {
    "WNJ": "SOURCELOGISTICS",
    "WCA": "SOURCELOGISTICS",
    "WON": "SHANDEXTEST".ljust(15),  # TODO CHANGE TO SHANDEX
}
# Maps the X3 delivery mode (MDL_0) to the code written into the W27 segment.
SHIPPING_CODE_MAPPING = dict(
    [
        ("", "LT"),  # Default to LTL
        ("AIR", "AP"),  # Air package carrier
        ("DEL", "LT"),  # LTL and the default if mode is not entered
        ("GRN", "D"),  # Parcel post
        ("OUR", "SR"),  # Supplier truck
        ("P/U", "CE"),  # Pickup
        ("WCALL", "CE"),  # Pickup
    ]
)
# Directory containing this script; output locations are relative to it.
THIS_DIRECTORY = pathlib.Path(__file__).parents[0]
# Folder that finished 943 files are written to.
X12_SHANDEX = THIS_DIRECTORY.joinpath("outgoing")
# X12_SOURCELOGISTICS = THIS_DIRECTORY / "edi-testing" #test directories
# X12_SHANDEX = THIS_DIRECTORY / "edi-testing"
def main():
    """
    Write one 943 file for every shipment returned by get_shipments().

    Uses the 'test' connection from the yamamotoyama helper module.
    """
    with yamamotoyama.get_connection('test') as database:
        # The shipment numbers are collected into a list before any file is
        # written; presumably so the selection query is fully consumed before
        # write_943() issues its own queries on the same connection.
        pending = list(get_shipments(database))
        for shipment_number in pending:
            write_943(database, shipment_number)
class X12:
    """
    X12 format parent class.

    Provides segment formatting and the persistent control-number counter
    shared by the header and detail classes.
    """

    # Characters removed from every element before it is written: the element
    # separator "*", the segment terminator "~" appended by line(), and the
    # punctuation the original implementation already stripped.
    _STRIP_TABLE = str.maketrans("", "", "*/&#,~")

    @staticmethod
    def line(items: typing.List[str]) -> str:
        """
        Return X12 EDI line with * and ~

        Each item is stripped of the characters in _STRIP_TABLE, the items are
        joined with "*", and the segment is terminated with "~". A "~" inside
        an item is removed so data cannot end the segment early.
        """
        table = X12._STRIP_TABLE
        return "*".join(item.translate(table) for item in items) + "~"

    def x12(self) -> str:
        """
        X12 format.

        Subclasses must override this; the base implementation always raises
        NotImplementedError.
        """
        raise NotImplementedError

    @staticmethod
    def control_number() -> int:
        """
        Next EDI serial number

        The last number issued is kept in a ".remember" file beside this
        module. Every call reads it, adds one, writes the new value back and
        returns it. If the file is missing, unreadable or does not start with
        an integer, counting restarts so the call returns 1.
        """
        filepath = pathlib.Path(__file__).with_suffix(".remember")
        encoding = "utf-8"
        newline = "\n"
        try:
            with filepath.open(
                "r", encoding=encoding, newline=newline
            ) as remember_file:
                number = int(remember_file.readline().rstrip("\n"))
        except (OSError, ValueError):
            # Best effort: no usable previous value, so start from zero.
            number = 0
        number += 1
        with filepath.open("w", encoding=encoding, newline=newline) as remember_file:
            remember_file.write(f"{number}\n")
        return number
def write_943(database: records.Connection, shipment: str):
    """
    Write out a 943 to a file, then flag the shipment and its order as sent.

    The file is created in X12_SHANDEX and named
    "<site>-<shipment>-<timestamp>-943.edi". It holds one header (built from
    the first row returned by get_shipment), three detail segments per row,
    and a footer. The footer is only written when at least one row was found.

    Afterwards, inside one database transaction:
      * [SDELIVERY].[XX4S_943RDY_0] is set to 1 for this shipment, and
      * [SORDER].[XX4S_UDF2_0] of the related order is set to
        "943 Sent <today>".

    :param database: open connection to the X3 database.
    :param shipment: delivery number (SDHNUM_0) to export.
    :raises KeyError: if the first row's BPCORD_0 or MDL_0 has no entry in
        SITE_MAPPING / SHIPPING_CODE_MAPPING.
    """
    now = datetime.datetime.now()
    site = str(get_shipment_destination(database, shipment))
    datestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")
    x12_path = X12_SHANDEX / f"{site}-{shipment}-{datestamp_string}-943.edi"
    with x12_path.open("w", encoding="utf-8", newline="\n") as x12_file:
        output = x12_file.write
        header = None
        detail_count = 0
        package_count = 0
        for shipment_database_row in get_shipment(database, shipment):
            if header is None:
                # Build the header exactly once. ShipmentHeader() draws a new
                # control number every time it is constructed, so building it
                # per row made the footer (SE/GE/IEA) carry different control
                # numbers from the already-written ISA/GS/ST segments.
                x12_ship_to = SITE_MAPPING[shipment_database_row.BPCORD_0]
                x12_mdl = SHIPPING_CODE_MAPPING[shipment_database_row.MDL_0]
                header = ShipmentHeader(shipment_database_row)
                output(header.x12(x12_ship_to, x12_mdl))
            detail = ShipmentDetail(shipment_database_row)
            output(detail.x12())
            detail_count += 1
            package_count += detail.qtystu_0
        if header is not None:
            output(header.footer(package_count, detail_count))
    with database.transaction() as _:
        # `shipment` is a delivery number, so match on SDHNUM_0 like every
        # other query in this module; the previous filter compared it to the
        # sales-order column SOHNUM_0.
        database.query(
            """
            update [FY23TEST].[SDELIVERY]
            set [XX4S_943RDY_0] = 1
            where [SDHNUM_0] = :shipment
            """,
            shipment=shipment,
        )
        order = get_order_for_shipment(database, shipment)
        database.query(
            """
            update [FY23TEST].[SORDER]
            set [XX4S_UDF2_0] = :sent_message
            where [SOHNUM_0] = :order
            """,
            order=order,
            sent_message=f"943 Sent {datetime.date.today().isoformat()}",
        )
def get_shipment_destination(database: records.Connection, shipment: str) -> str:
    """
    Get the destination site

    Returns the BPCORD_0 value of the first [SDELIVERY] row whose SDHNUM_0
    equals `shipment`.
    """
    matches = database.query(
        """
        select
        [SDH].[BPCORD_0]
        from [FY23TEST].[SDELIVERY] as [SDH]
        where
        [SDH].[SDHNUM_0] = :shipment
        """,
        shipment=shipment,
    )
    delivery = matches.first()
    return delivery.BPCORD_0
def get_order_for_shipment(database: records.Connection, shipment: str) -> str:
    """
    What is the order for this shipment?

    Returns the SOHNUM_0 value of the first [SDELIVERY] row whose SDHNUM_0
    equals `shipment`.
    """
    matches = database.query(
        """
        select
        [SDH].[SOHNUM_0]
        from [FY23TEST].[SDELIVERY] as [SDH]
        where
        [SDH].[SDHNUM_0] = :shipment
        """,
        shipment=shipment,
    )
    delivery = matches.first()
    return delivery.SOHNUM_0
def get_shipments(database: records.Connection) -> typing.Iterator[str]:
    """
    What have we shipped? Fetch from X3.

    Yields the SDHNUM_0 of every delivery that:
      * has STOFCY_0 in PMW/WCA/WNJ, BPCORD_0 of WON, and a non-empty
        MDL_0 and BPTNUM_0;
      * has SHIDAT_0 on or after 2023-10-09;
      * belongs to an order whose XX4S_UDF2_0 is empty, or has
        XX4S_943RDY_0 = 2; and
      * has CFMFLG_0 = 2, a non-empty YLICPLATE_0, or XX4S_943RDY_0 = 2.
    """
    selected = database.query(
        """
        select
        [SDH].[SDHNUM_0]
        from [FY23TEST].[SDELIVERY] as [SDH]
        join [FY23TEST].[SORDER] as [SOH]
        on [SOH].[SOHNUM_0] = [SDH].[SOHNUM_0]
        where
        (
        [SDH].[STOFCY_0] in ('PMW','WCA','WNJ')
        and [SDH].[BPCORD_0] in ('WON')
        and nullif([SDH].[MDL_0],'') is not null
        and nullif([SDH].[BPTNUM_0],'') is not null
        )
        and [SDH].[SHIDAT_0] >= {d'2023-10-09'}
        and (
        [SOH].[XX4S_UDF2_0] = ''
        or [SDH].[XX4S_943RDY_0] = 2
        )
        and (
        [SDH].[CFMFLG_0] = 2
        or [SDH].[YLICPLATE_0] <> ''
        or [SDH].[XX4S_943RDY_0] = 2
        )
        """
    )
    for delivery in selected:
        yield delivery.SDHNUM_0
def get_shipment(
    database: records.Connection, shipment: str
) -> typing.Iterator[records.Record]:
    """
    Get shipment information from X3 database.

    Yields every row of the [zyumiddleware_shipment] view whose SDHNUM_0
    equals `shipment`. Each row carries both the header columns and the
    line columns read by ShipmentHeader and ShipmentDetail.
    """
    shipment_rows = database.query(
        """
        select
        [SDHNUM_0]
        ,[STOFCY_0]
        ,[STOFCY]
        ,[SDHCAT_0]
        ,[SALFCY_0]
        ,[SALFCY]
        ,[SOHNUM_0]
        ,[CUSORDREF_0]
        ,[BPCORD_0]
        ,[BPDNAM_0]
        ,[BPDADDLIG_0]
        ,[BPDADDLIG_1]
        ,[BPDADDLIG_2]
        ,[CTY_0]
        ,[SAT_0]
        ,[POSCOD_0]
        ,[CRY_0]
        ,[CRYNAM_0]
        ,[BPCINV_0]
        ,[BPINAM_0]
        ,[BPTNUM_0]
        ,[BPTNAM_0]
        ,[MDL_0]
        ,[SCAC_0]
        ,[YLICPLATE_0]
        ,[DLVDAT_0]
        ,[CREDAT_0]
        ,[UPDDAT_0]
        ,[CCE_0]
        ,[SOPLIN_0]
        ,[ITMREF_0]
        ,[ITMDES1_0]
        ,[ZCASEUPC_0]
        ,[EANCOD_0]
        ,[STU_0]
        ,[QTYSTU_0]
        ,[LotQty]
        ,[LOT_0]
        ,[NETWEI_0]
        ,[GROWEI_0]
        ,[CFMFLG_0]
        ,[SHIDAT_0]
        from [FY23TEST].[zyumiddleware_shipment] as [SDH]
        where
        [SDH].[SDHNUM_0] = :shipment
        """,
        shipment=shipment,
    )
    for shipment_row in shipment_rows:
        yield shipment_row
@dataclasses.dataclass
class ShipmentHeader(X12):
    """
    Header

    Holds the shipment-level columns of one database row plus the control
    numbers and timestamps used in the envelope. x12() renders the opening
    segments (ISA, GS, ST ... W27); footer() renders the closing segments
    (W03, SE, GE, IEA).

    Constructing an instance draws a new number from X12.control_number(),
    so build one header per file.
    """

    sdhnum: str
    stofcy: str
    sdhcat: str
    salfcy: str
    sohnum: str
    cusordref: str
    bpcord: str
    bpdnam: str
    bpdaddlig: str
    bpdaddlig_1: str
    bpdaddlig_2: str
    cty: str
    sat: str
    poscod: str
    cry: str
    crynam: str
    bpcinv: str
    bpinam: str
    bptnum: str
    bptnam: str
    scac: str
    ylicplate: str
    dlvdat: datetime.date
    credat: datetime.date
    upddat: datetime.date
    cce: str
    shidat: datetime.date
    short_control_number: str
    interchange_control_number: str
    header_segments: int
    footer_segments: int

    def __init__(self, database_row: records.Record):
        """
        Copy the header columns from `database_row` and stamp the envelope.

        :param database_row: a row as returned by get_shipment().
        """
        self.sdhnum = database_row.SDHNUM_0
        self.stofcy = database_row.STOFCY_0
        self.sdhcat = database_row.SDHCAT_0
        self.salfcy = database_row.SALFCY_0
        self.sohnum = database_row.SOHNUM_0
        self.cusordref = database_row.CUSORDREF_0
        self.bpcord = database_row.BPCORD_0
        self.bpdnam = database_row.BPDNAM_0
        self.bpdaddlig = database_row.BPDADDLIG_0
        self.bpdaddlig_1 = database_row.BPDADDLIG_1
        self.bpdaddlig_2 = database_row.BPDADDLIG_2
        self.cty = database_row.CTY_0
        self.sat = database_row.SAT_0
        self.poscod = database_row.POSCOD_0
        self.cry = database_row.CRY_0
        self.crynam = database_row.CRYNAM_0
        self.bpcinv = database_row.BPCINV_0
        self.bpinam = database_row.BPINAM_0
        self.bptnum = database_row.BPTNUM_0
        self.bptnam = database_row.BPTNAM_0
        self.scac = database_row.SCAC_0
        self.ylicplate = database_row.YLICPLATE_0
        self.dlvdat = database_row.DLVDAT_0
        self.credat = database_row.CREDAT_0
        self.upddat = database_row.UPDDAT_0
        self.cce = database_row.CCE_0
        self.shidat = database_row.SHIDAT_0
        raw_control_number = self.control_number()
        self.short_control_number = f"{raw_control_number:04}"
        self.interchange_control_number = (
            f"{raw_control_number:09}"  # Format to 9 characters
        )
        # A full datetime is required here: a plain date has no time of day,
        # so formatting the hour would always give "00".
        self.now = datetime.datetime.now()
        self.date = self.now.strftime("%y%m%d")
        self.long_date = self.now.strftime("%Y%m%d")
        # HHMM: "%M" is minutes ("%m" would be the month).
        self.time = self.now.strftime("%H%M")
        # Segments written by x12() from ST through W27 inclusive.
        self.header_segments = 10
        # W03 and SE, written by footer().
        self.footer_segments = 2

    def x12(self, receiver_id: str, mdl: str) -> str:
        """
        Return the opening segments of the 943 as one string.

        :param receiver_id: trading-partner ID for ISA and GS.
        :param mdl: transportation method code for the W27 segment.
        """
        # The ISA segment is assembled by hand instead of through line():
        # it contains ">" as its last element and its fields are padded to
        # fixed widths here rather than relying on spaces inside literals.
        interchange_header = "*".join(
            [
                "ISA",
                "00",
                " " * 10,
                "00",
                " " * 10,
                "ZZ",
                "YAMAMOTOYAMA".ljust(15),
                "ZZ",
                receiver_id.ljust(15),
                self.date,
                self.time,
                "U",
                "00401",
                self.interchange_control_number,
                "0",
                "P",
                ">",
            ]
        ) + "~"
        return "".join(
            [
                interchange_header,
                self.line(
                    [
                        "GS",
                        "OW",  # should this be AR? Shandex okayed "OW"
                        "YAMAMOTOYAMA",
                        f"{receiver_id}",
                        self.long_date,
                        self.time,
                        self.interchange_control_number,
                        "X",
                        "004010",
                    ]
                ),
                self.line(
                    [
                        "ST",
                        "943",
                        self.short_control_number,
                    ]
                ),
                self.line(
                    [
                        "W06",
                        "N",
                        self.sohnum,
                        self.shidat.strftime("%Y%m%d"),
                        self.sdhnum,
                        "",
                        "",
                        "",
                        "",
                        "",
                        "",
                        "AS",
                    ]
                ),
                "N1*RE*Pomona Wholesale*1*008930755~",
                "N4*Pomona*CA*91768*US~",
                self.line(
                    [
                        "N1",
                        "ST",  # Ship to
                        self.bpdnam,
                        "53",  # Building
                        self.bpcord,
                    ]
                ),
                self.line(
                    [
                        "N3",
                        self.bpdaddlig,
                    ]
                ),
                self.line(
                    [
                        "N4",
                        self.cty,
                        self.sat,
                        self.poscod,
                        self.cry,
                    ]
                ),
                self.line(
                    [
                        "N9",
                        "PO",
                        self.cusordref,
                    ]
                ),
                self.line(
                    [
                        "G62",
                        "17",
                        self.dlvdat.strftime("%Y%m%d"),
                    ]
                ),
                self.line(
                    [
                        "W27",
                        f"{mdl}",
                        self.scac,
                    ]
                ),
            ]
        )

    def footer(self, package_count: int, detail_count: int) -> str:
        """
        End footer

        :param package_count: total quantity, written to W03 as an integer.
        :param detail_count: number of detail rows written; each contributes
            three segments to the SE segment count.
        """
        segment_count = (
            self.header_segments + (detail_count * 3) + self.footer_segments
        )
        return "".join(
            [
                self.line(
                    [
                        "W03",
                        str(int(package_count)),
                    ]
                ),
                self.line(
                    [
                        "SE",
                        str(segment_count),
                        self.short_control_number,
                    ]
                ),
                self.line(
                    [
                        "GE",
                        "1",
                        self.interchange_control_number,
                    ]
                ),
                self.line(
                    [
                        "IEA",
                        "1",
                        self.interchange_control_number,
                    ]
                ),
            ]
        )
@dataclasses.dataclass
class ShipmentDetail(X12):
    """
    Shipment detail.

    One instance per database row; x12() renders it as three segments
    (W04, G69, N9).
    """

    soplin_0: str
    itmref_0: str
    itmdes1_0: str
    stu_0: str
    qtystu_0: int
    lot_0: str
    stofcy_0: str
    zcaseupc_0: str
    eancod_0: str
    gtin_or_upc_code: str
    gtin_or_upc_marker: str

    def __init__(self, db_record: records.Record):
        unit = db_record.STU_0
        site = db_record.STOFCY_0
        # Shandex requires CA for cases
        # NOTE(review): get_shipments() only selects STOFCY_0 in
        # PMW/WCA/WNJ, so this test may never be true for those rows —
        # confirm whether BPCORD_0 was intended.
        if (site, unit) == ('WON', 'CS'):
            unit = 'CA'
        self.stofcy_0 = site
        self.stu_0 = unit
        self.soplin_0 = str(db_record.SOPLIN_0)
        self.itmref_0 = db_record.ITMREF_0
        self.itmdes1_0 = db_record.ITMDES1_0
        # The quantity comes from the LotQty column, not QTYSTU_0.
        self.qtystu_0 = db_record.LotQty
        self.lot_0 = db_record.LOT_0
        self.zcaseupc_0 = db_record.ZCASEUPC_0
        self.eancod_0 = db_record.EANCOD_0
        self.gtin_or_upc_marker = 'UK'
        # Case UPC only when the (possibly converted) unit is still 'CS';
        # otherwise the EAN code.
        self.gtin_or_upc_code = (
            self.zcaseupc_0 if self.stu_0 == 'CS' else self.eancod_0
        )

    def x12(self) -> str:
        """
        Format in X12
        """
        item_segment = self.line(
            [
                "W04",
                str(int(self.qtystu_0)),
                self.stu_0,
                "",
                "VN",  # Vendor's (Seller's) Item Number
                self.itmref_0,
                "LT",  # Lot number
                self.lot_0,
                *([""] * 6),
                self.gtin_or_upc_marker,
                self.gtin_or_upc_code,  # W04-15
            ]
        )
        description_segment = self.line(["G69", self.itmdes1_0])
        line_reference_segment = self.line(["N9", "LI", self.soplin_0])
        return item_segment + description_segment + line_reference_segment
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()