Fixes for move to production.

master
bleeson 2024-03-20 09:42:04 -07:00
parent f3782aede4
commit 150b26c58a
9 changed files with 689 additions and 39 deletions

565
edi_846.py Normal file
View File

@ -0,0 +1,565 @@
#!/usr/bin/env python3
"""
Consume a 846 file from 3PLs, and translate into a
inventory comparison report that could be used as a stock count?
For Shadex we also need to reply with a 997
947 is warehouse advice, to alert us of damages or amount changes from something
like a count
"""
#what about serial numbers?
#status on L line?
#remove negative line, needs to look up stocou? not sure.
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import smtplib
import pprint
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
SHANDEX_846_FILENAME_RE = re.compile(
r"\A 846_YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
SHANDEX_STATUS = {
'33' : 'A',
'20' : 'R',
'QH' : 'Q'
}
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_846_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
#shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS"
"N1",
"N9",
"SE",
"GE",
"IEA"
}:
continue
yield fields
def gtin_lookup(gtin):
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [FY23TEST].[ITMMASTER] [ITM]--TODO change back to [PROD]
join [FY23TEST].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
[ITM].[ZCASEUPC_0] = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
def stock_movement_alert(itmref, qty, lot, status):
msg = MIMEMultipart()
msg['Subject'] = 'New Stock Change from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'bleeson@stashtea.com'#TODO correct receipientscares
emailtext = f'Item: {itmref}\nQty: {qty}\nLot: {lot}\nStatus: {DAMAGE_CODE_MAPPING[status]}\nReason: {DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
warehouse_stockchange = StockChange()
vcrlin = 0
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "G62":
iptdat = fields[2]
warehouse_stockchange.header.iptdat = datetime.datetime.strptime(
iptdat, "%Y%m%d"
).date()
if fields[0] == 'W15':
transaction_number = fields[2]
if fields[0] == "W19":
vcrlin += 1000
# W19*AV*35*CA**UK*10077652082651***03022026C
_, status, qty, uom, _, _, gtin, _, _, lot = fields[:10]
product = gtin_lookup(gtin)
stock_movement_alert(product, qty, lot, status)
if status in EMAIL_ONLY_CODES:
return
warehouse_stockchange.header.vcrdes = DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]
subdetail = StockChangeSubDetail(
qtypcu=int(qty),
qtystu=int(qty),
sta=DAMAGE_CODE_MAPPING[status],
pcu=uom
)
detail_line = StockChangeDetail(
vcrlin=vcrlin,
itmref=product,
pcu=uom,
sta=DAMAGE_CODE_SOURCE_MAPPING[status],
lot=lot
)
warehouse_stockchange.append(
detail_line,
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSCS_{transaction_number}_{time_stamp}.dat"
) as import_file:
warehouse_stockchange.output(import_file)
@dataclasses.dataclass
class StockChangeSubDetail:
"""
Information that goes onto a stockchange sub-detail line, taken from ZPTHI template.
"""
pcu: str = ""
qtypcu: int = 0
qtystu: int = 0
loc: str = ""
sta: str = "A"
# def stojous(self, shipment, item) -> typing.List[str]:
# """
# Convert grouped lot quantities into individual STOJOU records to fit on stockchange
# """
# with yamamotoyama.get_connection('test') as database: #todo remove 'test'
# details = (
# database.query(
# """
# select
# 'S',
# 'A',
# [STJ].[PCU_0],
# cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar),
# [STJ].[LOT_0],
# '',
# ''
# from [FY23TEST].[STOJOU] [STJ] --TODO change to PROD
# where
# [STJ].[VCRNUM_0] = :sdhnum
# and [STJ].[ITMREF_0] = :itmref
# and [STJ].[LOT_0] = :lot
# and [STJ].[TRSTYP_0] = 4
# """,
# sdhnum=shipment,
# itmref=item,
# lot=self.lot,
# )
# .all()
# )
# return details
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
fix_uom(self.pcu),
self.qtypcu,
self.qtystu,
self.loc,
self.sta,
]
)
@dataclasses.dataclass
class StockChangeDetail:
"""
Information that goes on a stockchange detail line, taken from ZPTHI template.
"""
vcrlin: int = 0
itmref: str = ""
pcu: str = ""
pcustucoe: int = 1 #does this need a lookup?
sta: str = "A" #todo this needs to flip based on the transaction A > R, A > Q, what about Q > A?
loctyp: str = ""
loc: str = ""
lot: str = ""
slo: str = ""
sernum: str = ""
palnum: str = ""
ctrnum: str = ""
qlyctldem: str= ""
owner: str = "WON"
subdetails: typing.List[StockChangeSubDetail] = dataclasses.field(
default_factory=list
)
def palnum_lookup(self, itmref, lot, status):
"""
Pick a palnum from X3 using the lot, location, and status
It matters which one we use, best attempt is to get the largest one available
"""
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select top 1
[STO].[STOFCY_0],
[STO].[ITMREF_0],
[STO].[LOT_0],
[STO].[PALNUM_0],
[STO].[QTYSTU_0]
from [FY23TEST].[STOCK] [STO] --TODO change to PROD
where
[STO].[ITMREF_0] = :itmref
and [STO].[STOFCY_0] = 'WON'
and [STO].[LOT_0] = :lot
and [STO].[STA_0] = :status
order by
[STO].[QTYSTU_0] desc
""",
itmref=itmref,
lot=lot,
status=status
).first()["PALNUM_0"]
def gtin_lookup(self, gtin):
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [FY23TEST].[ITMMASTER] [ITM]--TODO change back to [PROD]
join [FY23TEST].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
[ITM].[ZCASEUPC_0] = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
def append(self, subdetail: StockChangeSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.pcu
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases -= subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.vcrlin,
self.itmref,
fix_uom(self.pcu),
self.pcustucoe,
self.sta,
self.loctyp,
self.loc,
self.lot,
self.slo,
self.sernum,
self.palnum_lookup(self.itmref, self.lot, self.sta),
self.ctrnum,
self.qlyctldem,
self.owner
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, StockChangeDetail):
return self.itmref == item.itmref
return False
# def fill(self):#not needed for stockchanges
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
# result = get()
# self.soplin = result.SOPLIN_0
# self.itmdes = result.ITMDES1_0
# self.sau = result.SAU_0
@dataclasses.dataclass
class StockChangeHeader:
"""
Information that goes on a stockchange header, taken from ZSCS template.
"""
vcrnum: str = ""
stofcy: str = "WON"
iptdat: datetime.date = datetime.date(1753, 1, 1)
vcrdes: str = ""
pjt: str = ""
trsfam: str = "CHX"
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"E",
self.vcrnum,
self.stofcy,
self.iptdat.strftime("%Y%m%d"),
self.vcrdes,
self.pjt,
self.trsfam,
]
)
class StockChangeDetailList:
"""
List of stockchange details
"""
_details: typing.List[StockChangeDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
stockchange_detail: StockChangeDetail,
stockchange_subdetail: StockChangeSubDetail,
):
"""
Append
"""
itmref = stockchange_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(stockchange_subdetail)
return
self._item_set.add(itmref)
#stockchange_detail.fill()
stockchange_detail.append(stockchange_subdetail)
self._details.append(stockchange_detail)
def __iter__(self):
return iter(self._details)
class StockChange:
"""
Warehouse stockchange, both header & details
"""
header: StockChangeHeader
details: StockChangeDetailList
_sdhnum: str
def __init__(self):
self.header = StockChangeHeader()
self._sdhnum = ""
self.details = StockChangeDetailList()
def append(
self,
stockchange_detail: StockChangeDetail,
stockchange_subdetail: StockChangeSubDetail,
):
"""
Add detail information.
"""
self.details.append(stockchange_detail, stockchange_subdetail)
@property
def sdhnum(self):
"""
shipment number
"""
return self._sdhnum
@sdhnum.setter
def sdhnum(self, value: str):
if self._sdhnum != value:
self._sdhnum = value
if value:
self._fill_info_from_shipment()
# def _get_shipment_from_x3(self) -> records.Record:
# """
# Fetch shipment from X3 database.
# """
# with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
# return db_connection.query(
# """
# select
# [SDH].[STOFCY_0],
# [SDH].[SDHNUM_0],
# [SDH].[SALFCY_0],
# [SDH].[BPCORD_0],
# [SDH].[CUR_0],
# [SDH].[SOHNUM_0]
# from [FY23TEST].[SDELIVERY] [SDH]--TODO change back to [PROD]
# where
# [SDH].[SDHNUM_0] = :shipment
# """,
# shipment=self.sdhnum,
# ).first()
# def _fill_info_from_shipment(self):
# """
# When we learn the SOHNUM, we can copy information from the sales order.
# """
# result = self._get_shipment_from_x3()
# self.header.stofcy = result.STOFCY_0
# self.header.sdhnum = result.SDHNUM_0
# self.header.salfcy = result.SALFCY_0
# self.header.bpcord = result.BPCORD_0
# self.header.cur = result.CUR_0
# self.header.sohnum = result.SOHNUM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
#shipment = detail.sdhnum
#item = detail.itmref
output(subdetail.convert_to_strings())
# for record in subdetail.stojous(shipment, item):
# output(subdetail.convert_to_strings())
# output(record)
if __name__ == "__main__":
main()

