shandex_edi_2024/edi_846.py

333 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Consume a 846 file from 3PLs, and translate into a
inventory comparison report
For Shadex we also need to reply with a 997
"""
import datetime
import pathlib
import re
import shutil
import typing
import smtplib
import pprint
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
import os
import base64
import google.auth
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
SCOPES = ['https://mail.google.com/']
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
EDI_846_ATTACHMENTS = THIS_DIRECTORY / "846_reports"
EDI_846_ATTACHMENTS_ARCHIVE = EDI_846_ATTACHMENTS / "archive"
SHANDEX_846_FILENAME_RE = re.compile(
r"\A 846_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
SHANDEX_STATUS = {
'33' : 'A',
'20' : 'R',
'QH' : 'Q'
}
def main():
"""
Do it!
"""
#read in the information from Shandex and store it
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_846_FILENAME_RE.match(edi_filename.name):
pprint.pprint(edi_filename.name)
shandex_inventory=process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
#get stock information about WON and store it
#pass date from EDI so we can subtract newer stock movements?
x3_won_inventory=get_x3_won_inventory()
#write out an excel file with the stock from Shandex with X3 next to it, then include anything missing
compare_inventory(shandex_inventory, x3_won_inventory)
stock_count_alert()
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
def compare_inventory(shandex_inventory, x3_inventory):
today = datetime.datetime.today()
today = today.strftime('%Y-%m-%d')
with open(EDI_846_ATTACHMENTS / f'inventory_comparison_{today}.csv', 'w', newline='') as outfile:
outfile.write(','.join(['Site','Item','Description','Lot','X3 qty','Shandex qty']))#header
outfile.write('\n')
for record in x3_inventory:
site = record["STOFCY_0"]
item = record["ITMREF_0"]
des = record["ITMDES1_0"]
lot = record["LOT_0"]
qty = str(record["QTY"])
outfile.write(','.join([site, item, des, lot, qty]))
outfile.write(',')
if item in shandex_inventory:
if lot in shandex_inventory[item]:
outfile.write(str(shandex_inventory[item][lot]))
# pprint.pprint(shandex_inventory[item])
del shandex_inventory[item][lot]
# pprint.pprint(shandex_inventory[item])
if len(shandex_inventory[item]) == 0:
# pprint.pprint('entire del')
# pprint.pprint(shandex_inventory[item])
del shandex_inventory[item]
# pprint.pprint(shandex_inventory[item])
else:
outfile.write('0')#lot not found
else:
outfile.write('0')#item not found
outfile.write('\n')
#write the rest of shandex inventory
if len(shandex_inventory) > 0:
outfile.write('Shandex only')
outfile.write('\n')
for item in shandex_inventory:
# pprint.pprint(item)
for lot in shandex_inventory[item]:
# pprint.pprint(lot)
qty = str(shandex_inventory[item][lot])
outfile.write(','.join([item, lot, qty]))
outfile.write('\n')
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS"
"ST",
"RED",
"GE",
"IEA"
}:
continue
yield fields
def get_x3_won_inventory():
#TODO correct the dates used in stock_issues?
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
with stock_issues as (
select
STJ.STOFCY_0,
STJ.ITMREF_0,
STJ.LOT_0,
sum(STJ.QTYSTU_0) [QTYSTU_0]
from PROD.STOJOU STJ
where
STJ.STOFCY_0 = 'WON'
and STJ.IPTDAT_0 between getdate()-1 and getdate()
group by
STJ.STOFCY_0,
STJ.ITMREF_0,
STJ.LOT_0
)
select
SLF.STOFCY_0,
SLF.ITMREF_0,
ITM.ITMDES1_0,
SLF.LOT_0,
cast(sum(SLF.AAACUMQTY_0 + SLF.QQQCUMQTY_0 + SLF.RRRCUMQTY_0 - coalesce(stock_issues.QTYSTU_0,0)) as integer) as QTY,
SLF.AVC_0
from PROD.STOLOTFCY SLF
join PROD.ITMMASTER ITM on
SLF.ITMREF_0 = ITM.ITMREF_0
left join stock_issues
on SLF.ITMREF_0 = stock_issues.ITMREF_0
and SLF.STOFCY_0 = stock_issues.STOFCY_0
and SLF.LOT_0 = stock_issues.LOT_0
where
SLF.STOFCY_0 = 'WON'
group by
SLF.STOFCY_0,
SLF.ITMREF_0,
ITM.ITMDES1_0,
SLF.LOT_0,
SLF.AVC_0
order by 1, 2
""",
#startdate=edi_date,
).all()
def gtin_lookup(gtin):
with yamamotoyama.get_connection() as db_connection:
itmref = db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
replace([ITM].[ZCASEUPC_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()
if itmref is None:
itmref = db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
replace([ITM].[EANCOD_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
else:
itmref = itmref["ITMREF_0"]
return itmref
def stock_count_alert():
msg = MIMEMultipart()
msg['Subject'] = 'New Stock Count from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'jpena@yamamotoyama.com,icortes@yamamotoyama.com,mdelacruz@yamamotoyama.com'
msg['Cc'] = 'bleeson@stashtea.com'
emailtext = f'Attached.'
msg.attach(MIMEText(emailtext, 'plain'))
for file in EDI_846_ATTACHMENTS.iterdir():
if file.name.endswith('.csv'):
part = MIMEApplication(open(file, 'rb').read())
part['Content-Disposition'] = f'attachment; filename="{file.name}"'
msg.attach(part)
shutil.move(file, EDI_846_ATTACHMENTS_ARCHIVE / file.name)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
shandex_inventory = {} #all inventory
product = ''
lot = ''
qty = 0
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "BIA":
advice_date = fields[4]
if fields[0] == 'LIN' and len(fields) > 5:
if product != '': #check loop entry
if product not in shandex_inventory: #if we haven't seen the product yet add it
# pprint.pprint('product was not found')
shandex_inventory[product] = {lot : qty}
# pprint.pprint(shandex_inventory)
else: #we've seen this product, have we seen the lot
if lot not in shandex_inventory[product]:#if not, add it
shandex_inventory[product][lot] = qty
else:
shandex_inventory[product][lot] += qty #if we have add to it
# pprint.pprint('product: ' + product)
# pprint.pprint('lot: ' + lot)
# pprint.pprint('qty: ' + str(qty))
#LIN**SK*077652972160*LT*31052026A
gtin = fields[3]
lot = fields[5]
product = gtin_lookup(gtin)
qty = 0
if fields[0] == "QTY":#product should already exist
# QTY*33*0
# QTY*20*16
# QTY*QH*0
qty += int(fields[2])
if fields[0] == "SE":#end of file
# pprint.pprint('final add')
# pprint.pprint(shandex_inventory)
# pprint.pprint('product: ' + product)
# pprint.pprint('lot: ' + lot)
# pprint.pprint('qty: ' + str(qty))
if product is not None: #check loop entry
if product not in shandex_inventory: #if we haven't seen the product yet add it
shandex_inventory[product] = {lot : qty}
else: #we've seen this product, have we seen the lot
if lot not in shandex_inventory[product]:#if not, add it
shandex_inventory[product][lot] = qty
else:
shandex_inventory[product][lot] += qty #if we have add to it
return shandex_inventory
if __name__ == "__main__":
main()