286 lines
10 KiB
Python
286 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Consume a 846 file from 3PLs, and translate into a
|
|
inventory comparison report
|
|
For Shadex we also need to reply with a 997
|
|
|
|
"""
|
|
import datetime
|
|
import pathlib
|
|
import re
|
|
import shutil
|
|
import typing
|
|
import smtplib
|
|
import pprint
|
|
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.application import MIMEApplication
|
|
from email.mime.text import MIMEText
|
|
|
|
import records # type: ignore
|
|
|
|
import yamamotoyama # type: ignore
|
|
import yamamotoyama.x3_imports # type: ignore
|
|
|
|
THIS_DIRECTORY = pathlib.Path(__file__).parent
|
|
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
|
|
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
|
|
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
|
|
EDI_846_ATTACHMENTS = THIS_DIRECTORY / "846_reports"
|
|
EDI_846_ATTACHMENTS_ARCHIVE = EDI_846_ATTACHMENTS / "archive"
|
|
|
|
SHANDEX_846_FILENAME_RE = re.compile(
|
|
r"\A 846_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
|
|
)
|
|
|
|
SHANDEX_STATUS = {
|
|
'33' : 'A',
|
|
'20' : 'R',
|
|
'QH' : 'Q'
|
|
}
|
|
|
|
|
|
def main():
|
|
"""
|
|
Do it!
|
|
"""
|
|
#read in the information from Shandex and store it
|
|
for edi_filename in X12_DIRECTORY.iterdir():
|
|
if SHANDEX_846_FILENAME_RE.match(edi_filename.name):
|
|
shandex_inventory=process_file(edi_filename)
|
|
# file moved to 997 processing folder to be sent later
|
|
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
|
|
#get stock information about WON and store it
|
|
#pass date from EDI so we can subtract newer stock movements?
|
|
x3_won_inventory=get_x3_won_inventory()
|
|
#write out an excel file with the stock from Shandex with X3 next to it, then include anything missing
|
|
compare_inventory(shandex_inventory, x3_won_inventory)
|
|
stock_count_alert()
|
|
|
|
|
|
def compare_inventory(shandex_inventory, x3_inventory):
|
|
today = datetime.datetime.today()
|
|
today = today.strftime('%Y-%m-%d')
|
|
with open(EDI_846_ATTACHMENTS / f'inventory_comparison_{today}.csv', 'w', newline='') as outfile:
|
|
outfile.write(','.join(['Site','Item','Description','Lot','X3 qty','Shandex qty']))#header
|
|
outfile.write('\n')
|
|
for record in x3_inventory:
|
|
site = record["STOFCY_0"]
|
|
item = record["ITMREF_0"]
|
|
des = record["ITMDES1_0"]
|
|
lot = record["LOT_0"]
|
|
qty = str(record["QTY"])
|
|
outfile.write(','.join([site, item, des, lot, qty]))
|
|
outfile.write(',')
|
|
if item in shandex_inventory:
|
|
if lot in shandex_inventory[item]:
|
|
outfile.write(str(shandex_inventory[item][lot]))
|
|
# pprint.pprint(shandex_inventory[item])
|
|
del shandex_inventory[item][lot]
|
|
# pprint.pprint(shandex_inventory[item])
|
|
if len(shandex_inventory[item]) == 0:
|
|
# pprint.pprint('entire del')
|
|
# pprint.pprint(shandex_inventory[item])
|
|
del shandex_inventory[item]
|
|
# pprint.pprint(shandex_inventory[item])
|
|
else:
|
|
outfile.write('0')#lot not found
|
|
else:
|
|
outfile.write('0')#item not found
|
|
outfile.write('\n')
|
|
#write the rest of shandex inventory
|
|
if len(shandex_inventory) > 0:
|
|
outfile.write('Shandex only')
|
|
outfile.write('\n')
|
|
for item in shandex_inventory:
|
|
# pprint.pprint(item)
|
|
for lot in shandex_inventory[item]:
|
|
# pprint.pprint(lot)
|
|
qty = str(shandex_inventory[item][lot])
|
|
outfile.write(','.join([item, lot, qty]))
|
|
outfile.write('\n')
|
|
|
|
|
|
def tokens_from_edi_file(
|
|
edi_filename: pathlib.Path,
|
|
) -> typing.Iterator[typing.List[str]]:
|
|
"""
|
|
Read tokens from EDI file
|
|
"""
|
|
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
|
|
for record in edi_file.read().split("~"):
|
|
fields = record.split("*")
|
|
if fields[0] in {
|
|
"ISA",
|
|
"GS"
|
|
"ST",
|
|
"RED",
|
|
"GE",
|
|
"IEA"
|
|
}:
|
|
continue
|
|
yield fields
|
|
|
|
|
|
def get_x3_won_inventory():
|
|
#TODO correct the dates used in stock_issues?
|
|
with yamamotoyama.get_connection() as db_connection:
|
|
return db_connection.query(
|
|
"""
|
|
with stock_issues as (
|
|
select
|
|
STJ.STOFCY_0,
|
|
STJ.ITMREF_0,
|
|
STJ.LOT_0,
|
|
sum(STJ.QTYSTU_0) [QTYSTU_0]
|
|
from PROD.STOJOU STJ
|
|
where
|
|
STJ.STOFCY_0 = 'WON'
|
|
and STJ.IPTDAT_0 between getdate()-1 and getdate()
|
|
group by
|
|
STJ.STOFCY_0,
|
|
STJ.ITMREF_0,
|
|
STJ.LOT_0
|
|
)
|
|
select
|
|
SLF.STOFCY_0,
|
|
SLF.ITMREF_0,
|
|
ITM.ITMDES1_0,
|
|
SLF.LOT_0,
|
|
cast(sum(SLF.AAACUMQTY_0 + SLF.QQQCUMQTY_0 + SLF.RRRCUMQTY_0 - coalesce(stock_issues.QTYSTU_0,0)) as integer) as QTY,
|
|
SLF.AVC_0
|
|
from PROD.STOLOTFCY SLF
|
|
join PROD.ITMMASTER ITM on
|
|
SLF.ITMREF_0 = ITM.ITMREF_0
|
|
left join stock_issues
|
|
on SLF.ITMREF_0 = stock_issues.ITMREF_0
|
|
and SLF.STOFCY_0 = stock_issues.STOFCY_0
|
|
and SLF.LOT_0 = stock_issues.LOT_0
|
|
where
|
|
SLF.STOFCY_0 = 'WON'
|
|
group by
|
|
SLF.STOFCY_0,
|
|
SLF.ITMREF_0,
|
|
ITM.ITMDES1_0,
|
|
SLF.LOT_0,
|
|
SLF.AVC_0
|
|
order by 1, 2
|
|
""",
|
|
#startdate=edi_date,
|
|
).all()
|
|
|
|
def gtin_lookup(gtin):
|
|
with yamamotoyama.get_connection() as db_connection:
|
|
itmref = db_connection.query(
|
|
"""
|
|
select
|
|
[ITM].[ITMREF_0],
|
|
[ITM].[ITMDES1_0],
|
|
[ITM].[EANCOD_0],
|
|
[ITM].[ZCASEUPC_0]
|
|
from [PROD].[ITMMASTER] [ITM]
|
|
join [PROD].[ITMFACILIT] [ITF]
|
|
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
|
|
and [ITF].[STOFCY_0] = 'WON'
|
|
where
|
|
replace([ITM].[ZCASEUPC_0],' ','') = :zcaseupc
|
|
""",
|
|
zcaseupc=gtin,
|
|
).first()
|
|
if itmref is None:
|
|
itmref = db_connection.query(
|
|
"""
|
|
select
|
|
[ITM].[ITMREF_0],
|
|
[ITM].[ITMDES1_0],
|
|
[ITM].[EANCOD_0],
|
|
[ITM].[ZCASEUPC_0]
|
|
from [PROD].[ITMMASTER] [ITM]
|
|
join [PROD].[ITMFACILIT] [ITF]
|
|
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
|
|
and [ITF].[STOFCY_0] = 'WON'
|
|
where
|
|
replace([ITM].[EANCOD_0],' ','') = :zcaseupc
|
|
""",
|
|
zcaseupc=gtin,
|
|
).first()["ITMREF_0"]
|
|
else:
|
|
itmref = itmref["ITMREF_0"]
|
|
return itmref
|
|
|
|
|
|
def stock_count_alert():
|
|
msg = MIMEMultipart()
|
|
msg['Subject'] = 'New Stock Count from Shandex'
|
|
msg['Precedence'] = 'bulk'
|
|
msg['From'] = 'x3report@stashtea.com'
|
|
msg['To'] = 'isenn@yamamotoyama.com,vgomez@yamamotoyama.com'
|
|
msg['Cc'] = 'bleeson@stashtea.com'
|
|
emailtext = f'Attached.'
|
|
msg.attach(MIMEText(emailtext, 'plain'))
|
|
for file in EDI_846_ATTACHMENTS.iterdir():
|
|
if file.name.endswith('.csv'):
|
|
part = MIMEApplication(open(file, 'rb').read())
|
|
part['Content-Disposition'] = f'attachment; filename="{file.name}"'
|
|
msg.attach(part)
|
|
shutil.move(file, EDI_846_ATTACHMENTS_ARCHIVE / file.name)
|
|
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
|
|
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
|
|
smtp.send_message(msg)
|
|
|
|
|
|
def process_file(edi_filename: pathlib.Path):
|
|
"""
|
|
Convert a specific EDI file into an import file.
|
|
"""
|
|
shandex_inventory = {} #all inventory
|
|
product = ''
|
|
lot = ''
|
|
qty = 0
|
|
for fields in tokens_from_edi_file(edi_filename):
|
|
if fields[0] == "BIA":
|
|
advice_date = fields[4]
|
|
if fields[0] == 'LIN':
|
|
if product != '': #check loop entry
|
|
if product not in shandex_inventory: #if we haven't seen the product yet add it
|
|
# pprint.pprint('product was not found')
|
|
shandex_inventory[product] = {lot : qty}
|
|
# pprint.pprint(shandex_inventory)
|
|
else: #we've seen this product, have we seen the lot
|
|
if lot not in shandex_inventory[product]:#if not, add it
|
|
shandex_inventory[product][lot] = qty
|
|
else:
|
|
shandex_inventory[product][lot] += qty #if we have add to it
|
|
# pprint.pprint('product: ' + product)
|
|
# pprint.pprint('lot: ' + lot)
|
|
# pprint.pprint('qty: ' + str(qty))
|
|
#LIN**SK*077652972160*LT*31052026A
|
|
gtin = fields[3]
|
|
lot = fields[5]
|
|
product = gtin_lookup(gtin)
|
|
qty = 0
|
|
if fields[0] == "QTY":#product should already exist
|
|
# QTY*33*0
|
|
# QTY*20*16
|
|
# QTY*QH*0
|
|
qty += int(fields[2])
|
|
if fields[0] == "SE":#end of file
|
|
# pprint.pprint('final add')
|
|
# pprint.pprint(shandex_inventory)
|
|
# pprint.pprint('product: ' + product)
|
|
# pprint.pprint('lot: ' + lot)
|
|
# pprint.pprint('qty: ' + str(qty))
|
|
if product is not None: #check loop entry
|
|
if product not in shandex_inventory: #if we haven't seen the product yet add it
|
|
shandex_inventory[product] = {lot : qty}
|
|
else: #we've seen this product, have we seen the lot
|
|
if lot not in shandex_inventory[product]:#if not, add it
|
|
shandex_inventory[product][lot] = qty
|
|
else:
|
|
shandex_inventory[product][lot] += qty #if we have add to it
|
|
return shandex_inventory
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|