494 lines
18 KiB
Python
494 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Pull shipment files from the Stash AWS FTP.
|
|
After copying file, move file to archive folder on FTP.
|
|
Create ZSHPORD import files, which X3 will consume on a schedule via ZECMSL recurring task.
|
|
Copy file to the archive and then pass it to the shipment maker directory.
|
|
|
|
TODO: Source needs to send us real data for final adjustments (site, discount, multilot, etc.)
|
|
"""
|
|
import csv
|
|
import pprint
|
|
import dataclasses
|
|
import datetime
|
|
import decimal
|
|
import functools
|
|
import pathlib
|
|
import re
|
|
import shutil
|
|
import typing
|
|
import paramiko
|
|
import decimal
|
|
|
|
import records # type: ignore
|
|
|
|
import yamamotoyama # type: ignore
|
|
import yamamotoyama.x3_imports # type: ignore
|
|
|
|
# Folder holding this script; the SSH material and every working folder below
# are resolved relative to it.
THIS_DIRECTORY = pathlib.Path(__file__).parent

# SFTP endpoint and login name used by the SFTP helpers in this module.
SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com"
SFTP_USERNAME = "yumiddleware2023"

# SSH material. The two file locations are kept as plain strings, not Paths.
SSH_DIRECTORY = THIS_DIRECTORY.joinpath("ssh")  # TODO fixme
SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY.joinpath("known_hosts"))
SSH_KEY_FILENAME = str(SSH_DIRECTORY.joinpath("id_ed25519"))

# Local working folders.
INCOMING_DIRECTORY = THIS_DIRECTORY.joinpath("incoming_orders")
SHIPMENTS_DIRECTORY = THIS_DIRECTORY.joinpath("incoming_shipments")
SOH_IMPORT_DIRECTORY = THIS_DIRECTORY.joinpath("to_import_SOH")
|
|
|
|
def main():
    """
    Turn every incoming CSV into ZSHPORD imports, then combine them.

    Each ``.csv`` file found in INCOMING_DIRECTORY is passed to
    process_files() and then moved to SHIPMENTS_DIRECTORY. Once all files are
    handled, combine_zshpords() merges the generated import files.
    """
    # retrieve_x12_edi_files()  # TODO remove this as it's handled by the earlier process

    # Take a sorted snapshot of the listing before touching anything: files are
    # moved out of this directory inside the loop, and iterdir() does not
    # specify what happens to entries that change while it is being consumed.
    for csv_path in sorted(INCOMING_DIRECTORY.iterdir()):
        if not csv_path.is_file() or not csv_path.name.endswith(".csv"):
            continue
        process_files(csv_path)
        # archives are in the shipping folder
        shutil.move(csv_path, SHIPMENTS_DIRECTORY / csv_path.name)
    combine_zshpords()
|
|
|
|
def sftp_server() -> typing.Iterator[paramiko.SFTPClient]:
    """
    Yield one SFTP session on SFTP_HOST and close it when the generator ends.

    This is a generator function: calling it returns a generator that yields a
    single ``paramiko.SFTPClient``. The SSH connection and the SFTP session are
    both closed when the generator is resumed past the yield or closed.

    NOTE(review): there is no ``contextlib.contextmanager`` decorator, so the
    result cannot be used directly in a ``with`` statement - confirm how
    callers are expected to consume it.
    """
    with paramiko.SSHClient() as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
        # Reject any server whose key is not in one of the loaded host-key files.
        ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
        ssh_client.connect(
            hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
        )
        with ssh_client.open_sftp() as sftp_connection:
            yield sftp_connection
|
|
|
|
def retrieve_x12_edi_files():
    """
    Download every file from the inbound SFTP folder and mark it processed.

    Each remote file is copied into INCOMING_DIRECTORY under the same name and
    is then renamed on the server into the ecomm-processed folder.
    """
    with paramiko.SSHClient() as client:
        client.load_system_host_keys()
        client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
        client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
        client.connect(
            hostname=SFTP_HOST,
            username=SFTP_USERNAME,
            key_filename=SSH_KEY_FILENAME,
        )
        with client.open_sftp() as sftp:
            sftp.chdir("/yu-edi-transfer/source-logi/dev/ecomm-inbound")  # TODO set to prod
            for filename in sftp.listdir():
                # if edi_945.SOURCE_945_FILENAME_RE.match(filename):  # TODO fixme
                local_copy = INCOMING_DIRECTORY / filename
                sftp.get(filename, local_copy)
                # Rename only after the download call has returned.
                sftp.rename(
                    filename,
                    f"/yu-edi-transfer/source-logi/dev/ecomm-processed/{filename}",  # TODO set to prod
                )
|
|
|
|
def combine_zshpords():
    """
    Collect all ZSHPORD imports into a single file for easy import.

    Every ``ZSHPORD_*.dat`` file in SOH_IMPORT_DIRECTORY is copied, in filename
    order, into a freshly written ``ZSHPORD.dat`` and is then moved into the
    ``archive`` subfolder. Any existing ``ZSHPORD.dat`` is overwritten.
    """
    archive_directory = SOH_IMPORT_DIRECTORY / "archive"
    # parents=True so a missing import directory does not abort the run.
    archive_directory.mkdir(parents=True, exist_ok=True)

    # Snapshot and sort the matches before moving anything: the files leave the
    # directory inside the loop, and a sorted list makes the order of the
    # combined file reproducible.
    individual_import_filenames = sorted(SOH_IMPORT_DIRECTORY.glob("ZSHPORD_*.dat"))

    with (SOH_IMPORT_DIRECTORY / "ZSHPORD.dat").open(
        "w", encoding="utf-8", newline="\n"
    ) as combined_import_file:
        for individual_import_filename in individual_import_filenames:
            last_line = ""
            with individual_import_filename.open(
                "r", encoding="utf-8", newline="\n"
            ) as individual_import_file:
                for line in individual_import_file:
                    combined_import_file.write(line)
                    last_line = line
            # Keep records from different files on separate lines even when a
            # source file does not end with a newline.
            if last_line and not last_line.endswith("\n"):
                combined_import_file.write("\n")
            shutil.move(
                individual_import_filename,
                archive_directory / individual_import_filename.name,
            )
|
|
|
|
def _write_sales_order(sales_order, order_number):
    """
    Write one finished sales order to its own timestamped ZSHPORD import file.
    """
    time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    with yamamotoyama.x3_imports.open_import_file(
        SOH_IMPORT_DIRECTORY
        / f"ZSHPORD_{order_number}_{time_stamp}_{sales_order.header.cusordref}.dat"
    ) as import_file:
        sales_order.output(import_file)


def process_files(file):  # I am assuming I am getting a sorted csv file by order number and line id from Source
    """
    Convert one shipment CSV into ZSHPORD import files, one per order.

    The first row is treated as a header and skipped. Rows are grouped by the
    order number in column 6; the rows of one order must be adjacent. When the
    order number changes, the previous order is written out and a new one is
    started. Detail lines and the tax / shipping / discount amounts come from
    the staged Shopify data (get_details_from_shopify), while ship site, order
    date and customer name are read from the CSV row.

    Nothing is written for a file that has no data rows.

    Raises:
        IndexError: no staged Shopify rows exist for an order in the file, or
            a data row has fewer than 11 columns.
        ValueError: the order date is not in ``%m/%d/%Y %I:%M:%S %p`` format.
    """
    with open(file, encoding='utf8') as source_file:
        csv_reader = csv.reader(source_file)
        next(csv_reader, None)  # skip header line
        sales_order = SalesOrder()
        previous_order = ''
        current_order = ''
        for row in csv_reader:
            if not row:
                continue  # csv.reader yields an empty list for a blank line
            current_order = row[6]
            if current_order != previous_order:
                # A new order starts: flush the one collected so far, if any.
                if sales_order.header.cusordref != '':
                    _write_sales_order(sales_order, previous_order)
                sales_order = SalesOrder()
                previous_order = current_order
                pprint.pprint(current_order)
                shopify_order_info = get_details_from_shopify(current_order)
                if not shopify_order_info:
                    # Same exception type the [0] lookups below would raise,
                    # but with a message that names the order.
                    raise IndexError(
                        f"no staged Shopify data found for order {current_order!r}"
                    )
                shopify_line_dict = create_shopify_dict(shopify_order_info)
                for entry in shopify_line_dict.values():
                    sales_order.append(
                        SalesOrderDetail(
                            itmref=entry['sku'],
                            qty=int(entry['quantity']),
                            gropri=entry['price'],
                        )
                    )

            # gather header information
            # (address columns are ignored: address information is not stored
            # in X3; tracking in column 16 and weight in column 18 are unused)
            header = sales_order.header
            header.cusordref = row[6]
            header.orddat = datetime.datetime.strptime(
                row[9], '%m/%d/%Y %I:%M:%S %p'
            ).strftime('%Y%m%d')  # what comes from SL
            # '%m/%d/%Y %H:%M' is the format after the file is sorted in Excel
            header.stofcy = row[0]
            header.bpdnam = row[10]
            # Amounts come from Shopify rather than CSV columns 21, 24 and 22.
            header.invdtaamt_5 = shopify_order_info[0]['shipping_lines__price']
            header.invdtaamt_7 = shopify_order_info[0]['current_total_discounts']
            header.invdtaamt_8 = shopify_order_info[0]['current_total_tax']

        # Flush the last order; skipped when the file had no data rows, which
        # previously produced an import file containing a blank order.
        if sales_order.header.cusordref != '':
            _write_sales_order(sales_order, current_order)
|
|
|
|
|
|
def create_shopify_dict(shopify_record):
    """
    Merge Shopify order lines that share the same SKU and price.

    Returns a dict keyed by ``"<sku>_<price>"``; each value is a new dict with
    ``sku``, ``quantity`` and ``price``, where ``quantity`` is the sum over all
    input records with that key. The input records are not modified.
    """
    merged = {}
    for row in shopify_record:
        item_sku, item_qty, item_price = row['sku'], row['quantity'], row['price']
        bucket_key = item_sku + '_' + str(item_price)
        bucket = merged.get(bucket_key)
        if bucket is None:
            # First time this sku/price pair is seen: start a new bucket.
            merged[bucket_key] = {
                'sku': item_sku,
                'quantity': item_qty,
                'price': item_price,
            }
        else:
            bucket['quantity'] += item_qty
    return merged
|
|
|
|
|
|
def remove_item_from_shopify_order(line_item_dict, product, qty, price):
    """
    Deduct ``qty`` units of ``product`` at ``price`` from the order lines.

    After using some or all of a line, its quantity is decremented. Lines are
    visited in dict order; only lines with a matching ``sku`` and ``price`` and
    a positive ``quantity`` are used. When one line cannot cover the requested
    amount, it is emptied and the remainder is taken from the next matching
    line. Any amount that no line can cover is ignored.

    Returns the same ``line_item_dict`` object, modified in place.
    """
    remaining = qty
    for line_info in line_item_dict.values():
        if remaining <= 0:
            break
        if (
            line_info['sku'] != product
            or line_info['price'] != price
            or line_info['quantity'] <= 0
        ):
            continue
        if remaining > line_info['quantity']:  # more qty is required than on the line
            pprint.pprint('too much to remove from 1 line')
        taken = min(remaining, line_info['quantity'])
        line_info['quantity'] -= taken
        remaining -= taken
    return line_item_dict
|
|
|
|
|
|
def get_item_from_shopify_order(line_item_dict, product, qty):
    """
    Return the first order line for ``product`` that still has quantity left.

    Lines are checked in dict order. The matching line dict itself is returned
    (not a copy); ``None`` is returned when no line has a matching ``sku`` and
    a positive ``quantity``. ``qty`` is accepted but not used.
    """
    for line_info in line_item_dict.values():
        # Compare the line's SKU, not the whole line dict, with the product.
        if line_info['sku'] == product and line_info['quantity'] > 0:
            return line_info
    return None
|
|
|
|
|
|
def get_details_from_shopify(order):
    """
    Fetch the staged Shopify order and its lines for one order number.

    Returns every row of the query: one row per joined order line, each
    carrying the order-level tax, discount and shipping amounts alongside the
    line's sku, quantity and price.
    """
    # NOTE(review): the join matches [id] on both tables - confirm that the
    # lines table stores the parent order id in its [id] column.
    order_lines_sql = """
        select
            [ecommerce_shipped_orders].[order_number]
            ,[ecommerce_shipped_orders].[current_total_tax]
            ,[ecommerce_shipped_orders].[current_total_discounts]
            ,[ecommerce_shipped_orders].[shipping_lines__price]
            ,[ecommerce_shipped_order_lines].[sku]
            ,[ecommerce_shipped_order_lines].[quantity]
            ,[ecommerce_shipped_order_lines].[price]
        from [staging].[dbo].[ecommerce_shipped_orders]
        left join [staging].[dbo].[ecommerce_shipped_order_lines]
            on [ecommerce_shipped_orders].[id] = [ecommerce_shipped_order_lines].[id]
        where order_number = :shopifyorder
        """
    with yamamotoyama.get_connection() as database:
        result = database.query(order_lines_sql, shopifyorder=order)
        # Materialise the rows while the connection is still open.
        return result.all()
|
|
|
|
@dataclasses.dataclass
class SalesOrderDetail:
    """
    Information that goes on an order detail line, taken from ZSHPORD template.
    """

    # NOTE(review): the commented-out __eq__/fill() helpers that used to sit at
    # the end of this class were dropped. That dead code embedded an SMTP
    # password in plain text; it remains in version-control history, so the
    # credential should be rotated.

    itmref: str = ""
    itmrefbpc: str = ""
    itmdes: str = ""
    qty: int = 0
    gropri: decimal.Decimal = decimal.Decimal(0)
    discrgval_1: decimal.Decimal = decimal.Decimal(0)
    zamaztax: decimal.Decimal = decimal.Decimal(0)
    star91: str = ""
    star92: str = ""

    def convert_to_strings(self) -> typing.List[str]:
        """
        Build the "D" record for this detail line for X3 import writing.

        ``itmdes`` is stored on the object but is not part of the record.
        """
        record = [
            "D",
            self.itmref,
            self.itmrefbpc,
            self.qty,
            self.gropri,
            self.discrgval_1,
            self.zamaztax,
            self.star91,
            self.star92,
        ]
        return yamamotoyama.x3_imports.convert_to_strings(record)
|
|
|
|
|
|
@dataclasses.dataclass
class SalesOrderHeader:
    """
    Information that goes on an order header, taken from ZSHPORD template.
    """

    sohnum: str = ""
    sohtyp: str = "WEB"
    bpcord: str = "STSHOPIFY"  # TODO are they handling YU?
    bpcinv: str = "STSHOPIFY"  # TODO are they handling YU?
    bpcpyr: str = "STSHOPIFY"  # TODO are they handling YU?
    bpaadd: str = "BL001"
    orddat: datetime.date = datetime.date(1753, 1, 1)
    cusordref: str = ""
    cur: str = "USD"
    alltyp: int = 2  # Detailed
    salfcy: str = "ECS"  # TODO are they handling YU?
    stofcy: str = ""  # TODO need to be set from file
    pte: str = "USPREPAY"  # TODO needs checking
    vacbpr: str = "NTX"  # TODO needs checking
    dlvpio: int = 1  # normal
    mdl: str = "GRN"  # TODO any way to tell how they were sent?
    yshppaymth: int = 1  # prepaid freight
    bpcnam: str = "ST SHOPIFY Stashtea.com"  # TODO does this need to be set
    bpdnam: str = ""
    bpdaddlig_0: str = "999 ANYSTREET"
    bpdaddlig_1: str = ""
    bpdaddlig_2: str = ""
    bpdcty: str = "POMONA"
    bpdsat: str = "CA"
    bpdposcod: str = "91768"
    bpdcry: str = "US"
    ybpdweb: str = ""
    ybpdtel: str = ""
    ybpcweb: str = ""
    yamaorder: str = ""
    ygiftwrap: int = 0
    invdtaamt_5: decimal.Decimal = decimal.Decimal(0)
    invdtaamt_7: decimal.Decimal = decimal.Decimal(0)
    invdtaamt_8: decimal.Decimal = decimal.Decimal(0)
    yimport: int = 0
    pjt: str = ""
    yedinotes: str = ""

    def convert_to_strings(self) -> typing.List[str]:
        """
        Build the "H" record for this header as an X3 import line.
        """
        # Attribute names in the exact order they appear in the record.
        column_order = (
            "sohnum", "sohtyp", "bpcord", "bpcinv", "bpcpyr", "bpaadd",
            "orddat", "cusordref", "cur", "alltyp", "salfcy", "stofcy",
            "pte", "vacbpr", "dlvpio", "mdl", "yshppaymth", "bpcnam",
            "bpdnam", "bpdaddlig_0", "bpdaddlig_1", "bpdaddlig_2", "bpdcty",
            "bpdsat", "bpdposcod", "bpdcry", "ybpdweb", "ybpdtel", "ybpcweb",
            "yamaorder", "ygiftwrap", "invdtaamt_5", "invdtaamt_7",
            "invdtaamt_8", "yimport", "pjt", "yedinotes",
        )
        record = ["H"]
        record.extend(getattr(self, column) for column in column_order)
        return yamamotoyama.x3_imports.convert_to_strings(record)
|
|
|
|
|
|
class SalesOrderDetailList:
    """
    List of shipment details in which equal item/price lines are merged.
    """

    _details: typing.List[SalesOrderDetail]
    _item_set: typing.Set[str]

    def __init__(self):
        self._details = []
        self._item_set = set()

    def append(
        self,
        salesorder_detail: SalesOrderDetail,
    ):
        """
        Add a detail line, merging it into an existing line when possible.

        Lines are identified by ``"<itmref>_<gropri>"``. A line with a new key
        is stored as given; otherwise its ``qty`` is added to the stored line
        with the same key and the new object is discarded.
        """
        extra_qty = salesorder_detail.qty
        merge_key = salesorder_detail.itmref + '_' + str(salesorder_detail.gropri)
        if merge_key not in self._item_set:
            self._item_set.add(merge_key)
            self._details.append(salesorder_detail)
            return
        for stored in self._details:
            if stored.itmref + '_' + str(stored.gropri) == merge_key:
                stored.qty += extra_qty

    def __iter__(self):
        return iter(self._details)
|
|
|
|
|
|
class SalesOrder:
    """
    A sales order: one header plus its list of detail lines.
    """

    header: SalesOrderHeader
    details: SalesOrderDetailList

    def __init__(self):
        self.header = SalesOrderHeader()
        self.details = SalesOrderDetailList()

    def append(
        self,
        salesorder_detail: SalesOrderDetail,
    ):
        """
        Add one detail line to this order's detail list.
        """
        self.details.append(salesorder_detail)

    def output(self, import_file: typing.TextIO):
        """
        Write the header record, then every detail record, to import_file.
        """
        write_record = yamamotoyama.x3_imports.output_with_file
        write_record(import_file, self.header.convert_to_strings())
        for detail in self.details:
            write_record(import_file, detail.convert_to_strings())
|
|
|
|
|
|
# Run the import pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|