Initial commit.

master
bleeson 2024-03-13 14:53:19 -07:00
parent a2a1130383
commit 3193fcc3fb
12 changed files with 3847 additions and 0 deletions

716
edi_867.py Normal file
View File

@ -0,0 +1,716 @@
#!/usr/bin/env python3
"""
Consume a 867 file from Shandex, and translate into a Sage X3
readable file-ZSHIP867.
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import pprint
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
SOURCE_867_FILENAME_RE = re.compile(
r"\A 867_YAMAMOTOYAMA_ .* [.]edi \Z", re.X | re.M | re.S
)
UOM_MAPPING = {
"CA" : "CS",
"EC" : "EA"
}
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SOURCE_867_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
#TODO respond with 997?
#shutil.move(edi_filename, X12_DIRECTORY / "archive" / edi_filename.name)#TODO uncomment
combine_zship867s()
def combine_zship867s():
"""
Collect all ZSHIP867 imports into a single file for easy import.
"""
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZSHIP867.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZSHIP867_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"ST",
"BPT",
"N1",
"PTD",
"REF",
}:
continue
yield fields
def get_product_from_gtin(gtin):
with yamamotoyama.get_connection() as database:
result = database.query(
"""
select
ITM.ITMREF_0,
ITM.ITMDES1_0
from PROD.ITMMASTER ITM
join PROD.ITMFACILIT ITF
on ITM.ITMREF_0 = ITF.ITMREF_0
and ITF.STOFCY_0 = 'WON'
where
ZCASEUPC_0 = :zcaseupc
""",
zcaseupc=gtin,
).first()
return result
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
warehouse_shipment = WarehouseShipment()
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "GS":
control_number = fields[5]
warehouse_shipment.header.ylicplate = f'SHANDEX-{control_number}'
if fields[0] == "DTM":
date_field = fields[2]
warehouse_shipment.header.shidat = datetime.datetime.strptime(
date_field, "%Y%m%d")
if fields[0] == "QTY":
#QTY*39*10*CA
_, _, qty_str, uom = fields[:4]
#warehouse_shipment.sohnum = sohnum
if fields[0] == "LIN":
#LIN**VN*10077652082224*LT*09032026C#
_, _, _, gtin, _, lot = fields[:6]
if fields[0] == "UIT":
#UIT*CA*0.00
_, _, price = fields[:3]
sau = UOM_MAPPING[uom]
lookup_values = get_product_from_gtin(gtin)
itmref = lookup_values['ITMREF_0']
itmdes = lookup_values['ITMDES1_0']
subdetail = WarehouseShipmentSubDetail(
qtypcu=-1 * int(qty_str),
lot=lot,
)
warehouse_shipment.append(
WarehouseShipmentDetail(
#sohnum=warehouse_shipment.sohnum,
itmref=itmref,
itmdes=itmdes,
qty=int(qty_str),
netpri=price,
sau=sau
),
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSHIP867_{control_number}_{time_stamp}.dat"
) as import_file:
warehouse_shipment.output(import_file)
@dataclasses.dataclass
class WarehouseShipmentSubDetail:
"""
Information that goes onto a shipment sub-detail line, taken from ZSHIP867 template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
sernum: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.loc,
self.lot,
self.sernum,
]
)
@dataclasses.dataclass
class WarehouseShipmentDetail:
"""
Information that goes on a shipment detail line, taken from ZSHIP867 template.
"""
sohnum: str = ""
soplin: int = 0
itmref: str = ""
itmdes: str = ""
sau: str = ""
qty: int = 0
netpri: decimal.Decimal = decimal.Decimal()
star91: str = ""
star92: str = ""
subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: WarehouseShipmentSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.sau
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases += subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sohnum,
self.soplin,
self.itmref,
self.itmdes,
self.sau,
self.qty,
self.netpri,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, WarehouseShipmentDetail):
return self.itmref == item.itmref
return False
def fill(self):
"""
Set soplin & itmdes from itmref & sohnum
"""
def get() -> records.Record:
with yamamotoyama.get_connection() as database:
how_many = (
database.query(
"""
select
count(*) as [how_many]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
""",
sohnum=self.sohnum,
itmref=self.itmref,
)
.first()
.how_many
)
if how_many == 1:
return database.query(
"""
select top 1
[SOP].[SOPLIN_0]
,[SOP].[ITMDES1_0]
,[SOP].[SAU_0]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
order by
[SOP].[SOPLIN_0]
""",
sohnum=self.sohnum,
itmref=self.itmref,
).first()
else:
emailtext = str(self.sohnum +' '+str(self.itmref))
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
raise NotImplementedError # TODO
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class WarehouseShipmentHeader:
"""
Information that goes on a shipment header, taken from ZSHIP867 template.
"""
salfcy: str = "STC"
stofcy: str = "WON"
sdhnum: str = ""
bpcord: str = "SHAN0001"
bpaadd: str = "BL001"
cur: str = "CAD"
shidat: datetime.date = datetime.date(1753, 1, 1)
cfmflg: int = 1
pjt: str = ""
bptnum: str = ""
ylicplate: str = "SHANDEX"
invdtaamt_2: decimal.Decimal = decimal.Decimal()
invdtaamt_3: decimal.Decimal = decimal.Decimal()
invdtaamt_4: decimal.Decimal = decimal.Decimal()
invdtaamt_5: decimal.Decimal = decimal.Decimal()
invdtaamt_6: decimal.Decimal = decimal.Decimal()
invdtaamt_7: decimal.Decimal = decimal.Decimal()
invdtaamt_8: decimal.Decimal = decimal.Decimal()
invdtaamt_9: decimal.Decimal = decimal.Decimal()
die: str = "" #TODO consider adding dimension codes?
die_1: str = ""
die_2: str = ""
die_3: str = ""
die_4: str = ""
die_5: str = ""
die_6: str = ""
die_7: str = ""
die_8: str = ""
die_9: str = ""
die_10: str = ""
die_11: str = ""
die_12: str = ""
die_13: str = ""
die_14: str = ""
die_15: str = ""
die_16: str = ""
die_17: str = ""
die_18: str = ""
die_19: str = ""
cce: str = ""
cce_1: str = ""
cce_2: str = ""
cce_3: str = ""
cce_4: str = ""
cce_5: str = ""
cce_6: str = ""
cce_7: str = ""
cce_8: str = ""
cce_9: str = ""
cce_10: str = ""
cce_11: str = ""
cce_12: str = ""
cce_13: str = ""
cce_14: str = ""
cce_15: str = ""
cce_16: str = ""
cce_17: str = ""
cce_18: str = ""
cce_19: str = ""
bpdnam: str = "Shandex Group"
bpdaddlig: str = ""
bpdaddlig_1: str = ""
bpdaddlig_2: str = ""
bpdposcod: str = ""
bpdcty: str = ""
bpdsat: str = ""
bpdcry: str = "CA"
bpdcrynam: str = "Canada"
sdhtyp: str = "SDN"
growei: decimal.Decimal = decimal.Decimal()#TODO consider gross weight?
pacnbr: int = 0
star71: str = ""
star72: str = ""
star81: str = ""
star82: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"H",
self.salfcy,
self.stofcy,
self.sdhnum,
self.bpcord,
self.bpaadd,
self.cur,
self.shidat.strftime("%Y%m%d"),
self.cfmflg,
self.pjt,
self.bptnum,
self.ylicplate,
self.invdtaamt_2,
self.invdtaamt_3,
self.invdtaamt_4,
self.invdtaamt_5,
self.invdtaamt_6,
self.invdtaamt_7,
self.invdtaamt_8,
self.invdtaamt_9,
self.die,
self.die_1,
self.die_2,
self.die_3,
self.die_4,
self.die_5,
self.die_6,
self.die_7,
self.die_8,
self.die_9,
self.die_10,
self.die_11,
self.die_12,
self.die_13,
self.die_14,
self.die_15,
self.die_16,
self.die_17,
self.die_18,
self.die_19,
self.cce,
self.cce_1,
self.cce_2,
self.cce_3,
self.cce_4,
self.cce_5,
self.cce_6,
self.cce_7,
self.cce_8,
self.cce_9,
self.cce_10,
self.cce_11,
self.cce_12,
self.cce_13,
self.cce_14,
self.cce_15,
self.cce_16,
self.cce_17,
self.cce_18,
self.cce_19,
self.bpdnam,
self.bpdaddlig,
self.bpdaddlig_1,
self.bpdaddlig_2,
self.bpdposcod,
self.bpdcty,
self.bpdsat,
self.bpdcry,
self.bpdcrynam,
self.sdhtyp,
self.growei,
self.pacnbr,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class WarehouseShipmentDetailList:
"""
List of shipment details
"""
_details: typing.List[WarehouseShipmentDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Append
"""
itmref = shipment_detail.itmref
# if itmref in self._item_set:
# for detail in self._details:
# if detail == itmref:
# detail.subdetails.append(shipment_subdetail)
# return
self._item_set.add(itmref)
#shipment_detail.fill()
shipment_detail.append(shipment_subdetail)
self._details.append(shipment_detail)
def __iter__(self):
return iter(self._details)
class WarehouseShipment:
"""
Warehosue shipment, both header & details
"""
header: WarehouseShipmentHeader
details: WarehouseShipmentDetailList
_sohnum: str
def __init__(self):
self.header = WarehouseShipmentHeader()
self._sohnum = ""
self.details = WarehouseShipmentDetailList()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Add detail information.
"""
self.details.append(shipment_detail, shipment_subdetail)
@property
def sohnum(self):
"""
Sales order number
"""
return self._sohnum
@sohnum.setter
def sohnum(self, value: str):
if self._sohnum != value:
self._sohnum = value
if value:
self._fill_info_from_so()
def _get_so_from_x3(self) -> records.Record:
"""
Fetch sales order from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SOH].[SALFCY_0]
,[SOH].[STOFCY_0]
,[SOH].[BPCORD_0]
,[SOH].[BPAADD_0]
,[SOH].[CUR_0]
,[SOH].[INVDTAAMT_2]
,[SOH].[INVDTAAMT_3]
,[SOH].[INVDTAAMT_4]
,[SOH].[INVDTAAMT_5]
,[SOH].[INVDTAAMT_6]
,[SOH].[INVDTAAMT_7]
,[SOH].[INVDTAAMT_8]
,[SOH].[INVDTAAMT_9]
,[SOH].[DIE_0]
,[SOH].[DIE_1]
,[SOH].[DIE_2]
,[SOH].[DIE_3]
,[SOH].[DIE_4]
,[SOH].[DIE_5]
,[SOH].[DIE_6]
,[SOH].[DIE_7]
,[SOH].[DIE_8]
,[SOH].[DIE_9]
,[SOH].[DIE_10]
,[SOH].[DIE_11]
,[SOH].[DIE_12]
,[SOH].[DIE_13]
,[SOH].[DIE_14]
,[SOH].[DIE_15]
,[SOH].[DIE_16]
,[SOH].[DIE_17]
,[SOH].[DIE_18]
,[SOH].[DIE_19]
,[SOH].[CCE_0]
,[SOH].[CCE_1]
,[SOH].[CCE_2]
,[SOH].[CCE_3]
,[SOH].[CCE_4]
,[SOH].[CCE_5]
,[SOH].[CCE_6]
,[SOH].[CCE_7]
,[SOH].[CCE_8]
,[SOH].[CCE_9]
,[SOH].[CCE_10]
,[SOH].[CCE_11]
,[SOH].[CCE_12]
,[SOH].[CCE_13]
,[SOH].[CCE_14]
,[SOH].[CCE_15]
,[SOH].[CCE_16]
,[SOH].[CCE_17]
,[SOH].[CCE_18]
,[SOH].[CCE_19]
,[SOH].[BPDNAM_0]
,[SOH].[BPDADDLIG_0]
,[SOH].[BPDADDLIG_1]
,[SOH].[BPDADDLIG_2]
,[SOH].[BPDPOSCOD_0]
,[SOH].[BPDCTY_0]
,[SOH].[BPDSAT_0]
,[SOH].[BPDCRY_0]
,[SOH].[BPDCRYNAM_0]
from [PROD].[SORDER] as [SOH]
where
[SOH].[SOHNUM_0] = :order
""",
order=self.sohnum,
).first()
def _copy_accounting_codes(self, result: records.Record):
"""
Fill in all the accounting codes
"""
self.header.die = result.DIE_0
self.header.die_1 = result.DIE_1
self.header.die_2 = result.DIE_2
self.header.die_3 = result.DIE_3
self.header.die_4 = result.DIE_4
self.header.die_5 = result.DIE_5
self.header.die_6 = result.DIE_6
self.header.die_7 = result.DIE_7
self.header.die_8 = result.DIE_8
self.header.die_9 = result.DIE_9
self.header.die_10 = result.DIE_10
self.header.die_11 = result.DIE_11
self.header.die_12 = result.DIE_12
self.header.die_13 = result.DIE_13
self.header.die_14 = result.DIE_14
self.header.die_15 = result.DIE_15
self.header.die_16 = result.DIE_16
self.header.die_17 = result.DIE_17
self.header.die_18 = result.DIE_18
self.header.die_19 = result.DIE_19
self.header.cce = result.CCE_0
self.header.cce_1 = result.CCE_1
self.header.cce_2 = result.CCE_2
self.header.cce_3 = result.CCE_3
self.header.cce_4 = result.CCE_4
self.header.cce_5 = result.CCE_5
self.header.cce_6 = result.CCE_6
self.header.cce_7 = result.CCE_7
self.header.cce_8 = result.CCE_8
self.header.cce_9 = result.CCE_9
self.header.cce_10 = result.CCE_10
self.header.cce_11 = result.CCE_11
self.header.cce_12 = result.CCE_12
self.header.cce_13 = result.CCE_13
self.header.cce_14 = result.CCE_14
self.header.cce_15 = result.CCE_15
self.header.cce_16 = result.CCE_16
self.header.cce_17 = result.CCE_17
self.header.cce_18 = result.CCE_18
self.header.cce_19 = result.CCE_19
def _fill_info_from_so(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_so_from_x3()
self.header.salfcy = result.SALFCY_0
self.header.stofcy = result.STOFCY_0
self.header.bpcord = result.BPCORD_0
self.header.bpaadd = result.BPAADD_0
self.header.cur = result.CUR_0
self.header.invdtaamt_2 = result.INVDTAAMT_2
self.header.invdtaamt_3 = result.INVDTAAMT_3
self.header.invdtaamt_4 = result.INVDTAAMT_4
self.header.invdtaamt_5 = result.INVDTAAMT_5
self.header.invdtaamt_6 = result.INVDTAAMT_6
self.header.invdtaamt_7 = result.INVDTAAMT_7
self.header.invdtaamt_8 = result.INVDTAAMT_8
self.header.invdtaamt_9 = result.INVDTAAMT_9
self._copy_accounting_codes(result)
self.header.bpdnam = result.BPDNAM_0
self.header.bpdaddlig = result.BPDADDLIG_0
self.header.bpdaddlig_1 = result.BPDADDLIG_1
self.header.bpdaddlig_2 = result.BPDADDLIG_2
self.header.bpdposcod = result.BPDPOSCOD_0
self.header.bpdcty = result.BPDCTY_0
self.header.bpdsat = result.BPDSAT_0
self.header.bpdcry = result.BPDCRY_0
self.header.bpdcrynam = result.BPDCRYNAM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
output(subdetail.convert_to_strings())
if __name__ == "__main__":
main()

531
edi_943.py Normal file
View File

@ -0,0 +1,531 @@
#!/usr/bin/env python3
"""
Make a 943 in "X12" format without using the Javascript Middleware
developed by Tech4Biz.
A 943 is a replenishment shipping notice: we notify our 3PL that we have
sent them resupplies of our inventory.
"""
import datetime
import dataclasses
import typing
import pathlib
import pprint
import records
import yamamotoyama # type:ignore
from edi_940 import X12_DIRECTORY, X12
SITE_MAPPING = {
'WNJ' : 'SOURCELOGISTICS',
'WCA' : 'SOURCELOGISTICS',
'WON' : 'SHANDEXTEST ' # TODO CHANGE TO SHANDEX, needs to be 15 characters
}
SHIPPING_CODE_MAPPING = {
'' : 'LT', #Default to LTL
'AIR' : 'AP', #Air package carrier
'DEL' : 'LT', #LTL and the default if mode is not entered
'GRN' : 'D', #Parcel post
'OUR' : 'SR', #Supplier truck
'P/U' : 'CE', #Pickup
'WCALL' : 'CE', #Pickup
}
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SOURCELOGISTICS = THIS_DIRECTORY / "outgoing"
X12_SHANDEX = THIS_DIRECTORY / "outgoing"
# X12_SOURCELOGISTICS = THIS_DIRECTORY / "edi-testing" #test directories
# X12_SHANDEX = THIS_DIRECTORY / "edi-testing"
def main():
"""
Do it!
"""
with yamamotoyama.get_connection() as database:
shipments = list(get_shipments(database))
for shipment in shipments:
write_943(database, shipment)
def write_943(database: records.Connection, shipment: str):
"""
Write out a 943 to a file
"""
now = datetime.datetime.now()
site = str(get_shipment_destination(database, shipment))
datestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")
with (X12_SHANDEX / f"{site}-{shipment}-{datestamp_string}-943.edi").open(
"w", encoding="utf-8", newline="\n"
) as x12_file:
output = x12_file.write
is_header_output = False
header = None
detail_count = 0
package_count = 0
for shipment_database_row in get_shipment(database, shipment):
ship_to_site = shipment_database_row.BPCORD_0
mdl = shipment_database_row.MDL_0
x12_ship_to = SITE_MAPPING[ship_to_site]
x12_mdl = SHIPPING_CODE_MAPPING[mdl]
header = ShipmentHeader(shipment_database_row)
detail = ShipmentDetail(shipment_database_row)
if not is_header_output:
output(header.x12(x12_ship_to,x12_mdl))
is_header_output = True
output(detail.x12())
detail_count += 1
package_count += detail.qtystu_0
if header:
output(header.footer(package_count, detail_count))
with database.transaction() as _:
database.query(
"""
update [PROD].[SDELIVERY]
set [XX4S_943RDY_0] = 1
where [SOHNUM_0] = :shipment
""",
shipment=shipment,
)
order = get_order_for_shipment(database, shipment)
database.query(
"""
update [PROD].[SORDER]
set [XX4S_UDF2_0] = :sent_message
where [SOHNUM_0] = :order
""",
order=order,
sent_message=f"943 Sent {datetime.date.today().isoformat()}",
)
def get_shipment_destination(database: records.Connection, shipment: str) -> str:
"""
Get the destination site
"""
return (
database.query(
"""
select
[SDH].[BPCORD_0]
from [PROD].[SDELIVERY] as [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=shipment,
)
.first()
.BPCORD_0
)
def get_order_for_shipment(database: records.Connection, shipment: str) -> str:
"""
What is the order for this shipment?
"""
return (
database.query(
"""
select
[SDH].[SOHNUM_0]
from [PROD].[SDELIVERY] as [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=shipment,
)
.first()
.SOHNUM_0
)
def get_shipments(database: records.Connection) -> typing.Iterator[str]:
"""
What have we shipped? Fetch from X3.
"""
for shipment_result in database.query(
"""
select
[SDH].[SDHNUM_0]
from [PROD].[SDELIVERY] as [SDH]
join [PROD].[SORDER] as [SOH]
on [SOH].[SOHNUM_0] = [SDH].[SOHNUM_0]
where
(
[SDH].[STOFCY_0] in ('PMW','WCA','WNJ')
and [SDH].[BPCORD_0] in ('WON')
and nullif([SDH].[MDL_0],'') is not null
and nullif([SDH].[BPTNUM_0],'') is not null
)
and [SDH].[SHIDAT_0] >= {d'2023-10-09'}
and (
[SOH].[XX4S_UDF2_0] = ''
or [SDH].[XX4S_943RDY_0] = 2
)
and (
[SDH].[CFMFLG_0] = 2
or [SDH].[YLICPLATE_0] <> ''
or [SDH].[XX4S_943RDY_0] = 2
)
"""
):
yield shipment_result.SDHNUM_0
def get_shipment(
database: records.Connection, shipment: str
) -> typing.Iterator[records.Record]:
"""
Get shipment information from X3 database.
"""
yield from database.query(
"""
select
[SDHNUM_0]
,[STOFCY_0]
,[STOFCY]
,[SDHCAT_0]
,[SALFCY_0]
,[SALFCY]
,[SOHNUM_0]
,[CUSORDREF_0]
,[BPCORD_0]
,[BPDNAM_0]
,[BPDADDLIG_0]
,[BPDADDLIG_1]
,[BPDADDLIG_2]
,[CTY_0]
,[SAT_0]
,[POSCOD_0]
,[CRY_0]
,[CRYNAM_0]
,[BPCINV_0]
,[BPINAM_0]
,[BPTNUM_0]
,[BPTNAM_0]
,[MDL_0]
,[SCAC_0]
,[YLICPLATE_0]
,[DLVDAT_0]
,[CREDAT_0]
,[UPDDAT_0]
,[CCE_0]
,[SOPLIN_0]
,[ITMREF_0]
,[ITMDES1_0]
,[ZCASEUPC_0]
,[EANCOD_0]
,[STU_0]
,[QTYSTU_0]
,[LotQty]
,[LOT_0]
,[NETWEI_0]
,[GROWEI_0]
,[CFMFLG_0]
,[SHIDAT_0]
from [PROD].[zyumiddleware_shipment] as [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=shipment,
)
@dataclasses.dataclass
class ShipmentHeader(X12):
"""
Header
"""
sdhnum: str
stofcy: str
stofcy: str
sdhcat: str
salfcy: str
salfcy: str
sohnum: str
cusordref: str
bpcord: str
bpdnam: str
bpdaddlig: str
bpdaddlig_1: str
bpdaddlig_2: str
cty: str
sat: str
poscod: str
cry: str
crynam: str
bpcinv: str
bpinam: str
bptnum: str
bptnam: str
scac: str
ylicplate: str
dlvdat: datetime.date
credat: datetime.date
upddat: datetime.date
cce: str
short_control_number: str
interchange_control_number: str
header_segments: int
footer_segments: int
def __init__(self, database_row: records.Record):
self.sdhnum = database_row.SDHNUM_0
self.stofcy = database_row.STOFCY_0
self.sdhcat = database_row.SDHCAT_0
self.salfcy = database_row.SALFCY_0
self.salfcy = database_row.SALFCY_0
self.sohnum = database_row.SOHNUM_0
self.cusordref = database_row.CUSORDREF_0
self.bpcord = database_row.BPCORD_0
self.bpdnam = database_row.BPDNAM_0
self.bpdaddlig = database_row.BPDADDLIG_0
self.bpdaddlig_1 = database_row.BPDADDLIG_1
self.bpdaddlig_2 = database_row.BPDADDLIG_2
self.cty = database_row.CTY_0
self.sat = database_row.SAT_0
self.poscod = database_row.POSCOD_0
self.cry = database_row.CRY_0
self.crynam = database_row.CRYNAM_0
self.bpcinv = database_row.BPCINV_0
self.bpinam = database_row.BPINAM_0
self.bptnum = database_row.BPTNUM_0
self.bptnam = database_row.BPTNAM_0
self.scac = database_row.SCAC_0
self.ylicplate = database_row.YLICPLATE_0
self.dlvdat = database_row.DLVDAT_0
self.credat = database_row.CREDAT_0
self.upddat = database_row.UPDDAT_0
self.cce = database_row.CCE_0
self.shidat = database_row.SHIDAT_0
raw_control_number = self.control_number()
self.short_control_number = f"{raw_control_number:04}"
self.interchange_control_number = (
f"{raw_control_number:09}" # Format to 9 characters
)
self.now = datetime.date.today()
self.date = self.now.strftime("%y%m%d")
self.long_date = self.now.strftime("%Y%m%d")
self.time = self.now.strftime("%H%m")
self.header_segments = 10
self.footer_segments = 2
def x12(self, receiver_id, mdl) -> str:
return "".join(
[
f"ISA*00* *00* *ZZ*YAMAMOTOYAMA *ZZ*{receiver_id}*",
self.date,
"*",
self.time,
"*U*00401*",
self.interchange_control_number,
"*0*P*>~",
self.line(
[
"GS",
"OW",#should this be AR? Shandex okayed "OW"
"YAMAMOTOYAMA",
f"{receiver_id}",
self.long_date,
self.time,
self.interchange_control_number,
"X",
"004010",
]
),
self.line(
[
"ST",
"943",
self.short_control_number,
]
),
self.line(
[
"W06",
"N",
self.sohnum,
self.shidat.strftime("%Y%m%d"),
self.sdhnum,
"",
"",
"",
"",
"",
"",
"AS",
]
),
"N1*RE*Pomona Wholesale*1*008930755~",
"N4*Pomona*CA*91768*US~",
self.line(
[
"N1",
"ST", # Ship to
self.bpdnam,
"53", # Building
self.bpcord,
]
),
self.line(
[
"N3",
self.bpdaddlig,
]
),
self.line(
[
"N4",
self.cty,
self.sat,
self.poscod,
self.cry,
]
),
self.line(
[
"N9",
"PO",
self.cusordref,
]
),
self.line(
[
"G62",
"17",
self.dlvdat.strftime("%Y%m%d"),
]
),
self.line(
[
"W27",
f"{mdl}",
self.scac,
]
),
]
)
def footer(self, package_count: int, detail_count: int):
"""
End footer
"""
segment_count = self.header_segments + (detail_count * 3) + self.footer_segments
return "".join(
[
self.line(
[
"W03",
str(int(package_count)),
]
),
self.line(
[
"SE",
str(segment_count),
self.short_control_number,
]
),
self.line(
[
"GE",
"1",
self.interchange_control_number,
]
),
self.line(
[
"IEA",
"1",
self.interchange_control_number,
]
),
]
)
@dataclasses.dataclass
class ShipmentDetail(X12):
"""
Shipment detail.
"""
soplin_0: str
itmref_0: str
itmdes1_0: str
stu_0: str
qtystu_0: int
lot_0: str
stofcy_0: str
zcaseupc_0: str
eancod_0: str
gtin_or_upc_code: str
gtin_or_upc_marker: str
def __init__(self, db_record: records.Record):
self.soplin_0 = str(db_record.SOPLIN_0)
self.itmref_0 = db_record.ITMREF_0
self.itmdes1_0 = db_record.ITMDES1_0
self.stu_0 = db_record.STU_0
self.qtystu_0 = db_record.LotQty
self.lot_0 = db_record.LOT_0
self.stofcy_0 = db_record.STOFCY_0
if self.stofcy_0 == 'WON' and self.stu_0 == 'CS': #Shadex requires CA for cases
self.stu_0 = 'CA'
self.zcaseupc_0 = db_record.ZCASEUPC_0
self.eancod_0 = db_record.EANCOD_0
self.gtin_or_upc_marker = 'UK'
if self.stu_0 == 'CS':
self.gtin_or_upc_code = self.zcaseupc_0
else:
self.gtin_or_upc_code = self.eancod_0
def x12(self) -> str:
"""
Format in X12
"""
return "".join(
[
self.line(
[
"W04",
str(int(self.qtystu_0)),
self.stu_0,
"",
"VN", # Vendor's (Seller's) Item Number
self.itmref_0,
"LT", # Lot number
self.lot_0,
"",
"",
"",
"",
"",
"",
self.gtin_or_upc_marker,
self.gtin_or_upc_code,#W04-15
]
),
self.line(
[
"G69",
self.itmdes1_0,
]
),
self.line(
[
"N9",
"LI",
self.soplin_0,
]
),
]
)
if __name__ == "__main__":
main()

459
edi_944.py Normal file
View File

@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""
Consume a generic 944 file from 3PLs, and translate into a Sage X3
readable file - import template ZPTHI.
For Shadex we also need to reply with a 997
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
SHANDEX_944_FILENAME_RE = re.compile(
r"\A 944_YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
TEST_FILE = THIS_DIRECTORY / "edi-testing" / "944_YAMAMOTOYAMA_765aaebb-06c4-4eea-8d2a-7dddf2fd9ec2.edi"#TODO remove this
TEST_DIR = THIS_DIRECTORY / "edi-testing"
def main():
"""
Do it!
"""
# if SHANDEX_944_FILENAME_RE.match(TEST_FILE.name):#TODO remove these 2 lines
# process_file(TEST_FILE)
for edi_filename in X12_DIRECTORY.iterdir(): #TODO uncomment and review
if SHANDEX_944_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, X12_DIRECTORY / "997_processing" / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in { #TODO see if there are more fields used in vendor EDI
"ISA",
"ST",
"N2",
"N3",
"N4",
"LX",
}:
continue
yield fields
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
warehouse_receipt = Receipt()
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "W17":
_, _, rcpdat, _, sohnum, sdhnum = fields[:6]
warehouse_receipt.sdhnum = sdhnum
warehouse_receipt.header.rcpdat = datetime.datetime.strptime(
rcpdat, "%Y%m%d"
).date() # 20230922
if fields[0] == "N9" and fields[1] == "PO":
pohnum = fields[2]
if fields[0] == "W07":
# W07*1023*CA**PN*C08249*LT*07032026A***UK*10077652082491
# N9*LI*1000
_, qty_str, uom, _, _, itmref, _, lot = fields[:8]
subdetail = ReceiptSubDetail(
qtypcu=int(qty_str),
lot=lot,
)
if fields[0] == 'N9' and fields[1] == 'LI':
# N9*LI*1000
line = fields[2]
warehouse_receipt.append(
ReceiptDetail(
sdhnum=warehouse_receipt.sdhnum,
itmref=itmref,
qtyuom=int(qty_str),
poplin=int(line),
uom=uom
),
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
#TODO explode out the stojou lines
with yamamotoyama.x3_imports.open_import_file(
TEST_DIR / f"ZPTHI_{warehouse_receipt.sdhnum}_{time_stamp}.dat" #todo change folder back to IMPORTS_DIRECTORY
) as import_file:
warehouse_receipt.output(import_file)
@dataclasses.dataclass
class ReceiptSubDetail:
"""
Information that goes onto a receipt sub-detail line, taken from ZPTHI template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
bpslot: str = ""
sernum: str = ""
def stojous(self, shipment, item) -> typing.List[str]:
"""
Convert grouped lot quantities into individual STOJOU records to fit on receipt
"""
with yamamotoyama.get_connection('test') as database: #todo remove 'test'
details = (
database.query(
"""
select
'S',
'A',
[STJ].[PCU_0],
cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar),
[STJ].[LOT_0],
'',
''
from [FY23TEST].[STOJOU] [STJ] --TODO change to PROD
where
[STJ].[VCRNUM_0] = :sdhnum
and [STJ].[ITMREF_0] = :itmref
and [STJ].[LOT_0] = :lot
and [STJ].[TRSTYP_0] = 4
""",
sdhnum=shipment,
itmref=item,
lot=self.lot,
)
.all()
)
return details
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.loc,
self.lot,
self.bpslot,
self.sernum,
]
)
@dataclasses.dataclass
class ReceiptDetail:
"""
Information that goes on a receipt detail line, taken from ZPTHI template.
"""
sdhnum: str = ""
poplin: int = 0
itmref: str = ""
itmdes: str = ""
uom: str = ""
qtyuom: int = 0
pjt: str = ""
star65: str = ""
star91: str = ""
star92: str = ""
subdetails: typing.List[ReceiptSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: ReceiptSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.uom
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases -= subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sdhnum,
self.poplin,
self.itmref,
fix_uom(self.uom),
self.qty,
self.star65,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, ReceiptDetail):
return self.itmref == item.itmref
return False
# def fill(self):#not needed for receipts
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class ReceiptHeader:
"""
Information that goes on a receipt header, taken from ZPTHI template.
"""
stofcy: str = ""
bpcord: str = ""
prhfcy: str = ""
rcpdat: datetime.date = datetime.date(1753, 1, 1)
pthnum: str = ""
bpsnum: str = ""
cur: str = "USD"
star71 = ""
star72 = ""
star81 = ""
star82 = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"E",
self.bpcord,
self.rcpdat.strftime("%Y%m%d"),
self.pthnum,
self.stofcy,
self.cur,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class ReceiptDetailList:
"""
List of receipt details
"""
_details: typing.List[ReceiptDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
receipt_detail: ReceiptDetail,
receipt_subdetail: ReceiptSubDetail,
):
"""
Append
"""
itmref = receipt_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(receipt_subdetail)
return
self._item_set.add(itmref)
#receipt_detail.fill()
receipt_detail.append(receipt_subdetail)
self._details.append(receipt_detail)
def __iter__(self):
return iter(self._details)
class Receipt:
"""
Warehouse receipt, both header & details
"""
header: ReceiptHeader
details: ReceiptDetailList
_sdhnum: str
def __init__(self):
self.header = ReceiptHeader()
self._sdhnum = ""
self.details = ReceiptDetailList()
def append(
self,
receipt_detail: ReceiptDetail,
receipt_subdetail: ReceiptSubDetail,
):
"""
Add detail information.
"""
self.details.append(receipt_detail, receipt_subdetail)
@property
def sdhnum(self):
"""
shipment number
"""
return self._sdhnum
@sdhnum.setter
def sdhnum(self, value: str):
if self._sdhnum != value:
self._sdhnum = value
if value:
self._fill_info_from_shipment()
def _get_shipment_from_x3(self) -> records.Record:
"""
Fetch shipment from X3 database.
"""
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select
[SDH].[STOFCY_0],
[SDH].[SDHNUM_0],
[SDH].[SALFCY_0],
[SDH].[BPCORD_0],
[SDH].[CUR_0],
[SDH].[SOHNUM_0]
from [FY23TEST].[SDELIVERY] [SDH]--TODO change back to [PROD]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=self.sdhnum,
).first()
def _fill_info_from_shipment(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_shipment_from_x3()
self.header.stofcy = result.STOFCY_0
self.header.sdhnum = result.SDHNUM_0
self.header.salfcy = result.SALFCY_0
self.header.bpcord = result.BPCORD_0
self.header.cur = result.CUR_0
self.header.sohnum = result.SOHNUM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
shipment = detail.sdhnum
item = detail.itmref
for record in subdetail.stojous(shipment, item):
#output(subdetail.convert_to_strings())
output(record)
if __name__ == "__main__":
main()

672
edi_945.py Normal file
View File

@ -0,0 +1,672 @@
#!/usr/bin/env python3
"""
Consume a 945 file from Source Logistics, and translate into a Sage X3
readable file.
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
SOURCE_945_FILENAME_RE = re.compile(
r"\A Yamamotoyama_945_ .* [.]edi \Z", re.X | re.M | re.S
)
msg = MIMEMultipart()
msg['Subject'] = '945 processing error: Possible duplicate order lines?'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'bleeson@stashtea.com'
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SOURCE_945_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
shutil.move(edi_filename, X12_DIRECTORY / "archive" / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS",
"ST",
"N1",
"N2",
"N3",
"N4",
"G62",
"W27",
"W10",
"LX",
"MAN",
}:
continue
yield fields
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
tracking_number_not_found = True
warehouse_shipment = WarehouseShipment()
warehouse_shipment.header.ylicplate = '' #if we don't find a tracking number, submit a blank
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "W06":
_, _, sohnum, shidat_str = fields[:4]
warehouse_shipment.sohnum = sohnum
warehouse_shipment.header.shidat = datetime.datetime.strptime(
shidat_str, "%Y%m%d"
).date() # 20230922
if fields[0] == "N9" and fields[1] == "2I" and len(fields) > 2 and tracking_number_not_found:
warehouse_shipment.header.ylicplate = fields[2]
tracking_number_not_found = False
if fields[0] == "W12":
# W12*CC*32*32*0*CA**VN*08279*01112025C~
_, _, qty_str, det_qty, _, _, _, _, itmref, lot = fields[:10]
subdetail = WarehouseShipmentSubDetail(
qtypcu=-1 * int(det_qty),
lot=lot,
)
warehouse_shipment.append(
WarehouseShipmentDetail(
sohnum=warehouse_shipment.sohnum,
itmref=itmref,
qty=int(qty_str),
),
subdetail,
)
if fields[0] == "W03":
_, _, weight, _ = fields
warehouse_shipment.header.growei = weight
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSHIP945S_{warehouse_shipment.sohnum}_{time_stamp}.dat"
) as import_file:
warehouse_shipment.output(import_file)
@dataclasses.dataclass
class WarehouseShipmentSubDetail:
"""
Information that goes onto a shipment sub-detail line, taken from ZSHIP945 template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
sernum: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.loc,
self.lot,
self.sernum,
]
)
@dataclasses.dataclass
class WarehouseShipmentDetail:
"""
Information that goes on a shipment detail line, taken from ZSHIP945 template.
"""
sohnum: str = ""
soplin: int = 0
itmref: str = ""
itmdes: str = ""
sau: str = ""
qty: int = 0
star91: str = ""
star92: str = ""
subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: WarehouseShipmentSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.sau
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases += subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sohnum,
self.soplin,
self.itmref,
self.itmdes,
self.sau,
self.qty,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, WarehouseShipmentDetail):
return self.itmref == item.itmref
return False
def fill(self):
"""
Set soplin & itmdes from itmref & sohnum
"""
def get() -> records.Record:
with yamamotoyama.get_connection() as database:
how_many = (
database.query(
"""
select
count(*) as [how_many]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
""",
sohnum=self.sohnum,
itmref=self.itmref,
)
.first()
.how_many
)
if how_many == 1:
return database.query(
"""
select top 1
[SOP].[SOPLIN_0]
,[SOP].[ITMDES1_0]
,[SOP].[SAU_0]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
order by
[SOP].[SOPLIN_0]
""",
sohnum=self.sohnum,
itmref=self.itmref,
).first()
else:
emailtext = str(self.sohnum +' '+str(self.itmref))
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
raise NotImplementedError # TODO
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class WarehouseShipmentHeader:
"""
Information that goes on a shipment header, taken from ZSHIP945 template.
"""
salfcy: str = "STC"
stofcy: str = ""
sdhnum: str = ""
bpcord: str = ""
bpaadd: str = "SH001"
cur: str = "USD"
shidat: datetime.date = datetime.date(1753, 1, 1)
cfmflg: int = 1
pjt: str = ""
bptnum: str = ""
ylicplate: str = ""
invdtaamt_2: decimal.Decimal = decimal.Decimal()
invdtaamt_3: decimal.Decimal = decimal.Decimal()
invdtaamt_4: decimal.Decimal = decimal.Decimal()
invdtaamt_5: decimal.Decimal = decimal.Decimal()
invdtaamt_6: decimal.Decimal = decimal.Decimal()
invdtaamt_7: decimal.Decimal = decimal.Decimal()
invdtaamt_8: decimal.Decimal = decimal.Decimal()
invdtaamt_9: decimal.Decimal = decimal.Decimal()
die: str = ""
die_1: str = ""
die_2: str = ""
die_3: str = ""
die_4: str = ""
die_5: str = ""
die_6: str = ""
die_7: str = ""
die_8: str = ""
die_9: str = ""
die_10: str = ""
die_11: str = ""
die_12: str = ""
die_13: str = ""
die_14: str = ""
die_15: str = ""
die_16: str = ""
die_17: str = ""
die_18: str = ""
die_19: str = ""
cce: str = ""
cce_1: str = ""
cce_2: str = ""
cce_3: str = ""
cce_4: str = ""
cce_5: str = ""
cce_6: str = ""
cce_7: str = ""
cce_8: str = ""
cce_9: str = ""
cce_10: str = ""
cce_11: str = ""
cce_12: str = ""
cce_13: str = ""
cce_14: str = ""
cce_15: str = ""
cce_16: str = ""
cce_17: str = ""
cce_18: str = ""
cce_19: str = ""
bpdnam: str = ""
bpdaddlig: str = ""
bpdaddlig_1: str = ""
bpdaddlig_2: str = ""
bpdposcod: str = ""
bpdcty: str = ""
bpdsat: str = ""
bpdcry: str = ""
bpdcrynam: str = ""
sdhtyp: str = "SDN"
growei: decimal.Decimal = decimal.Decimal()
pacnbr: int = 0
star71: str = ""
star72: str = ""
star81: str = ""
star82: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"H",
self.salfcy,
self.stofcy,
self.sdhnum,
self.bpcord,
self.bpaadd,
self.cur,
self.shidat.strftime("%Y%m%d"),
self.cfmflg,
self.pjt,
self.bptnum,
self.ylicplate,
self.invdtaamt_2,
self.invdtaamt_3,
self.invdtaamt_4,
self.invdtaamt_5,
self.invdtaamt_6,
self.invdtaamt_7,
self.invdtaamt_8,
self.invdtaamt_9,
self.die,
self.die_1,
self.die_2,
self.die_3,
self.die_4,
self.die_5,
self.die_6,
self.die_7,
self.die_8,
self.die_9,
self.die_10,
self.die_11,
self.die_12,
self.die_13,
self.die_14,
self.die_15,
self.die_16,
self.die_17,
self.die_18,
self.die_19,
self.cce,
self.cce_1,
self.cce_2,
self.cce_3,
self.cce_4,
self.cce_5,
self.cce_6,
self.cce_7,
self.cce_8,
self.cce_9,
self.cce_10,
self.cce_11,
self.cce_12,
self.cce_13,
self.cce_14,
self.cce_15,
self.cce_16,
self.cce_17,
self.cce_18,
self.cce_19,
self.bpdnam,
self.bpdaddlig,
self.bpdaddlig_1,
self.bpdaddlig_2,
self.bpdposcod,
self.bpdcty,
self.bpdsat,
self.bpdcry,
self.bpdcrynam,
self.sdhtyp,
self.growei,
self.pacnbr,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class WarehouseShipmentDetailList:
"""
List of shipment details
"""
_details: typing.List[WarehouseShipmentDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Append
"""
itmref = shipment_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(shipment_subdetail)
return
self._item_set.add(itmref)
shipment_detail.fill()
shipment_detail.append(shipment_subdetail)
self._details.append(shipment_detail)
def __iter__(self):
return iter(self._details)
class WarehouseShipment:
"""
Warehosue shipment, both header & details
"""
header: WarehouseShipmentHeader
details: WarehouseShipmentDetailList
_sohnum: str
def __init__(self):
self.header = WarehouseShipmentHeader()
self._sohnum = ""
self.details = WarehouseShipmentDetailList()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Add detail information.
"""
self.details.append(shipment_detail, shipment_subdetail)
@property
def sohnum(self):
"""
Sales order number
"""
return self._sohnum
@sohnum.setter
def sohnum(self, value: str):
if self._sohnum != value:
self._sohnum = value
if value:
self._fill_info_from_so()
def _get_so_from_x3(self) -> records.Record:
"""
Fetch sales order from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SOH].[SALFCY_0]
,[SOH].[STOFCY_0]
,[SOH].[BPCORD_0]
,[SOH].[BPAADD_0]
,[SOH].[CUR_0]
,[SOH].[INVDTAAMT_2]
,[SOH].[INVDTAAMT_3]
,[SOH].[INVDTAAMT_4]
,[SOH].[INVDTAAMT_5]
,[SOH].[INVDTAAMT_6]
,[SOH].[INVDTAAMT_7]
,[SOH].[INVDTAAMT_8]
,[SOH].[INVDTAAMT_9]
,[SOH].[DIE_0]
,[SOH].[DIE_1]
,[SOH].[DIE_2]
,[SOH].[DIE_3]
,[SOH].[DIE_4]
,[SOH].[DIE_5]
,[SOH].[DIE_6]
,[SOH].[DIE_7]
,[SOH].[DIE_8]
,[SOH].[DIE_9]
,[SOH].[DIE_10]
,[SOH].[DIE_11]
,[SOH].[DIE_12]
,[SOH].[DIE_13]
,[SOH].[DIE_14]
,[SOH].[DIE_15]
,[SOH].[DIE_16]
,[SOH].[DIE_17]
,[SOH].[DIE_18]
,[SOH].[DIE_19]
,[SOH].[CCE_0]
,[SOH].[CCE_1]
,[SOH].[CCE_2]
,[SOH].[CCE_3]
,[SOH].[CCE_4]
,[SOH].[CCE_5]
,[SOH].[CCE_6]
,[SOH].[CCE_7]
,[SOH].[CCE_8]
,[SOH].[CCE_9]
,[SOH].[CCE_10]
,[SOH].[CCE_11]
,[SOH].[CCE_12]
,[SOH].[CCE_13]
,[SOH].[CCE_14]
,[SOH].[CCE_15]
,[SOH].[CCE_16]
,[SOH].[CCE_17]
,[SOH].[CCE_18]
,[SOH].[CCE_19]
,[SOH].[BPDNAM_0]
,[SOH].[BPDADDLIG_0]
,[SOH].[BPDADDLIG_1]
,[SOH].[BPDADDLIG_2]
,[SOH].[BPDPOSCOD_0]
,[SOH].[BPDCTY_0]
,[SOH].[BPDSAT_0]
,[SOH].[BPDCRY_0]
,[SOH].[BPDCRYNAM_0]
from [PROD].[SORDER] as [SOH]
where
[SOH].[SOHNUM_0] = :order
""",
order=self.sohnum,
).first()
def _copy_accounting_codes(self, result: records.Record):
"""
Fill in all the accounting codes
"""
self.header.die = result.DIE_0
self.header.die_1 = result.DIE_1
self.header.die_2 = result.DIE_2
self.header.die_3 = result.DIE_3
self.header.die_4 = result.DIE_4
self.header.die_5 = result.DIE_5
self.header.die_6 = result.DIE_6
self.header.die_7 = result.DIE_7
self.header.die_8 = result.DIE_8
self.header.die_9 = result.DIE_9
self.header.die_10 = result.DIE_10
self.header.die_11 = result.DIE_11
self.header.die_12 = result.DIE_12
self.header.die_13 = result.DIE_13
self.header.die_14 = result.DIE_14
self.header.die_15 = result.DIE_15
self.header.die_16 = result.DIE_16
self.header.die_17 = result.DIE_17
self.header.die_18 = result.DIE_18
self.header.die_19 = result.DIE_19
self.header.cce = result.CCE_0
self.header.cce_1 = result.CCE_1
self.header.cce_2 = result.CCE_2
self.header.cce_3 = result.CCE_3
self.header.cce_4 = result.CCE_4
self.header.cce_5 = result.CCE_5
self.header.cce_6 = result.CCE_6
self.header.cce_7 = result.CCE_7
self.header.cce_8 = result.CCE_8
self.header.cce_9 = result.CCE_9
self.header.cce_10 = result.CCE_10
self.header.cce_11 = result.CCE_11
self.header.cce_12 = result.CCE_12
self.header.cce_13 = result.CCE_13
self.header.cce_14 = result.CCE_14
self.header.cce_15 = result.CCE_15
self.header.cce_16 = result.CCE_16
self.header.cce_17 = result.CCE_17
self.header.cce_18 = result.CCE_18
self.header.cce_19 = result.CCE_19
def _fill_info_from_so(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_so_from_x3()
self.header.salfcy = result.SALFCY_0
self.header.stofcy = result.STOFCY_0
self.header.bpcord = result.BPCORD_0
self.header.bpaadd = result.BPAADD_0
self.header.cur = result.CUR_0
self.header.invdtaamt_2 = result.INVDTAAMT_2
self.header.invdtaamt_3 = result.INVDTAAMT_3
self.header.invdtaamt_4 = result.INVDTAAMT_4
self.header.invdtaamt_5 = result.INVDTAAMT_5
self.header.invdtaamt_6 = result.INVDTAAMT_6
self.header.invdtaamt_7 = result.INVDTAAMT_7
self.header.invdtaamt_8 = result.INVDTAAMT_8
self.header.invdtaamt_9 = result.INVDTAAMT_9
self._copy_accounting_codes(result)
self.header.bpdnam = result.BPDNAM_0
self.header.bpdaddlig = result.BPDADDLIG_0
self.header.bpdaddlig_1 = result.BPDADDLIG_1
self.header.bpdaddlig_2 = result.BPDADDLIG_2
self.header.bpdposcod = result.BPDPOSCOD_0
self.header.bpdcty = result.BPDCTY_0
self.header.bpdsat = result.BPDSAT_0
self.header.bpdcry = result.BPDCRY_0
self.header.bpdcrynam = result.BPDCRYNAM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
output(subdetail.convert_to_strings())
if __name__ == "__main__":
main()

555
edi_947.py Normal file
View File

@ -0,0 +1,555 @@
#!/usr/bin/env python3
"""
Consume a generic 947 file from 3PLs, and translate into a Sage X3
readable file - import template ZSCS.
For Shadex we also need to reply with a 997
947 is warehouse advice, to alert us of damages or amount changes from something
like a count
"""
#what about serial numbers?
#status on L line?
#remove negative line, needs to look up stocou? not sure.
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
SHANDEX_947_FILENAME_RE = re.compile(
r"\A 947_YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
#TODO remove this and have a single production name
SHANDEX_947_FILENAME_RE = re.compile(
r"\A STASH_TEST_947 \S+ [.]edi \Z", re.X | re.M | re.S
)
TEST_FILE = THIS_DIRECTORY / "edi-testing" / "STASH_TEST_947_e19f1f58-d77c-47da-a1f1-2ba449eba6ae.edi"#TODO remove this
TEST_DIR = THIS_DIRECTORY / "edi-testing"
DAMAGE_CODE_MAPPING = {
"07" : 'RD',#Product Dumped or Destroyed
"AV" : 'RD'#Damaged in Transit
}
DAMAGE_CODE_DESCRIPTIONS_MAPPING = {
"07" : "Product Dumped or Destroyed",
"AV" : "Damaged in Transit"
}
def main():
"""
Do it!
"""
if SHANDEX_947_FILENAME_RE.match(TEST_FILE.name):#TODO remove these 2 lines
process_file(TEST_FILE)
# for edi_filename in X12_DIRECTORY.iterdir(): #TODO uncomment and review
# if SHANDEX_947_FILENAME_RE.match(edi_filename.name):
# process_file(edi_filename)
# file moved to 997 processing folder to be sent later
# shutil.move(edi_filename, X12_DIRECTORY / "997_processing" / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in { #TODO see if there are more fields used in vendor EDI
"ISA",
"GS"
"W15",
"N1",
"N9",
"SE",
"GE",
"IEA"
}:
continue
yield fields
def gtin_lookup(gtin):
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [FY23TEST].[ITMMASTER] [ITM]--TODO change back to [PROD]
where
[ITM].[ZCASEUPC_0] = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
def stock_movement_alert(gtin, qty, lot, status):
msg = MIMEMultipart()
msg['Subject'] = 'New Stock Change from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'bleeson@stashtea.com'#TODO correct receipientscares
emailtext = f'Item: {gtin_lookup(gtin)}\nQty: {qty}\nLot: {lot}\nStatus: {DAMAGE_CODE_MAPPING[status]}\nReason: {DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
warehouse_stockchange = StockChange()
vcrlin = 0
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "G62":
iptdat = fields[2]
warehouse_stockchange.header.iptdat = datetime.datetime.strptime(
iptdat, "%Y%m%d"
).date()
if fields[0] == "W19":
vcrlin += 1000
# W19*AV*35*CA**UK*10077652082651***03022026C
_, status, qty, uom, _, _, gtin, _, _, lot = fields[:10]
stock_movement_alert(gtin, qty, lot, status)
warehouse_stockchange.header.vcrdes = DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]
subdetail = StockChangeSubDetail(
qtypcu=int(qty),
sta=DAMAGE_CODE_MAPPING[status],
pcu=uom
)
detail_line = StockChangeDetail(
vcrlin=vcrlin,
itmref=gtin,
pcu=uom,
lot=lot
)
warehouse_stockchange.append(
detail_line,
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
TEST_DIR / f"ZSCS_{warehouse_stockchange.sdhnum}_{time_stamp}.dat" #todo change folder back to IMPORTS_DIRECTORY
) as import_file:
warehouse_stockchange.output(import_file)
#TODO email notification about stock change record creation and reason code to Ilora, anyone else too?
@dataclasses.dataclass
class StockChangeSubDetail:
"""
Information that goes onto a stockchange sub-detail line, taken from ZPTHI template.
"""
pcu: str = ""
qtypcu: int = 0
qtystu: int = 0
loc: str = ""
sta: str = "A"
# def stojous(self, shipment, item) -> typing.List[str]:
# """
# Convert grouped lot quantities into individual STOJOU records to fit on stockchange
# """
# with yamamotoyama.get_connection('test') as database: #todo remove 'test'
# details = (
# database.query(
# """
# select
# 'S',
# 'A',
# [STJ].[PCU_0],
# cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar),
# [STJ].[LOT_0],
# '',
# ''
# from [FY23TEST].[STOJOU] [STJ] --TODO change to PROD
# where
# [STJ].[VCRNUM_0] = :sdhnum
# and [STJ].[ITMREF_0] = :itmref
# and [STJ].[LOT_0] = :lot
# and [STJ].[TRSTYP_0] = 4
# """,
# sdhnum=shipment,
# itmref=item,
# lot=self.lot,
# )
# .all()
# )
# return details
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.pcu,
self.qtypcu,
self.qtystu,
self.loc,
self.sta,
]
)
@dataclasses.dataclass
class StockChangeDetail:
"""
Information that goes on a stockchange detail line, taken from ZPTHI template.
"""
vcrlin: int = 0
itmref: str = ""
pcu: str = ""
pcustucoe: int = 1 #does this need a lookup?
sta: str = "A" #todo this needs to flip based on the transaction A > R, A > Q, what about Q > A?
loctyp: str = ""
loc: str = ""
lot: str = ""
slo: str = ""
sernum: str = ""
palnum: str = ""
ctrnum: str = ""
qlyctldem: str= ""
owner: str = "WON"
subdetails: typing.List[StockChangeSubDetail] = dataclasses.field(
default_factory=list
)
def palnum_lookup(self, itmref, lot, status):
"""
Pick a palnum from X3 using the lot, location, and status
It doesn't matter which one we choose?
"""
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select
[STO].[STOFCY_0],
[STO].[ITMREF_0],
[STO].[LOT_0],
[STO].[PALNUM_0]
from [FY23TEST].[STOCK] [STO] --TODO change to PROD
where
[STO].[ITMREF_0] = :itmref
and [STO].[STOFCY_0] = 'WON'
and [STO].[LOT_0] = :lot
and [STO].[STA_0] = :status
""",
itmref=itmref,
lot=lot,
status=status
).first()["PALNUM_0"]
def gtin_lookup(self, gtin):
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [FY23TEST].[ITMMASTER] [ITM]--TODO change back to [PROD]
where
[ITM].[ZCASEUPC_0] = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
def append(self, subdetail: StockChangeSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.pcu
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases -= subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.vcrlin,
self.gtin_lookup(self.itmref),
fix_uom(self.pcu),
self.pcustucoe,
self.sta,
self.loctyp,
self.loc,
self.lot,
self.slo,
self.sernum,
self.palnum_lookup(self.itmref, self.lot, self.sta),
self.ctrnum,
self.qlyctldem,
self.owner
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, StockChangeDetail):
return self.itmref == item.itmref
return False
# def fill(self):#not needed for stockchanges
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
# result = get()
# self.soplin = result.SOPLIN_0
# self.itmdes = result.ITMDES1_0
# self.sau = result.SAU_0
@dataclasses.dataclass
class StockChangeHeader:
"""
Information that goes on a stockchange header, taken from ZSCS template.
"""
vcrnum: str = ""
stofcy: str = "WON"
iptdat: datetime.date = datetime.date(1753, 1, 1)
vcrdes: str = ""
pjt: str = ""
trsfam: str = "CHX"
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"E",
self.vcrnum,
self.stofcy,
self.iptdat.strftime("%Y%m%d"),
self.vcrdes,
self.pjt,
self.trsfam,
]
)
class StockChangeDetailList:
"""
List of stockchange details
"""
_details: typing.List[StockChangeDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
stockchange_detail: StockChangeDetail,
stockchange_subdetail: StockChangeSubDetail,
):
"""
Append
"""
itmref = stockchange_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(stockchange_subdetail)
return
self._item_set.add(itmref)
#stockchange_detail.fill()
stockchange_detail.append(stockchange_subdetail)
self._details.append(stockchange_detail)
def __iter__(self):
return iter(self._details)
class StockChange:
"""
Warehouse stockchange, both header & details
"""
header: StockChangeHeader
details: StockChangeDetailList
_sdhnum: str
def __init__(self):
self.header = StockChangeHeader()
self._sdhnum = ""
self.details = StockChangeDetailList()
def append(
self,
stockchange_detail: StockChangeDetail,
stockchange_subdetail: StockChangeSubDetail,
):
"""
Add detail information.
"""
self.details.append(stockchange_detail, stockchange_subdetail)
@property
def sdhnum(self):
"""
shipment number
"""
return self._sdhnum
@sdhnum.setter
def sdhnum(self, value: str):
if self._sdhnum != value:
self._sdhnum = value
if value:
self._fill_info_from_shipment()
# def _get_shipment_from_x3(self) -> records.Record:
# """
# Fetch shipment from X3 database.
# """
# with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
# return db_connection.query(
# """
# select
# [SDH].[STOFCY_0],
# [SDH].[SDHNUM_0],
# [SDH].[SALFCY_0],
# [SDH].[BPCORD_0],
# [SDH].[CUR_0],
# [SDH].[SOHNUM_0]
# from [FY23TEST].[SDELIVERY] [SDH]--TODO change back to [PROD]
# where
# [SDH].[SDHNUM_0] = :shipment
# """,
# shipment=self.sdhnum,
# ).first()
# def _fill_info_from_shipment(self):
# """
# When we learn the SOHNUM, we can copy information from the sales order.
# """
# result = self._get_shipment_from_x3()
# self.header.stofcy = result.STOFCY_0
# self.header.sdhnum = result.SDHNUM_0
# self.header.salfcy = result.SALFCY_0
# self.header.bpcord = result.BPCORD_0
# self.header.cur = result.CUR_0
# self.header.sohnum = result.SOHNUM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
#shipment = detail.sdhnum
#item = detail.itmref
output(subdetail.convert_to_strings())
# for record in subdetail.stojous(shipment, item):
# output(subdetail.convert_to_strings())
# output(record)
if __name__ == "__main__":
main()

