source_logistics_ecommerce_.../source_ecommerce_make_order...

421 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Pull shipment files from the Stash AWS FTP.
After copying file, move file to archive folder on FTP.
Create ZSHPORD import files, which X3 will consume on a schedule via ZECMSL recurring task.
Copy file to the archive and then pass it to the shipment maker directory.
TODO: Source needs to send us real data for final adjustments (site, discount, multilot, etc.)
"""
import csv
import pprint
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import paramiko
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com"
SFTP_USERNAME = "yumiddleware2023"
SSH_DIRECTORY = THIS_DIRECTORY / "ssh" #TODO fixme
SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts")
SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519")
INCOMING_DIRECTORY = THIS_DIRECTORY / "incoming_orders"
SHIPMENTS_DIRECTORY = THIS_DIRECTORY / "incoming_shipments"
SOH_IMPORT_DIRECTORY = THIS_DIRECTORY / "to_import_SOH"
def main():
#retrieve_x12_edi_files()#TODO uncomment
for file in INCOMING_DIRECTORY.iterdir():
if file.name[-4:] != '.csv':
continue
else:
process_files(file)
shutil.move(file, SHIPMENTS_DIRECTORY / file.name)
#archives are in the shipping folder
combine_zshpords()
def sftp_server() -> paramiko.SFTPClient:
with paramiko.SSHClient() as ssh_client:
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
ssh_client.connect(
hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
)
with ssh_client.open_sftp() as sftp_connection:
yield sftp_connection
def retrieve_x12_edi_files():
"""
Connect to S3 bucket & pull down files.
"""
with paramiko.SSHClient() as ssh_client:
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
ssh_client.connect(
hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
)
with ssh_client.open_sftp() as sftp_connection:
sftp_connection.chdir("/yu-edi-transfer/source-logi/dev/ecomm-inbound")#TODO set to prod
for filename in sftp_connection.listdir():
#if edi_945.SOURCE_945_FILENAME_RE.match(filename):#TODO fixme
sftp_connection.get(filename, INCOMING_DIRECTORY / filename)
new_filename = f"/yu-edi-transfer/source-logi/dev/ecomm-processed/{filename}"#TODO set to prod
sftp_connection.rename(filename, new_filename)
def combine_zshpords():
"""
Collect all ZSHPORD imports into a single file for easy import.
"""
archive_directory = SOH_IMPORT_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (SOH_IMPORT_DIRECTORY / "ZSHPORD.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in SOH_IMPORT_DIRECTORY.glob(
"ZSHPORD_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def process_files(file):
with open(file) as source_file:
csv_reader = csv.reader(source_file)
sales_order = SalesOrder()
previous_order = ''
current_order = ''
for num, row in enumerate(csv_reader):
if num == 0:
continue #skip header lines
if num >= 1: #gather header information
current_order = row[6]
if current_order != previous_order:
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
if sales_order.header.cusordref != '':
with yamamotoyama.x3_imports.open_import_file(
SOH_IMPORT_DIRECTORY / f"ZSHPORD_{current_order}_{time_stamp}_{sales_order.header.cusordref}.dat"
) as import_file:
sales_order.output(import_file)
sales_order = SalesOrder()
previous_order = current_order
ship_site = row[0]
order_id = row[6]
order_date = row[9]
customer_name = row[10]
# shipadd1 = row[9] # address information is not stored in X3
# shipadd2 = row[10]
# shipcity = row[11]
# shipstate = row[12]
# shipzip = row[13]
tracking = row[16]
weight = row[18]
taxes = row[22]
ship_charge = row[21]
discount = row[24]
sales_order.header.cusordref = order_id
sales_order.header.orddat = datetime.datetime.strptime(order_date,'%m/%d/%Y %H:%M').strftime('%Y%m%d') #TODO strftim this
sales_order.header.stofcy = ship_site
sales_order.header.bpdnam = customer_name
sales_order.header.invdtaamt_5 = ship_charge
sales_order.header.invdtaamt_7 = discount
sales_order.header.invdtaamt_8 = taxes
#gather line data
line_product = row[1]
line_qty = row[3]
line_lot = row[4]
line_price = row[20]
sales_order.append(
SalesOrderDetail(
itmref=line_product,
qty=int(line_qty),
gropri=line_price
)
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
SOH_IMPORT_DIRECTORY / f"ZSHPORD_{current_order}_{time_stamp}_{sales_order.header.cusordref}.dat"
) as import_file:
sales_order.output(import_file)
@dataclasses.dataclass
class SalesOrderDetail:
"""
Information that goes on ann order detail line, taken from ZSHPORD template.
"""
itmref: str = ""
itmrefbpc: str = ""
itmdes: str = ""
qty: int = 0
gropri: decimal.Decimal = decimal.Decimal()
discrgval_1: decimal.Decimal = decimal.Decimal()
zamaztax: decimal.Decimal = decimal.Decimal()
star91: str = ""
star92: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
#self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"D",
self.itmref,
self.itmrefbpc,
self.qty,
self.gropri,
self.discrgval_1,
self.zamaztax,
self.star91,
self.star92,
]
)
# def __eq__(self, item: typing.Any) -> bool:
# """
# Test for equality
# """
# if isinstance(item, str):
# return self.itmref == item
# if isinstance(item, SalesOrderDetail):
# return self.itmref == item.itmref
# return False
# def fill(self):
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
# else:
# emailtext = str(self.sohnum +' '+str(self.itmref))
# msg.attach(MIMEText(emailtext, 'plain'))
# with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
# smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
# smtp.send_message(msg)
# raise NotImplementedError # TODO
# result = get()
# self.soplin = result.SOPLIN_0
# self.itmdes = result.ITMDES1_0
# self.sau = result.SAU_0
@dataclasses.dataclass
class SalesOrderHeader:
"""
Information that goes on an order header, taken from ZSHPORD template.
"""
sohnum: str = ""
sohtyp: str = "WEB"
bpcord: str = "STSHOPIFY" #TODO are they handling YU?
bpcinv: str = "STSHOPIFY" #TODO are they handling YU?
bpcpyr: str = "STSHOPIFY" #TODO are they handling YU?
bpaadd: str = "BL001"
orddat: datetime.date = datetime.date(1753, 1, 1)
cusordref: str = ""
cur: str = "USD"
alltyp: int = 2 #Detailed
salfcy: str = "ECS" #TODO are they handling YU?
stofcy: str = "" #TODO need to be set from file
pte: str = "USPREPAY" #TODO needs checking
vacbpr: str = "NTX" #TODO needs checking
dlvpio: int = 1 #normal
mdl: str = "GRN"#TODO any way to tell how they were sent?
yshppaymth: int = 1 #prepaid freight
bpcnam: str = "ST SHOPIFY Stashtea.com" #TODO does this need to be set
bpdnam: str = ""
bpdaddlig_0: str = "999 ANYSTREET"
bpdaddlig_1: str = ""
bpdaddlig_2: str = ""
bpdcty: str = "POMONA"
bpdsat: str = "CA"
bpdposcod: str = "91768"
bpdcry: str = "US"
ybpdweb: str = ""
ybpdtel: str = ""
ybpcweb: str = ""
yamaorder: str = ""
ygiftwrap: int = 0
invdtaamt_5: decimal.Decimal = decimal.Decimal()
invdtaamt_7: decimal.Decimal = decimal.Decimal()
invdtaamt_8: decimal.Decimal = decimal.Decimal()
yimport: int = 0
pjt: str = ""
yedinotes: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"H",
self.sohnum,
self.sohtyp,
self.bpcord,
self.bpcinv,
self.bpcpyr,
self.bpaadd,
self.orddat,
self.cusordref,
self.cur,
self.alltyp,
self.salfcy,
self.stofcy,
self.pte,
self.vacbpr,
self.dlvpio,
self.mdl,
self.yshppaymth,
self.bpcnam,
self.bpdnam,
self.bpdaddlig_0,
self.bpdaddlig_1,
self.bpdaddlig_2,
self.bpdcty,
self.bpdsat,
self.bpdposcod,
self.bpdcry,
self.ybpdweb,
self.ybpdtel,
self.ybpcweb,
self.yamaorder,
self.ygiftwrap,
self.invdtaamt_5,
self.invdtaamt_7,
self.invdtaamt_8,
self.yimport,
self.pjt,
self.yedinotes
]
)
class SalesOrderDetailList:
"""
List of shipment details
"""
_details: typing.List[SalesOrderDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
salesorder_detail: SalesOrderDetail,
):
"""
Append
"""
itmref = salesorder_detail.itmref
qty = salesorder_detail.qty
if itmref in self._item_set:
for detail in self._details:
if detail.itmref == itmref:
detail.qty += qty
else:
self._item_set.add(itmref)
self._details.append(salesorder_detail)
def __iter__(self):
return iter(self._details)
class SalesOrder:
"""
sales order both header & details
"""
header: SalesOrderHeader
details: SalesOrderDetailList
def __init__(self):
self.header = SalesOrderHeader()
self.details = SalesOrderDetailList()
def append(
self,
salesorder_detail: SalesOrderDetail,
):
"""
Add detail information.
"""
self.details.append(salesorder_detail)
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
if __name__ == "__main__":
main()