View File

@ -40,9 +40,8 @@ def main():
for edi_filename in X12_DIRECTORY.iterdir():
if SOURCE_867_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
#TODO respond with 997?
shutil.copy(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
shutil.move(edi_filename, THIS_DIRECTORY / "processed_867s" / edi_filename.name)
shutil.move(edi_filename, THIS_DIRECTORY / "processed_867s" / edi_filename.name) #They go in here so we can use them in the dashboard script
combine_zship867s()

View File

@ -15,8 +15,6 @@ import pprint
import records
import yamamotoyama # type:ignore
from edi_940 import X12_DIRECTORY, X12
SITE_MAPPING = {
'WNJ' : 'SOURCELOGISTICS',
@ -35,8 +33,6 @@ SHIPPING_CODE_MAPPING = {
}
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SOURCELOGISTICS = THIS_DIRECTORY / "outgoing"
X12_SHANDEX = THIS_DIRECTORY / "outgoing"
# X12_SOURCELOGISTICS = THIS_DIRECTORY / "edi-testing" #test directories
@ -52,6 +48,52 @@ def main():
write_943(database, shipment)
class X12:
"""
X12 format parent class.
"""
@staticmethod
def line(items: typing.List[str]) -> str:
"""
Return X12 EDI line with * and ~
"""
def sanitize(thing: str) -> str:
for bad_character in ("*", "/", "&", "#", ","):
thing = thing.replace(bad_character, "")
return thing
return "*".join([sanitize(item) for item in items]) + "~"
def x12(self) -> str:
"""
X12 format.
"""
raise NotImplementedError
@staticmethod
def control_number() -> int:
"""
Next EDI serial number
"""
filepath = pathlib.Path(__file__).with_suffix(".remember")
encoding = "utf-8"
newline = "\n"
try:
with filepath.open(
"r", encoding=encoding, newline=newline
) as remember_file:
number = int(remember_file.readline().rstrip("\n"))
except (OSError, ValueError):
number = 0
number += 1
with filepath.open("w", encoding=encoding, newline=newline) as remember_file:
remember_file.write(f"{number}\n")
return number
def write_943(database: records.Connection, shipment: str):
"""
Write out a 943 to a file

View File

@ -38,6 +38,30 @@ def main():
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
combine_zpthis()
def combine_zpthis():
"""
Collect all ZPTHI imports into a single file for easy import.
"""
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZPTHI.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZPTHI_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def tokens_from_edi_file(

View File

@ -302,13 +302,13 @@ class StockChangeDetail:
default_factory=list
)
def palnum_lookup(self, itmref, lot, status):
def palnum_lookup(self, itmref, lot, status):#TODO prevent the crash when we don't have the lot supplied in X3
"""
Pick a palnum from X3 using the lot, location, and status
It matters which one we use, best attempt is to get the largest one available
"""
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
return db_connection.query(
result = db_connection.query(
"""
select top 1
[STO].[STOFCY_0],
@ -328,7 +328,11 @@ class StockChangeDetail:
itmref=itmref,
lot=lot,
status=status
).first()["PALNUM_0"]
).first()
if result:
return result["PALNUM_0"]
else:
raise NotImplementedError
def gtin_lookup(self, gtin):
with yamamotoyama.get_connection('test') as db_connection:#todo remove 'test'
@ -378,6 +382,7 @@ class StockChangeDetail:
return x3_uom
self.qty = self.check_subdetail_qty()
self.palnum = self.palnum_lookup(self.itmref, self.lot, self.sta)
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
@ -391,7 +396,7 @@ class StockChangeDetail:
self.lot,
self.slo,
self.sernum,
self.palnum_lookup(self.itmref, self.lot, self.sta),
self.palnum,
self.ctrnum,
self.qlyctldem,
self.owner
@ -614,3 +619,4 @@ class StockChange:
if __name__ == "__main__":
main()