656
edi_997_incoming.py Normal file
View File

@ -0,0 +1,656 @@
#!/usr/bin/env python3
"""
Consume a generic 997 file from 3PLs
Functional Acknowledgment
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
SHANDEX_997_FILENAME_RE = re.compile( #TODO FIX REGEX
r"\A 997_YAMAMOTOYAMA_ .* [.]ed \Z", re.X | re.M | re.S
)
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_997_FILENAME_RE.match(edi_filename.name):
#process_file(edi_filename)
shutil.move(edi_filename, X12_DIRECTORY / "archive" / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in { #TODO see if there are more fields used in vendor EDI
"ISA",
"GS",
"ST",
"N1",
"N2",
"N3",
"N4",
"G62",
"W27",
"W10",
"LX",
"MAN",
}:
continue
yield fields
def process_file(edi_filename: pathlib.Path):
"""
997 is a functional acknowledgment, we don't need to do anything with it in X3
Do we need to send it somewhere for reporting purposes?
"""
# warehouse_receipt = Receipt()
# warehouse_receipt.header.ylicplate = '' #if we don't find a tracking number, submit a blank
# for fields in tokens_from_edi_file(edi_filename):
# if fields[0] == "W06":
# _, _, sohnum, shidat_str = fields[:4]
# warehouse_receipt.sohnum = sohnum
# warehouse_receipt.header.shidat = datetime.datetime.strptime(
# shidat_str, "%Y%m%d"
# ).date() # 20230922
# if fields[0] == "N9" and fields[1] == "2I" and len(fields) > 2 and tracking_number_not_found:
# warehouse_receipt.header.ylicplate = fields[2]
# tracking_number_not_found = False
# if fields[0] == "W12":
# W12*CC*32*32*0*CA**VN*08279*01112025C~
# _, _, qty_str, det_qty, _, _, _, _, itmref, lot = fields[:10]
# subdetail = ReceiptSubDetail(
# qtypcu=-1 * int(det_qty),
# lot=lot,
# )
# warehouse_receipt.append(
# ReceiptDetail(
# sohnum=warehouse_receipt.sohnum,
# itmref=itmref,
# qty=int(qty_str),
# ),
# subdetail,
# )
# if fields[0] == "W03":
# _, _, weight, _ = fields
# warehouse_receipt.header.growei = weight
# time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# with yamamotoyama.x3_imports.open_import_file(
# IMPORTS_DIRECTORY / f"ZSHIP945S_{warehouse_receipt.sohnum}_{time_stamp}.dat"
# ) as import_file:
# warehouse_receipt.output(import_file)
@dataclasses.dataclass
class ReceiptSubDetail:
"""
Information that goes onto a shipment sub-detail line, taken from ZSHIP945 template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
sernum: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.loc,
self.lot,
self.sernum,
]
)
@dataclasses.dataclass
class ReceiptDetail:
"""
Information that goes on a shipment detail line, taken from ZSHIP945 template.
"""
sohnum: str = ""
soplin: int = 0
itmref: str = ""
itmdes: str = ""
sau: str = ""
qty: int = 0
star91: str = ""
star92: str = ""
subdetails: typing.List[ReceiptSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: ReceiptSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.sau
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases += subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sohnum,
self.soplin,
self.itmref,
self.itmdes,
self.sau,
self.qty,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, ReceiptDetail):
return self.itmref == item.itmref
return False
def fill(self):
"""
Set soplin & itmdes from itmref & sohnum
"""
def get() -> records.Record:
with yamamotoyama.get_connection() as database:
how_many = (
database.query(
"""
select
count(*) as [how_many]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
""",
sohnum=self.sohnum,
itmref=self.itmref,
)
.first()
.how_many
)
if how_many == 1:
return database.query(
"""
select top 1
[SOP].[SOPLIN_0]
,[SOP].[ITMDES1_0]
,[SOP].[SAU_0]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
order by
[SOP].[SOPLIN_0]
""",
sohnum=self.sohnum,
itmref=self.itmref,
).first()
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class ReceiptHeader:
"""
Information that goes on a shipment header, taken from ZSHIP945 template.
"""
salfcy: str = "STC"
stofcy: str = ""
sdhnum: str = ""
bpcord: str = ""
bpaadd: str = "SH001"
cur: str = "USD"
shidat: datetime.date = datetime.date(1753, 1, 1)
cfmflg: int = 1
pjt: str = ""
bptnum: str = ""
ylicplate: str = ""
invdtaamt_2: decimal.Decimal = decimal.Decimal()
invdtaamt_3: decimal.Decimal = decimal.Decimal()
invdtaamt_4: decimal.Decimal = decimal.Decimal()
invdtaamt_5: decimal.Decimal = decimal.Decimal()
invdtaamt_6: decimal.Decimal = decimal.Decimal()
invdtaamt_7: decimal.Decimal = decimal.Decimal()
invdtaamt_8: decimal.Decimal = decimal.Decimal()
invdtaamt_9: decimal.Decimal = decimal.Decimal()
die: str = ""
die_1: str = ""
die_2: str = ""
die_3: str = ""
die_4: str = ""
die_5: str = ""
die_6: str = ""
die_7: str = ""
die_8: str = ""
die_9: str = ""
die_10: str = ""
die_11: str = ""
die_12: str = ""
die_13: str = ""
die_14: str = ""
die_15: str = ""
die_16: str = ""
die_17: str = ""
die_18: str = ""
die_19: str = ""
cce: str = ""
cce_1: str = ""
cce_2: str = ""
cce_3: str = ""
cce_4: str = ""
cce_5: str = ""
cce_6: str = ""
cce_7: str = ""
cce_8: str = ""
cce_9: str = ""
cce_10: str = ""
cce_11: str = ""
cce_12: str = ""
cce_13: str = ""
cce_14: str = ""
cce_15: str = ""
cce_16: str = ""
cce_17: str = ""
cce_18: str = ""
cce_19: str = ""
bpdnam: str = ""
bpdaddlig: str = ""
bpdaddlig_1: str = ""
bpdaddlig_2: str = ""
bpdposcod: str = ""
bpdcty: str = ""
bpdsat: str = ""
bpdcry: str = ""
bpdcrynam: str = ""
sdhtyp: str = "SDN"
growei: decimal.Decimal = decimal.Decimal()
pacnbr: int = 0
star71: str = ""
star72: str = ""
star81: str = ""
star82: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"H",
self.salfcy,
self.stofcy,
self.sdhnum,
self.bpcord,
self.bpaadd,
self.cur,
self.shidat.strftime("%Y%m%d"),
self.cfmflg,
self.pjt,
self.bptnum,
self.ylicplate,
self.invdtaamt_2,
self.invdtaamt_3,
self.invdtaamt_4,
self.invdtaamt_5,
self.invdtaamt_6,
self.invdtaamt_7,
self.invdtaamt_8,
self.invdtaamt_9,
self.die,
self.die_1,
self.die_2,
self.die_3,
self.die_4,
self.die_5,
self.die_6,
self.die_7,
self.die_8,
self.die_9,
self.die_10,
self.die_11,
self.die_12,
self.die_13,
self.die_14,
self.die_15,
self.die_16,
self.die_17,
self.die_18,
self.die_19,
self.cce,
self.cce_1,
self.cce_2,
self.cce_3,
self.cce_4,
self.cce_5,
self.cce_6,
self.cce_7,
self.cce_8,
self.cce_9,
self.cce_10,
self.cce_11,
self.cce_12,
self.cce_13,
self.cce_14,
self.cce_15,
self.cce_16,
self.cce_17,
self.cce_18,
self.cce_19,
self.bpdnam,
self.bpdaddlig,
self.bpdaddlig_1,
self.bpdaddlig_2,
self.bpdposcod,
self.bpdcty,
self.bpdsat,
self.bpdcry,
self.bpdcrynam,
self.sdhtyp,
self.growei,
self.pacnbr,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class ReceiptDetailList:
"""
List of shipment details
"""
_details: typing.List[ReceiptDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
shipment_detail: ReceiptDetail,
shipment_subdetail: ReceiptSubDetail,
):
"""
Append
"""
itmref = shipment_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(shipment_subdetail)
return
self._item_set.add(itmref)
shipment_detail.fill()
shipment_detail.append(shipment_subdetail)
self._details.append(shipment_detail)
def __iter__(self):
return iter(self._details)
class Receipt:
"""
Warehosue shipment, both header & details
"""
header: ReceiptHeader
details: ReceiptDetailList
_sohnum: str
def __init__(self):
self.header = ReceiptHeader()
self._sohnum = ""
self.details = ReceiptDetailList()
def append(
self,
shipment_detail: ReceiptDetail,
shipment_subdetail: ReceiptSubDetail,
):
"""
Add detail information.
"""
self.details.append(shipment_detail, shipment_subdetail)
@property
def sohnum(self):
"""
Sales order number
"""
return self._sohnum
@sohnum.setter
def sohnum(self, value: str):
if self._sohnum != value:
self._sohnum = value
if value:
self._fill_info_from_so()
def _get_so_from_x3(self) -> records.Record:
"""
Fetch sales order from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SOH].[SALFCY_0]
,[SOH].[STOFCY_0]
,[SOH].[BPCORD_0]
,[SOH].[BPAADD_0]
,[SOH].[CUR_0]
,[SOH].[INVDTAAMT_2]
,[SOH].[INVDTAAMT_3]
,[SOH].[INVDTAAMT_4]
,[SOH].[INVDTAAMT_5]
,[SOH].[INVDTAAMT_6]
,[SOH].[INVDTAAMT_7]
,[SOH].[INVDTAAMT_8]
,[SOH].[INVDTAAMT_9]
,[SOH].[DIE_0]
,[SOH].[DIE_1]
,[SOH].[DIE_2]
,[SOH].[DIE_3]
,[SOH].[DIE_4]
,[SOH].[DIE_5]
,[SOH].[DIE_6]
,[SOH].[DIE_7]
,[SOH].[DIE_8]
,[SOH].[DIE_9]
,[SOH].[DIE_10]
,[SOH].[DIE_11]
,[SOH].[DIE_12]
,[SOH].[DIE_13]
,[SOH].[DIE_14]
,[SOH].[DIE_15]
,[SOH].[DIE_16]
,[SOH].[DIE_17]
,[SOH].[DIE_18]
,[SOH].[DIE_19]
,[SOH].[CCE_0]
,[SOH].[CCE_1]
,[SOH].[CCE_2]
,[SOH].[CCE_3]
,[SOH].[CCE_4]
,[SOH].[CCE_5]
,[SOH].[CCE_6]
,[SOH].[CCE_7]
,[SOH].[CCE_8]
,[SOH].[CCE_9]
,[SOH].[CCE_10]
,[SOH].[CCE_11]
,[SOH].[CCE_12]
,[SOH].[CCE_13]
,[SOH].[CCE_14]
,[SOH].[CCE_15]
,[SOH].[CCE_16]
,[SOH].[CCE_17]
,[SOH].[CCE_18]
,[SOH].[CCE_19]
,[SOH].[BPDNAM_0]
,[SOH].[BPDADDLIG_0]
,[SOH].[BPDADDLIG_1]
,[SOH].[BPDADDLIG_2]
,[SOH].[BPDPOSCOD_0]
,[SOH].[BPDCTY_0]
,[SOH].[BPDSAT_0]
,[SOH].[BPDCRY_0]
,[SOH].[BPDCRYNAM_0]
from [PROD].[SORDER] as [SOH]
where
[SOH].[SOHNUM_0] = :order
""",
order=self.sohnum,
).first()
def _copy_accounting_codes(self, result: records.Record):
"""
Fill in all the accounting codes
"""
self.header.die = result.DIE_0
self.header.die_1 = result.DIE_1
self.header.die_2 = result.DIE_2
self.header.die_3 = result.DIE_3
self.header.die_4 = result.DIE_4
self.header.die_5 = result.DIE_5
self.header.die_6 = result.DIE_6
self.header.die_7 = result.DIE_7
self.header.die_8 = result.DIE_8
self.header.die_9 = result.DIE_9
self.header.die_10 = result.DIE_10
self.header.die_11 = result.DIE_11
self.header.die_12 = result.DIE_12
self.header.die_13 = result.DIE_13
self.header.die_14 = result.DIE_14
self.header.die_15 = result.DIE_15
self.header.die_16 = result.DIE_16
self.header.die_17 = result.DIE_17
self.header.die_18 = result.DIE_18
self.header.die_19 = result.DIE_19
self.header.cce = result.CCE_0
self.header.cce_1 = result.CCE_1
self.header.cce_2 = result.CCE_2
self.header.cce_3 = result.CCE_3
self.header.cce_4 = result.CCE_4
self.header.cce_5 = result.CCE_5
self.header.cce_6 = result.CCE_6
self.header.cce_7 = result.CCE_7
self.header.cce_8 = result.CCE_8
self.header.cce_9 = result.CCE_9
self.header.cce_10 = result.CCE_10
self.header.cce_11 = result.CCE_11
self.header.cce_12 = result.CCE_12
self.header.cce_13 = result.CCE_13
self.header.cce_14 = result.CCE_14
self.header.cce_15 = result.CCE_15
self.header.cce_16 = result.CCE_16
self.header.cce_17 = result.CCE_17
self.header.cce_18 = result.CCE_18
self.header.cce_19 = result.CCE_19
def _fill_info_from_so(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_so_from_x3()
self.header.salfcy = result.SALFCY_0
self.header.stofcy = result.STOFCY_0
self.header.bpcord = result.BPCORD_0
self.header.bpaadd = result.BPAADD_0
self.header.cur = result.CUR_0
self.header.invdtaamt_2 = result.INVDTAAMT_2
self.header.invdtaamt_3 = result.INVDTAAMT_3
self.header.invdtaamt_4 = result.INVDTAAMT_4
self.header.invdtaamt_5 = result.INVDTAAMT_5
self.header.invdtaamt_6 = result.INVDTAAMT_6
self.header.invdtaamt_7 = result.INVDTAAMT_7
self.header.invdtaamt_8 = result.INVDTAAMT_8
self.header.invdtaamt_9 = result.INVDTAAMT_9
self._copy_accounting_codes(result)
self.header.bpdnam = result.BPDNAM_0
self.header.bpdaddlig = result.BPDADDLIG_0
self.header.bpdaddlig_1 = result.BPDADDLIG_1
self.header.bpdaddlig_2 = result.BPDADDLIG_2
self.header.bpdposcod = result.BPDPOSCOD_0
self.header.bpdcty = result.BPDCTY_0
self.header.bpdsat = result.BPDSAT_0
self.header.bpdcry = result.BPDCRY_0
self.header.bpdcrynam = result.BPDCRYNAM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
output(subdetail.convert_to_strings())
if __name__ == "__main__":
main()

163
edi_997_outgoing.py Normal file
View File

@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Create a 997 and load it into the proper outgoing folder
A 997 is a functional acknowledgment, we received the EDI tranmission contorl number and reply back with it
"""
import dataclasses
import datetime
import io
import pathlib
import typing
import pprint
import records # type: ignore
import yamamotoyama # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "edi-testing" #TODO change back to "outgoing"
ACK_DIRECTORY = THIS_DIRECTORY / "997_processing"
TEST_FILE = THIS_DIRECTORY / "edi-testing" / "944_YAMAMOTOYAMA_765aaebb-06c4-4eea-8d2a-7dddf2fd9ec2.edi"#TODO remove this
AK1_MAPPING = {
"944" : "RE",
"945" : "SW",
}
def main():
"""
Do it!
"""
#write_997(TEST_FILE)
#TODO loop through 997 directory and send for each file inside
for edi_filename in ACK_DIRECTORY.iterdir(): #TODO uncomment and review
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, X12_DIRECTORY / "outgoing" / "archive" / edi_filename.name)
def write_997(edi_filename: pathlib.Path):
"""
Write out a 997 to a file
"""
group_control_number = ''
transaction_set_control_number = ''
company = ''
edi_type = ''
with open(edi_filename, 'r') as edi_file:
for line in edi_file:
line = line.split("~")
for field in line:
fields = field.split("*")
if fields[0] == 'ISA':
group_control_number = fields[13]
company_longname = fields[6]
elif fields[0] == 'GS':
company = fields[2]
transaction_set_control_number = fields[6]
elif fields[0] == 'ST':
edi_type = fields[1]
now = datetime.datetime.now()
datestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")
with (X12_DIRECTORY / f"{company}-{transaction_set_control_number}-{datestamp_string}-997.edi").open(
"w", encoding="utf-8", newline="\n"
) as x12_file:
raw_control_number = control_number()
output = x12_file.write
header = write_997_header(raw_control_number,company_longname,company,edi_type,group_control_number)
lines = write_997_lines(edi_type,transaction_set_control_number)
footer = write_997_footer(raw_control_number,group_control_number)
output(header)
output(lines)
output(footer)
def write_997_header(raw_control_number,company_longname,company,edi_type,group_control_number):
now = datetime.datetime.now()
date = now.strftime("%y%m%d")
longdate = now.strftime("%Y%m%d")
time = now.strftime("%H%M")
short_control_number = f"{raw_control_number:04}"
interchange_control_number = (
f"{raw_control_number:09}" # Format to 9 characters
)
AK1 = AK1_MAPPING[edi_type]
header_string = ''.join([
f"ISA*00* *00* *ZZ*YAMAMOTOYAMA *ZZ*{company_longname}*",
date,
"*",
time,
"*U*00401*",
interchange_control_number,
"*0*P*>~",
"GS*",
"FA*",
"YAMAMOTOYAMA*",
company+"*",
longdate+"*",
time+"*",
short_control_number+"*",
"X*",
"004010~",
"ST*",
"997*0001~",
"AK1*",
AK1+"*",
group_control_number+"~",
]
)
return header_string
def write_997_lines(edi_type,transaction_set_control_number):
#short_control_number = f"{group_control_number:04}"
detail_string = ''.join([
"AK2*",
edi_type+"*",
transaction_set_control_number+"~"
"AK5*",
"A~",
"AK9*A*1*1*1~",
])
return detail_string
def write_997_footer(raw_control_number,group_control_number):
interchange_control_number = (
f"{raw_control_number:09}" # Format to 9 characters
)
short_control_number = f"{raw_control_number:04}"
footer_string = ''.join([
"SE*6*0001~",
"GE*1*",
str(short_control_number)+"~",
"IEA*1*",
str(interchange_control_number)
])
return footer_string
def control_number() -> int:
"""
Next EDI serial number
"""
filepath = pathlib.Path(__file__).with_suffix(".remember")
encoding = "utf-8"
newline = "\n"
try:
with filepath.open(
"r", encoding=encoding, newline=newline
) as remember_file:
number = int(remember_file.readline().rstrip("\n"))
except (OSError, ValueError):
number = 0
number += 1
with filepath.open("w", encoding=encoding, newline=newline) as remember_file:
remember_file.write(f"{number}\n")
return number
if __name__ == "__main__":
main()

82
master_contoller.py Normal file
View File

@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Control middleware
"""
import contextlib
import shutil
import pathlib
import paramiko # type: ignore
import pprint
#import edi_943multi3pl #TODO remove 940 from this file
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SHANDEX_OUTGOING = THIS_DIRECTORY / "outgoing"
X12_SHANDEX_INCOMING = THIS_DIRECTORY / "incoming"
def main():
"""
Do it!
"""
#process all EDIs that start with us
#edi_943-multi3pl.main()#TODO make this file Shandex only
#send them to Shandex
#send_x12_edi_files_shandex()#TODO set this up
#pick up files from Shandex
retrieve_x12_edi_files_shandex()
#process all EDIs that started with Shandex
#edi_997_outbound.main()
#edi_945.main()
# edi_944.main()
# edi_997_inbound.main()
#combine_zship945s()
# SL_SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com"
# SL_SFTP_USERNAME = "yumiddleware2023"
SSH_DIRECTORY = THIS_DIRECTORY / "ssh"
SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts")
SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519")
SHANDEX_SFTP_HOST = "ftp.shandex.com"
SHANDEX_SFTP_USERNAME = "Stash"
SHANDEX_SFTP_PASSWORD = "ST@Pass2024$$"
def send_x12_edi_files_shandex():
"""
Connect to FTP & send files.
"""
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
ssh_client.connect(
hostname=SHANDEX_SFTP_HOST, username=SHANDEX_SFTP_USERNAME, password=SHANDEX_SFTP_PASSWORD
)
with ssh_client.open_sftp() as sftp_connection:
sftp_connection.chdir("/Stash/Test/ToShandex") #TODO change to production folder
for filename in X12_SHANDEX_OUTGOING.glob("*.edi"):
sftp_connection.put(filename, str(filename.name))
shutil.move(filename, X12_SHANDEX_OUTGOING / "archive" / filename.name)
def retrieve_x12_edi_files_shandex():
with paramiko.SSHClient() as ssh_client:
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
ssh_client.connect(
hostname=SHANDEX_SFTP_HOST, username=SHANDEX_SFTP_USERNAME, password=SHANDEX_SFTP_PASSWORD
)
with ssh_client.open_sftp() as sftp_connection:
sftp_connection.chdir("/Stash/Test/FromShandex")
for filename in sftp_connection.listdir():
if filename.endswith(".edi"):
sftp_connection.get(filename, X12_SHANDEX_INCOMING / filename)
#new_filename = f"/Stash/Test/FromShandex/Archive/{filename}"
#sftp_connection.rename(filename, new_filename)
if __name__ == "__main__":
main()

3
readme.txt Normal file
View File

@ -0,0 +1,3 @@
867 -> X3 shipment import
-------------------------
create mass shipment records

7
ssh/id_ed25519 Normal file
View File

@ -0,0 +1,7 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACBV8x5ikQyA6ZgAO2BokIZB13CCFjiQIU8bHf8BXrkVLAAAAJj90RvK/dEb
ygAAAAtzc2gtZWQyNTUxOQAAACBV8x5ikQyA6ZgAO2BokIZB13CCFjiQIU8bHf8BXrkVLA
AAAEBIPcJFMSUHOcXD0G85tKPaaSaUfXoYz/pgoffs+Y4ul1XzHmKRDIDpmAA7YGiQhkHX
cIIWOJAhTxsd/wFeuRUsAAAAEHl1bWlkZGxld2FyZTIwMjMBAgMEBQ==
-----END OPENSSH PRIVATE KEY-----

1
ssh/id_ed25519.pub Normal file
View File

@ -0,0 +1 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFXzHmKRDIDpmAA7YGiQhkHXcIIWOJAhTxsd/wFeuRUs yumiddleware2023

2
ssh/known_hosts Normal file
View File

@ -0,0 +1,2 @@
s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDIDQlEH17E193o9cOd0azlbI388A2GX4DzCWriLuj+BI+Jsi4Ij6oSKbsfKsqY0oThzdahxvafr1q1RxX4WN7yKGtQ+osOrXaSdSBOfejTJ9Wtr3DI4g6APoK4KX8luo7lYhmdVsNZtYdd2Wz7gIm4hsFtnSzrCyOvYMQ6mzvZQGGb+3V5Ce2wjYb0TjxdDdiacXXtbopVRuPAARqFz8hYMoKsZEyKuMekbErqiaC99ZZXtfmh9ZOJdSIF0N6loMWQaNtdLoyD1Xts3CDAcSg41wSfDYB3mtuIZEC/WNBj57RDuy93IsxH9z4Ak47cCrpChSpXp4pfajJS7W5g+Hyd
ftp.shandex.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDWht+ZlPHMwzohq4gnOTAAdtCt2u7BG/fdONXdvluMopWq8bwTKX0iKV5/7sRIIGimk1zIX19zDGR/5B9BbPFBRrxdKA915L830hj8omdo/ayA7pm/sDE7YdmSzUJ/akaO2KYDqBcpElr0Eb3gKaxy1oJGOR8zcLEffZpmYjMHKuCE6KqooCbn6326yRpl/fUhFK9QKLowIzBpeaQzGeNnGLON6j1bRPtObO0QYykdsb6mMF77ZKcf/kibnAtau2APC6xmDL3LDA6h5bwMs8nrC2Yg094dFPjvmC2FIbgiomtz8bfhLYsjDSE1JMNOUIbyoNvitWX5Zavtp70FnQcv