Google auth changes and to_table work

master
bleeson 2025-05-05 15:21:23 -07:00
parent 318cf7790e
commit 30e028609c
12 changed files with 1319 additions and 30 deletions

1
credentials.json Normal file
View File

@ -0,0 +1 @@
{"installed":{"client_id":"249203512502-8ut4vkh77ns4rl40ia485t460niii2b8.apps.googleusercontent.com","project_id":"python-access-2025","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"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","redirect_uris":["http://localhost"]}}

View File

@ -17,11 +17,22 @@ from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
import os
import base64
import google.auth
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
SCOPES = ['https://mail.google.com/']
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
@ -59,6 +70,38 @@ def main():
stock_count_alert()
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
def compare_inventory(shandex_inventory, x3_inventory):
today = datetime.datetime.today()
today = today.strftime('%Y-%m-%d')
@ -215,7 +258,7 @@ def stock_count_alert():
msg['Subject'] = 'New Stock Count from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'isenn@yamamotoyama.com,vgomez@yamamotoyama.com'
msg['To'] = 'jpena@yamamotoyama.com,icortes@yamamotoyama.com,mdelacruz@yamamotoyama.com'
msg['Cc'] = 'bleeson@stashtea.com'
emailtext = f'Attached.'
msg.attach(MIMEText(emailtext, 'plain'))
@ -225,9 +268,12 @@ def stock_count_alert():
part['Content-Disposition'] = f'attachment; filename="{file.name}"'
msg.attach(part)
shutil.move(file, EDI_846_ATTACHMENTS_ARCHIVE / file.name)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):

View File

@ -18,11 +18,22 @@ import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
import base64
import google.auth
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
SCOPES = ['https://mail.google.com/']
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
@ -44,6 +55,38 @@ def main():
combine_zpohs()
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
def new_850_alert(ordref, orddat):
msg = MIMEMultipart()
msg['Subject'] = 'New PO from Shandex'
@ -53,9 +96,12 @@ def new_850_alert(ordref, orddat):
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'Ref: {ordref}\nDate: {orddat}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def combine_zpohs():
"""

View File

@ -22,6 +22,15 @@ import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
import base64
import google.auth
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import records # type: ignore
import yamamotoyama # type: ignore
@ -29,6 +38,8 @@ import yamamotoyama.x3_imports # type: ignore
import simple_email_notification
SCOPES = ['https://mail.google.com/']
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
@ -222,6 +233,38 @@ def main():
shutil.move(edi_filename, THIS_DIRECTORY / "processed_867s" / edi_filename.name) #They go in here so we can use them in the dashboard script, 2024-08 dashboard no longer needed
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
def get_customer_map():
customer_map = {}
with yamamotoyama.get_connection() as database:
@ -246,9 +289,12 @@ def missing_customer_alert(customer_key):
msg['To'] = 'technical-contact@stashtea.com'
emailtext = f'Missing value: {customer_key}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def combine_zship867s():
@ -690,9 +736,12 @@ class WarehouseShipmentDetail:
else:
emailtext = str(self.sohnum +' '+str(self.itmref))
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
raise NotImplementedError # TODO
result = get()

View File

@ -18,11 +18,22 @@ import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
import base64
import google.auth
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
SCOPES = ['https://mail.google.com/']
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
@ -44,6 +55,38 @@ def main():
combine_zpthis()
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
def new_944_alert(sdhnum, pohnum, rcpdat):
msg = MIMEMultipart()
msg['Subject'] = 'New Receipt from Shandex'
@ -53,9 +96,12 @@ def new_944_alert(sdhnum, pohnum, rcpdat):
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'Delivery: {sdhnum}\nPO: {pohnum}\nDate: {rcpdat}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def validation_alert(sdhnum):
@ -67,9 +113,12 @@ def validation_alert(sdhnum):
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'A Shandex receipt for {sdhnum} could not be loaded into X3 because the shipment is not validated.'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def combine_zpthis():
"""
@ -189,8 +238,6 @@ def process_file(edi_filename: pathlib.Path):
),
subdetail,
)
for thing in warehouse_receipt.details:
pprint.pprint(thing)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
new_944_alert(sdhnum, pohnum, warehouse_receipt.header.rcpdat)
with yamamotoyama.x3_imports.open_import_file(

663
edi_944_to_table.py Normal file
View File

@ -0,0 +1,663 @@
#!/usr/bin/env python3
"""
Consume a generic 944 file from 3PLs, and translate into a Sage X3
readable file - import template ZPTHI.
For Shandex we also need to reply with a 997
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import pprint
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import simple_email_notification
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
SHANDEX_944_FILENAME_RE = re.compile(
r"\A 944_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
INSERT_RECEIPT = """
execute staging.dbo.shandex_insert_receipt
:sdhnum,
:E,
:prhfcy,
:rcpdat,
:pthnum,
:bpsnum,
:cur,
:star71,
:star72,
:star81,
:star82
"""
INSERT_RECEIPT_DETAILS = """
execute staging.dbo.shandex_insert_receipt_line
:L,
:sdhnum,
:sddlin,
:itmref,
:uom,
:qtyuom,
:pjt,
:star65,
:star91,
:star92,
:S,
:sta,
:pcu,
:qtypcu,
:lot,
:bpslot,
:sernum
"""
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_944_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
def new_944_alert(sdhnum, pohnum, rcpdat):
msg = MIMEMultipart()
msg['Subject'] = 'New Receipt from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'icortes@yamamotoyama.com,mdelacruz@yamamotoyama.com,dalmanza@yamamotoyama.com,jpena@yamamotoyama.com'
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'Delivery: {sdhnum}\nPO: {pohnum}\nDate: {rcpdat}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def validation_alert(sdhnum):
msg = MIMEMultipart()
msg['Subject'] = 'New Receipt from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'icortes@yamamotoyama.com,mdelacruz@yamamotoyama.com,dalmanza@yamamotoyama.com,jpena@yamamotoyama.com'
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'A Shandex receipt for {sdhnum} could not be loaded into X3 because the shipment is not validated.'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def combine_zpthis():
"""
Collect all ZPTHI imports into a single file for easy import.
"""
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZPTHI.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZPTHI_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"ST",
"N2",
"N3",
"N4",
"LX",
}:
continue
yield fields
def find_shipment_line(sdhnum, itmref):
with yamamotoyama.get_connection() as database:
result = database.query(
"""
select
SDDLIN_0
from PROD.SDELIVERYD
where
SDHNUM_0 = :sdhnum
and ITMREF_0 = :itmref
""",
sdhnum=sdhnum,
itmref=itmref
).first()['SDDLIN_0']
return result
def check_shipment_status(delivery):
with yamamotoyama.get_connection() as database:
result = database.query(
"""
select
SDH.SDHNUM_0,
CFMFLG_0
from PROD.SDELIVERY SDH
where SDH.SDHNUM_0 = :sdhnum
""",
sdhnum=delivery
).first()['CFMFLG_0']
if result == 2:
return True
return False
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
warehouse_receipt = Receipt()
pohnum = ''
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "W17":
_, _, rcpdat, _, sohnum, sdhnum = fields[:6]
warehouse_receipt.sdhnum = sdhnum
validated = check_shipment_status(sdhnum)
warehouse_receipt.header.rcpdat = datetime.datetime.strptime(
rcpdat, "%Y%m%d"
).date() # 20230922
if fields[0] == "N9" and fields[1] == "PO":
pohnum = fields[2]
if fields[0] == "W07":
# W07*1023*CA**PN*C08249*LT*07032026A***UK*10077652082491
# N9*LI*1000
_, qty_str, uom, _, _, itmref, _, lot = fields[:8]
subdetail = ReceiptSubDetail(
pcu=fix_uom(uom),
qtypcu=int(qty_str),
lot=lot
)
if fields[0] == 'N9' and fields[1] == 'LI':
# N9*LI*1000
#line = fields[2] #This line isn't the line number from X3, it needs to be looked up
line = find_shipment_line(warehouse_receipt.sdhnum, itmref)
warehouse_receipt.append(
ReceiptDetail(
sdhnum=warehouse_receipt.sdhnum,
poplin=int(line),
itmref=itmref,
uom=fix_uom(uom),
qtyuom=int(qty_str)
),
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
#new_944_alert(sdhnum, pohnum, warehouse_receipt.header.rcpdat)#TODO is this needed?
# with yamamotoyama.x3_imports.open_import_file(
# IMPORTS_DIRECTORY / f"ZPTHI_{warehouse_receipt.sdhnum}_{time_stamp}.dat"
# ) as import_file:
# warehouse_receipt.output(import_file)
import_receipt(warehouse_receipt)
def import_receipt(warehouse_receipt):
"""send the shipment information to the shandex staging database"""
with yamamotoyama.get_connection() as data_base:
result = data_base.query(
"""
SELECT
sdhnum
FROM [staging].[dbo].[shandex_receipts]
where sdhnum = :order
""",
order=warehouse_receipt.sdhnum,
).all()
if not result:
with data_base.transaction():
data_base.query(
INSERT_RECEIPT,
sdhnum=warehouse_receipt.sdhnum,
E='E',
prhfcy=warehouse_receipt.header.bpcord,
rcpdat=warehouse_receipt.header.rcpdat.strftime("%Y%m%d"),
pthnum='',
bpsnum=warehouse_receipt.header.stofcy,
cur=warehouse_receipt.header.cur,
star71=warehouse_receipt.header.star71,
star72=warehouse_receipt.header.star72,
star81=warehouse_receipt.header.star81,
star82=warehouse_receipt.header.star82
)
with data_base.transaction():
for detail in warehouse_receipt.details:
detail.qtyuom = detail.check_subdetail_qty()
for subdetail in detail.subdetails:
data_base.query(
INSERT_RECEIPT_DETAILS,
L='L',
sdhnum=detail.sdhnum,
sddlin=detail.poplin,
itmref=detail.itmref,
uom=detail.uom,
qtyuom=detail.qtyuom,
pjt=detail.pjt,
star65=detail.star65,
star91=detail.star91,
star92=detail.star92,
S='S',
sta=subdetail.sta,
pcu=subdetail.pcu,
qtypcu=subdetail.qtypcu,
lot=subdetail.lot,
bpslot=subdetail.bpslot,
sernum=subdetail.sernum
)
else:
simple_email_notification.email_noticication(['bleeson@stashtea.com'],'Shandex Receipt Error',[f'{warehouse_receipt.sdhnum} already exists, what happened?'])
@dataclasses.dataclass
class ReceiptSubDetail:
"""
Information that goes onto a receipt sub-detail line, taken from ZPTHI template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
bpslot: str = ""
sernum: str = ""
def append(self, receipt_subdetail):
self.qtypcu += receipt_subdetail.qtypcu
def stojous(self, shipment, item) -> typing.List[str]:
"""
Convert grouped lot quantities into individual STOJOU records to fit on receipt
"""
with yamamotoyama.get_connection() as database:
details = (
database.query(
"""
select
'S' [Code],
'A' [STA_0],
[STJ].[PCU_0],
cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar) [QTYPCU_0],
[STJ].[LOT_0],
'' [BPSLOT_0],
'' [SERNUM_0]
from [PROD].[STOJOU] [STJ]
where
[STJ].[VCRNUM_0] = :sdhnum
and [STJ].[ITMREF_0] = :itmref
and [STJ].[LOT_0] = :lot
and [STJ].[TRSTYP_0] = 4
""",
sdhnum=shipment,
itmref=item,
lot=self.lot,
)
.all()
)
return details
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.lot,
self.bpslot,
self.sernum,
]
)
@dataclasses.dataclass
class ReceiptDetail:
"""
Information that goes on a receipt detail line, taken from ZPTHI template.
"""
sdhnum: str = ""
poplin: int = 0
itmref: str = ""
itmdes: str = ""
uom: str = ""
qtyuom: int = 0
pjt: str = ""
star65: str = ""
star91: str = ""
star92: str = ""
subdetails: typing.List[ReceiptSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: ReceiptSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.uom
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases -= subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sdhnum,
self.poplin,
self.itmref,
fix_uom(self.uom),
self.qty,
self.star65,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, ReceiptDetail):
return self.itmref == item.itmref
return False
# def fill(self):#not needed for receipts
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class ReceiptHeader:
"""
Information that goes on a receipt header, taken from ZPTHI template.
"""
stofcy: str = ""
bpcord: str = ""
prhfcy: str = ""
rcpdat: datetime.date = datetime.date(1753, 1, 1)
pthnum: str = ""
bpsnum: str = ""
cur: str = "USD"
star71 = ""
star72 = ""
star81 = ""
star82 = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"E",
self.bpcord,
self.rcpdat.strftime("%Y%m%d"),
self.pthnum,
self.stofcy,
self.cur,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class ReceiptDetailList:
"""
List of receipt details
"""
_details: typing.List[ReceiptDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
receipt_detail: ReceiptDetail,
receipt_subdetail: ReceiptSubDetail,
):
"""
Append
"""
itmref = receipt_detail.itmref
if itmref in self._item_set:
for detail in self._details:
for subdetail in detail.subdetails:
if detail.itmref == itmref and subdetail.lot == receipt_subdetail.lot:
subdetail.append(receipt_subdetail)
return
if detail.itmref == itmref:
detail.subdetails.append(receipt_subdetail)
return
self._item_set.add(itmref)
#receipt_detail.fill()
receipt_detail.append(receipt_subdetail)
self._details.append(receipt_detail)
def __iter__(self):
return iter(self._details)
class Receipt:
"""
Warehouse receipt, both header & details
"""
header: ReceiptHeader
details: ReceiptDetailList
_sdhnum: str
def __init__(self):
self.header = ReceiptHeader()
self._sdhnum = ""
self.details = ReceiptDetailList()
def append(
self,
receipt_detail: ReceiptDetail,
receipt_subdetail: ReceiptSubDetail,
):
"""
Add detail information.
"""
self.details.append(receipt_detail, receipt_subdetail)
@property
def sdhnum(self):
"""
shipment number
"""
return self._sdhnum
@sdhnum.setter
def sdhnum(self, value: str):
if self._sdhnum != value:
self._sdhnum = value
if value:
self._fill_info_from_shipment()
def _get_shipment_from_x3(self) -> records.Record:
"""
Fetch shipment from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SDH].[STOFCY_0],
[SDH].[SDHNUM_0],
[SDH].[SALFCY_0],
[SDH].[BPCORD_0],
[SDH].[CUR_0],
[SDH].[SOHNUM_0]
from [PROD].[SDELIVERY] [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=self.sdhnum,
).first()
def _fill_info_from_shipment(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_shipment_from_x3()
self.header.stofcy = result.STOFCY_0
self.header.sdhnum = result.SDHNUM_0
self.header.salfcy = result.SALFCY_0
self.header.bpcord = result.BPCORD_0
self.header.cur = result.CUR_0
self.header.sohnum = result.SOHNUM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
shipment = detail.sdhnum
item = detail.itmref
for record in subdetail.stojous(shipment, item):
record_list = [
record['Code'],
record['STA_0'],
record['PCU_0'],
record['QTYPCU_0'],
record['LOT_0'],
record['BPSLOT_0'],
record['SERNUM_0']
]
#pprint.pprint(record_list)
output(record_list)
if __name__ == "__main__":
main()

View File

@ -24,11 +24,22 @@ import pprint
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
import base64
import google.auth
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
SCOPES = ['https://mail.google.com/']
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
@ -81,6 +92,38 @@ def main():
combine_zscs()
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
def combine_zscs():
"""
Collect all ZSCS imports into a single file for easy import.
@ -152,13 +195,16 @@ def stock_movement_alert(itmref, qty, lot, status):
msg['Subject'] = 'New Stock Change from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'isenn@yamamotoyama.com, vgomez@yamamotoyama.com'
msg['CC'] = 'bleeson@stashtea.com'
msg['To'] = 'woninventory@stashtea.com'
msg['CC'] = 'bleeson@stashtea.com,icarrera@yamamotoyama.com'
emailtext = f'Item: {itmref}\nQty: {qty}\nLot: {lot}\nStatus: {DAMAGE_CODE_MAPPING[status]}\nReason: {DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
service = gmail_authenticate()
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):