View File

@ -2,6 +2,7 @@
"""
Consume a generic 997 file from 3PLs
Functional Acknowledgment
We don't do anything with these, code is holding old skeleton
"""
# pylint: disable=too-many-instance-attributes
import dataclasses

View File

@ -9,6 +9,7 @@ import datetime
import io
import pathlib
import typing
import shutil
import pprint
import records # type: ignore
@ -16,26 +17,26 @@ import yamamotoyama # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "edi-testing" #TODO change back to "outgoing"
X12_DIRECTORY = THIS_DIRECTORY / "outgoing"
INCOMING_ARCHIVE_DIRECTORY = THIS_DIRECTORY / "incoming"
ACK_DIRECTORY = THIS_DIRECTORY / "997_processing"
TEST_FILE = THIS_DIRECTORY / "edi-testing" / "944_YAMAMOTOYAMA_765aaebb-06c4-4eea-8d2a-7dddf2fd9ec2.edi"#TODO remove this
AK1_MAPPING = {
"944" : "RE",
"945" : "SW",
"947" : "AW",
"846" : "IB",
"867" : "PT",
}
def main():
"""
Do it!
"""
#write_997(TEST_FILE)
#TODO loop through 997 directory and send for each file inside
for edi_filename in ACK_DIRECTORY.iterdir(): #TODO uncomment and review
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, X12_DIRECTORY / "outgoing" / "archive" / edi_filename.name)
for edi_filename in ACK_DIRECTORY.iterdir():
write_997(edi_filename)
shutil.move(edi_filename, INCOMING_ARCHIVE_DIRECTORY / "archive" / edi_filename.name)
def write_997(edi_filename: pathlib.Path):

