#!/usr/bin/env python3 """ Pull shipment files from the Stash AWS FTP. After copying file, move file to archive folder on FTP. Create ZSHPORD import files, which X3 will consume on a schedule via ZECMSL recurring task. Copy file to the archive and then pass it to the shipment maker directory. TODO: Source needs to send us real data for final adjustments (site, discount, multilot, etc.) """ import csv import pprint import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import paramiko import decimal import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore THIS_DIRECTORY = pathlib.Path(__file__).parent SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com" SFTP_USERNAME = "yumiddleware2023" SSH_DIRECTORY = THIS_DIRECTORY / "ssh" #TODO fixme SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts") SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519") INCOMING_DIRECTORY = THIS_DIRECTORY / "incoming_orders" SHIPMENTS_DIRECTORY = THIS_DIRECTORY / "incoming_shipments" SOH_IMPORT_DIRECTORY = THIS_DIRECTORY / "to_import_SOH" def main(): #retrieve_x12_edi_files()#TODO remove this as it's handled by the earlier process for file in INCOMING_DIRECTORY.iterdir(): if file.name[-4:] != '.csv': continue else: process_files(file) shutil.move(file, SHIPMENTS_DIRECTORY / file.name) # archives are in the shipping folder combine_zshpords() def sftp_server() -> paramiko.SFTPClient: with paramiko.SSHClient() as ssh_client: ssh_client.load_system_host_keys() ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE) ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy) ssh_client.connect( hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME ) with ssh_client.open_sftp() as sftp_connection: yield sftp_connection def retrieve_x12_edi_files(): """ Connect to S3 bucket & pull down files. """ with paramiko.SSHClient() as ssh_client: ssh_client.load_system_host_keys() ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE) ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy) ssh_client.connect( hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME ) with ssh_client.open_sftp() as sftp_connection: sftp_connection.chdir("/yu-edi-transfer/source-logi/dev/ecomm-inbound")#TODO set to prod for filename in sftp_connection.listdir(): #if edi_945.SOURCE_945_FILENAME_RE.match(filename):#TODO fixme sftp_connection.get(filename, INCOMING_DIRECTORY / filename) new_filename = f"/yu-edi-transfer/source-logi/dev/ecomm-processed/{filename}"#TODO set to prod sftp_connection.rename(filename, new_filename) def combine_zshpords(): """ Collect all ZSHPORD imports into a single file for easy import. """ archive_directory = SOH_IMPORT_DIRECTORY / "archive" archive_directory.mkdir(exist_ok=True) with (SOH_IMPORT_DIRECTORY / "ZSHPORD.dat").open( "w", encoding="utf-8", newline="\n" ) as combined_import_file: for individual_import_filename in SOH_IMPORT_DIRECTORY.glob( "ZSHPORD_*.dat" ): with individual_import_filename.open( "r", encoding="utf-8", newline="\n" ) as individual_import_file: for line in individual_import_file: combined_import_file.write(line) shutil.move( individual_import_filename, archive_directory / individual_import_filename.name, ) def process_files(file): #I am assuming I am getting a sorted csv file by order number and line id from Source with open(file, encoding='utf8') as source_file: csv_reader = csv.reader(source_file) sales_order = SalesOrder() previous_order = '' current_order = '' for num, row in enumerate(csv_reader): if num == 0: continue #skip header lines if num >= 1: #gather header information current_order = row[6] if current_order != previous_order: time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") if sales_order.header.cusordref != '': with yamamotoyama.x3_imports.open_import_file( SOH_IMPORT_DIRECTORY / f"ZSHPORD_{previous_order}_{time_stamp}_{sales_order.header.cusordref}.dat" ) as import_file: sales_order.output(import_file) sales_order = SalesOrder() previous_order = current_order pprint.pprint(current_order) shopify_order_info = get_details_from_shopify(current_order) shopify_line_dict = create_shopify_dict(shopify_order_info) for entry in shopify_line_dict: sales_order.append( SalesOrderDetail( itmref=shopify_line_dict[entry]['sku'], qty=int(shopify_line_dict[entry]['quantity']), gropri=shopify_line_dict[entry]['price'] ) ) ship_site = row[0] order_id = row[6] order_date = row[9] customer_name = row[10] # shipadd1 = row[9] # address information is not stored in X3 # shipadd2 = row[10] # shipcity = row[11] # shipstate = row[12] # shipzip = row[13] tracking = row[16] weight = row[18] taxes = shopify_order_info[0]['current_total_tax']#row[22] ship_charge = shopify_order_info[0]['shipping_lines__price']#row[21] discount = shopify_order_info[0]['current_total_discounts']#row[24] sales_order.header.cusordref = order_id sales_order.header.orddat = datetime.datetime.strptime(order_date,'%m/%d/%Y %I:%M:%S %p').strftime('%Y%m%d') # what comes from SL #sales_order.header.orddat = datetime.datetime.strptime(order_date,'%m/%d/%Y %H:%M').strftime('%Y%m%d') #default when we sort in Excel sales_order.header.stofcy = ship_site sales_order.header.bpdnam = customer_name sales_order.header.invdtaamt_5 = ship_charge sales_order.header.invdtaamt_7 = discount sales_order.header.invdtaamt_8 = taxes #gather line data # line_product = row[1] # line_qty = int(row[3]) # line_lot = row[4] # line_price = row[20] # shopify_item_data = get_item_from_shopify_order(shopify_line_dict, line_product, line_qty) # shopify_line_dict = remove_item_from_shopify_order(shopify_line_dict, shopify_item_data['sku'], shopify_item_data['quantity'],shopify_item_data['price']) time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( SOH_IMPORT_DIRECTORY / f"ZSHPORD_{current_order}_{time_stamp}_{sales_order.header.cusordref}.dat" ) as import_file: sales_order.output(import_file) def create_shopify_dict(shopify_record): order_info = {} for record in shopify_record: sku = record['sku'] qty = record['quantity'] price = record['price'] key = sku + '_' + str(price) if key in order_info: order_info[key]['quantity'] += qty else: order_info[key] = { 'sku':record['sku'], 'quantity':record['quantity'], 'price':record['price'], } return order_info def remove_item_from_shopify_order(line_item_dict, product, qty, price): #after using some or all of a line, decrement or remove it for line in line_item_dict: line_info = line_item_dict[line] qty_to_remove = line_info['quantity'] if line_info['sku'] == product and line_info['quantity'] > 0 and line_info['price'] == price: if qty_to_remove > line_info['quantity']: #more qty is required than on the line pprint.pprint('too much to remove from 1 line') remainder = qty_to_remove - line_info['quantity'] line_item_dict = remove_item_from_shopify_order(line_item_dict, product, remainder, price) else: line_info['quantity'] -= qty_to_remove return line_item_dict def get_item_from_shopify_order(line_item_dict, product, qty): for line in line_item_dict: line_info = line_item_dict[line] if line_info == product and line_info['quantity'] > 0: return line_info return None def get_details_from_shopify(order): with yamamotoyama.get_connection() as db_connection: return db_connection.query( """ select [ecommerce_shipped_orders].[order_number] ,[ecommerce_shipped_orders].[current_total_tax] ,[ecommerce_shipped_orders].[current_total_discounts] ,[ecommerce_shipped_orders].[shipping_lines__price] ,[ecommerce_shipped_order_lines].[sku] ,[ecommerce_shipped_order_lines].[quantity] ,[ecommerce_shipped_order_lines].[price] from [staging].[dbo].[ecommerce_shipped_orders] left join [staging].[dbo].[ecommerce_shipped_order_lines] on [ecommerce_shipped_orders].[id] = [ecommerce_shipped_order_lines].[id] where order_number = :shopifyorder """, shopifyorder=order, ).all() @dataclasses.dataclass class SalesOrderDetail: """ Information that goes on ann order detail line, taken from ZSHPORD template. """ itmref: str = "" itmrefbpc: str = "" itmdes: str = "" qty: int = 0 gropri: decimal.Decimal = decimal.Decimal() discrgval_1: decimal.Decimal = decimal.Decimal() zamaztax: decimal.Decimal = decimal.Decimal() star91: str = "" star92: str = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ #self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "D", self.itmref, self.itmrefbpc, self.qty, self.gropri, self.discrgval_1, self.zamaztax, self.star91, self.star92, ] ) # def __eq__(self, item: typing.Any) -> bool: # """ # Test for equality # """ # if isinstance(item, str): # return self.itmref == item # if isinstance(item, SalesOrderDetail): # return self.itmref == item.itmref # return False # def fill(self): # """ # Set soplin & itmdes from itmref & sohnum # """ # def get() -> records.Record: # with yamamotoyama.get_connection() as database: # how_many = ( # database.query( # """ # select # count(*) as [how_many] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # """, # sohnum=self.sohnum, # itmref=self.itmref, # ) # .first() # .how_many # ) # if how_many == 1: # return database.query( # """ # select top 1 # [SOP].[SOPLIN_0] # ,[SOP].[ITMDES1_0] # ,[SOP].[SAU_0] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # order by # [SOP].[SOPLIN_0] # """, # sohnum=self.sohnum, # itmref=self.itmref, # ).first() # else: # emailtext = str(self.sohnum +' '+str(self.itmref)) # msg.attach(MIMEText(emailtext, 'plain')) # with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: # smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "H", self.sohnum, self.sohtyp, self.bpcord, self.bpcinv, self.bpcpyr, self.bpaadd, self.orddat, self.cusordref, self.cur, self.alltyp, self.salfcy, self.stofcy, self.pte, self.vacbpr, self.dlvpio, self.mdl, self.yshppaymth, self.bpcnam, self.bpdnam, self.bpdaddlig_0, self.bpdaddlig_1, self.bpdaddlig_2, self.bpdcty, self.bpdsat, self.bpdposcod, self.bpdcry, self.ybpdweb, self.ybpdtel, self.ybpcweb, self.yamaorder, self.ygiftwrap, self.invdtaamt_5, self.invdtaamt_7, self.invdtaamt_8, self.yimport, self.pjt, self.yedinotes ] ) class SalesOrderDetailList: """ List of shipment details """ _details: typing.List[SalesOrderDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, salesorder_detail: SalesOrderDetail, ): """ Append """ itmref = salesorder_detail.itmref qty = salesorder_detail.qty gropri = salesorder_detail.gropri set_item = itmref + '_' + str(gropri) if set_item in self._item_set: for detail in self._details: if detail.itmref+'_'+str(detail.gropri) == set_item: detail.qty += qty else: self._item_set.add(set_item) self._details.append(salesorder_detail) def __iter__(self): return iter(self._details) class SalesOrder: """ sales order both header & details """ header: SalesOrderHeader details: SalesOrderDetailList def __init__(self): self.header = SalesOrderHeader() self.details = SalesOrderDetailList() def append( self, salesorder_detail: SalesOrderDetail, ): """ Add detail information. """ self.details.append(salesorder_detail) def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) if __name__ == "__main__": main()