221
import_944s.py Normal file
View File

@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Process entries in the staging database put there by
edi_867 and prepare them for X3 batch importing
"""
import pathlib
import datetime
import pprint
import shutil
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
import simple_email_notification
THIS_DIRECTORY = pathlib.Path(__file__).parent
IMPORTS_DIRECTORY = THIS_DIRECTORY / 'x3_imports'
UNVALIDATED_STATEMENT = """
select
sdhnum
from staging.dbo.shandex_receipts
left join x3.PROD.SDELIVERY SDH
on shandex_receipts.sdhnum = SDH.SDHNUM_0
where
is_sent = 0
and SDH.CFMFLG_0 = 1
"""
SELECT_STATEMENT = """
select
sdhnum
from staging.dbo.shandex_receipts
left join x3.PROD.SDELIVERY SDH
on shandex_receipts.sdhnum = SDH.SDHNUM_0
where
is_sent = 0
and SDH.CFMFLG_0 = 2
"""
HEADER_STATEMENT = """
select
[sdhnum]
,[E]
,[prhfcy]
,[rcpdat]
,[pthnum]
,[bpsnum]
,[cur]
,[star71]
,[star72]
,[star81]
,[star82]
,[is_sent]
from staging.dbo.shandex_receipts
where sdhnum = :sdhnum
"""
DETAIL_STATEMENT = """
select
[L]
,[sdhnum]
,[sddlin]
,[itmref]
,[uom]
,[qtyuom]
,[pjt]
,[star65]
,[star91]
,[star92]
FROM [staging].[dbo].[shandex_receipt_details]
where sdhnum = :sdhnum
group by
[L]
,[sdhnum]
,[sddlin]
,[itmref]
,[uom]
,[qtyuom]
,[pjt]
,[star65]
,[star91]
,[star92]
"""
SUBDETAIL_STATEMENT = """
select distinct
[S]
,[sta]
,[pcu]
,[qtypcu]
,[lot]
,[bpslot]
,[sernum]
from [staging].[dbo].[shandex_receipt_details]
where sdhnum = :sdhnum
and itmref = :itmref
"""
UPDATE_STATEMENT = """
update [staging].[dbo].[shandex_receipts]
set is_sent = 1
where sdhnum = :sdhnum
"""
HEADER_NAMES = ['E','prhfcy','rcpdat','pthnum','bpsnum','cur','star71','star72','star81','star82']
DETAIL_NAMES = ['L','sdhnum','sddlin','itmref','uom','qtyuom','pjt','star65','star91','star92']
SUBDETAIL_NAMES = ['S','sta','pcu','qtypcu','lot','bpslot','sernum']
def unvalidated_shipments_alert(unvalidated_shipments):
to_addresses = ['bleeson@stashtea.com','icortes@yamamotoyama.com','mdelacruz@yamamotoyama.com',
'dalmanza@yamamotoyama.com','jpena@yamamotoyama.com']
subject = 'Shandex: Unvalidated shipments awaiting receipt'
body_text = ['Shipments pending validation:']
for record in unvalidated_shipments:
body_text.append(record['sdhnum'])
simple_email_notification.email_noticication(to_addresses, subject, body_text)
def get_receipts(database):
with database.transaction():
result = database.query(SELECT_STATEMENT).all()
return result
def get_unvalidated_shipments(database):
with database.transaction():
result = database.query(UNVALIDATED_STATEMENT).all()
return result
def get_receipt_headers(database, sdhnum):
result = database.query(
HEADER_STATEMENT,
sdhnum=sdhnum
).first()
return result
def get_receipt_details(database, sdhnum):
result = database.query(
DETAIL_STATEMENT,
sdhnum=sdhnum
).all()
return result
def get_receipt_subdetails(database, sdhnum, itmref):
result = database.query(
SUBDETAIL_STATEMENT,
sdhnum=sdhnum,
itmref=itmref
).all()
return result
def create_imports(receipts, database):
for receipt in receipts:
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with open(
IMPORTS_DIRECTORY / f"ZPTHI_{receipt['sdhnum']}_{time_stamp}.dat", 'w', encoding='utf-8', newline='\n'
) as import_file:
headers = get_receipt_headers(database, receipt['sdhnum'])
details = get_receipt_details(database, receipt['sdhnum'])
for name in HEADER_NAMES:
import_file.write(headers[name])
import_file.write(chr(31))
import_file.write('\n')
for record in details:
for name in DETAIL_NAMES:
import_file.write(record[name])
import_file.write(chr(31))
import_file.write('\n')
subdetails = get_receipt_subdetails(database, receipt['sdhnum'], record['itmref'])
for subrecord in subdetails:
for name in SUBDETAIL_NAMES:
import_file.write(subrecord[name])
import_file.write(chr(31))
import_file.write('\n')
def combine_imports():
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZPTHI.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZPTHI_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def mark_sent(database, receipts):
with database.transaction():
for receipt in receipts:
result = database.query(
UPDATE_STATEMENT,
sdhnum=receipt['sdhnum']
)
def main():
with yamamotoyama.get_connection() as database:
#retrieve everything that has a valid customer and hasn't already been sent to X3
receipts = get_receipts(database)
#TODO check for validation problems, report on them, and don't try to process them
unvalidateds = get_unvalidated_shipments(database)
if unvalidateds:
unvalidated_shipments_alert(unvalidateds)
#turn each shipment into a X3 import file
create_imports(receipts, database)
combine_imports()
#udate the is_sent field so they are not processed again
mark_sent(database, receipts)
if __name__ == "__main__":
main()

View File

@ -9,6 +9,7 @@ import paramiko # type: ignore
import pprint
import edi_997_inbound
import edi_944
import edi_944_to_table
import edi_947
import edi_846
import edi_867_to_table
@ -17,6 +18,7 @@ import update_shandex_dashboard
import edi_943
import unprocessed_files_report
import import_867s
import import_944s
import edi_850
THIS_DIRECTORY = pathlib.Path(__file__).parent
@ -32,14 +34,16 @@ def main():
retrieve_x12_edi_files_shandex()
#report on anything not handled
unprocessed_files_report.main()
#unprocessed_files_report.main()
#process all EDIs that started with Shandex
edi_997_inbound.main()
edi_944.main()
#edi_944.main()
edi_944_to_table.main()
import_944s.main()
edi_947.main()
edi_846.main()
edi_850.main()
#edi_850.main()#disabled 2025-02-13 per Purchasing decision for "Push" methodology
edi_867_to_table.main()
import_867s.main()
@ -54,7 +58,7 @@ def main():
#update_shandex_dashboard.main()
#report on anything not handled
unprocessed_files_report.main()
#unprocessed_files_report.main()

55
reimport_from_archive.py Normal file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
Find import files from the x3 import archive folder
and stick them together for a single reimport.
Useful when stock levels prevented multiple shipments from
importing in.
"""
import re
import pathlib
import pprint
THIS_DIRECTORY = pathlib.Path(__file__).parent
X3_IMPORT_ARCHIVE = THIS_DIRECTORY / "x3_imports" / "archive"
REIMPORT_DIRECTORY = THIS_DIRECTORY / "reimports"
LIST_OF_UNSHIPPED_ORDERS = [
'O0218424'
]
def get_positions(string):
pos = []
for i, chr in enumerate(string):
if string[i] == '_':
pos.append(i)
return pos
def main():
#run through all files in list and combine them into a single import
with open(REIMPORT_DIRECTORY / 'ZSHIP867.dat', 'w',encoding="utf-8", newline="\n") as combined_import_file:
files = get_files()
for file in files:
with file.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
#search the archive directory for the files, write their contents to a single file
def get_files():
file_list = []
for file in X3_IMPORT_ARCHIVE.iterdir():
if file.name[:9] == 'ZSHIP867_':
underscores = get_positions(file.name)
pos1 = underscores[1]+1
pos2 = underscores[2]
if file.name[pos1:pos2] in LIST_OF_UNSHIPPED_ORDERS:
file_list.append(file)
return file_list
if __name__ == "__main__":
main()