View File

@ -12,8 +12,9 @@ import edi_944
import edi_947
import edi_846
import edi_867
import edi_997_outbound
import update_shandex_dashboard
#import edi_943multi3pl #TODO remove 940 from this file
import edi_943
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SHANDEX_OUTGOING = THIS_DIRECTORY / "outgoing"
@ -25,7 +26,7 @@ def main():
Do it!
"""
#pick up files from Shandex
retrieve_x12_edi_files_shandex()#TODO turn on archiving
retrieve_x12_edi_files_shandex()
#process all EDIs that started with Shandex
edi_997_inbound.main()
@ -35,11 +36,11 @@ def main():
edi_867.main()
# process all EDIs that start with us
#edi_943-multi3pl.main()#TODO make this file Shandex only
#edi_997_outbound
edi_943.main()
edi_997_outbound.main()
#send them to Shandex
#send_x12_edi_files_shandex()#TODO set this up
send_x12_edi_files_shandex()#TODO production changes
#update dashboard
# update_shandex_dashboard.main()
@ -60,6 +61,7 @@ def send_x12_edi_files_shandex():
"""
Connect to FTP & send files.
"""
with paramiko.SSHClient() as ssh_client:
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
@ -86,8 +88,8 @@ def retrieve_x12_edi_files_shandex():
for filename in sftp_connection.listdir():
if filename.endswith(".edi"):
sftp_connection.get(filename, X12_SHANDEX_INCOMING / filename)
#new_filename = f"/Stash/Test/FromShandex/Archive/{filename}"
#sftp_connection.rename(filename, new_filename)
new_filename = f"/Stash/Test/FromShandex/Archive/{filename}"
sftp_connection.rename(filename, new_filename)
if __name__ == "__main__":

View File

@ -1,3 +1,13 @@
867 -> X3 shipment import
-------------------------
create mass shipment records
Process EDI documents between YU and Shandex
997 Functional acknowledgment Tell Shandex we received an EDI
944 Stock transfer receipt Receive qty and lots from replenishment shipment
947 Inventory advice Tell Stash of Q and R status inventory
846 Inventory inquiry Inventory comparison
867 Resale report Tell Stash qty, price, and lot sold
943 Stock transfer Tell Shandex qty and lots being shipped
master_controller takes EDI files from the Shandex FTP, processes them, and loads our EDI files.
Sales come in via 867 and the shandex_dashboard script stores which files have been received.
The corresponding control numbers can be matched to X3 Sales Deliveries in the YLICPLATE_0 field.