View File

@ -0,0 +1,70 @@
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import os
import base64
from email.message import EmailMessage
import google.auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
SCOPES = ['https://mail.google.com/']
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def gmail_send_message(service, payload):
create_message = {"raw": payload}
# pylint: disable=E1101
send_message = (
service.users()
.messages()
.send(userId="me", body=create_message)
.execute()
)
return send_message
#simple email to a list of recipients
def email_noticication(address_list, subject_text, body_text_list):
service = gmail_authenticate()
file_string = '\n'.join(body_text_list)
msg = MIMEMultipart()
msg['Subject'] = f'{subject_text}'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = ','.join(address_list)
#msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'{file_string}'
msg.attach(MIMEText(emailtext, 'plain'))
encoded_message = base64.urlsafe_b64encode(msg.as_bytes()).decode()
gmail_send_message(service, encoded_message)
# with smtplib.SMTP_SSL("smtp-relay.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)

View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
This is called unprocessed files, but it's really files that went through processing
We can tell there's a problem if the same files are reprocessing over and over, this is
more of an alert to show that something is happening.
"""
# pylint: disable=too-many-instance-attributes
import pathlib
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
def main():
edi_types = {}
for edi_filename in X12_DIRECTORY.iterdir():
if edi_filename.name.endswith('.edi'):
edi_file_type = edi_filename.name[:3]
if edi_file_type not in edi_types:
edi_types[edi_file_type] = 1
else:
edi_types[edi_file_type] += 1
if edi_types:
msg = MIMEMultipart()
msg['Subject'] = 'Shandex Files'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'bleeson@stashtea.com'
emailtext = str(edi_types)
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
if __name__ == "__main__":
main()