#!/usr/bin/env python3 import csv import pprint import dataclasses import datetime import decimal import functools import pathlib import re import shutil import typing import records # type: ignore import yamamotoyama # type: ignore import yamamotoyama.x3_imports # type: ignore SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com" SFTP_USERNAME = "yumiddleware2023" SSH_DIRECTORY = edi_940.THIS_DIRECTORY / "ssh" #TODO fixme SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts") SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519") THIS_DIRECTORY = pathlib.Path(__file__).parent INCOMING_DIRECTORY = THIS_DIRECTORY / "incoming_orders" SOH_IMPORT_DIRECTORY = THIS_DIRECTORY / "to_import_SOH" def main(): retrieve_x12_edi_files() for file in INCOMING_DIRECTORY.iterdir(): process_files() shutil.move(file, INCOMING_DIRECTORY / "archive" / file.name) combine_zshpords() #TODO determine X3 processing schedule def sftp_server() -> paramiko.SFTPClient: with paramiko.SSHClient() as ssh_client: ssh_client.load_system_host_keys() ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE) ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy) ssh_client.connect( hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME ) with ssh_client.open_sftp() as sftp_connection: yield sftp_connection def retrieve_x12_edi_files(): """ Connect to S3 bucket & pull down files. """ with sftp_server() as sftp_connection: sftp_connection.chdir("/yu-edi-transfer/source-logi/dev/ecomm-inbound")#TODO set to prod for filename in sftp_connection.listdir(): if edi_945.SOURCE_945_FILENAME_RE.match(filename): sftp_connection.get(filename, edi_945.X12_DIRECTORY / filename) new_filename = f"/yu-edi-transfer/source-logi/dev/ecomm-processed/{filename}"#TODO set to prod sftp_connection.rename(filename, new_filename) def combine_zshpords(): """ Collect all ZSHPORD imports into a single file for easy import. """ archive_directory = INCOMING_DIRECTORY / "archive" archive_directory.mkdir(exist_ok=True) with (INCOMING_DIRECTORY / "ZSHPORD.dat").open( "a", encoding="utf-8", newline="\n" ) as combined_import_file: for individual_import_filename in INCOMING_DIRECTORY.glob( "ZSHPORD_*.dat" ): with individual_import_filename.open( "r", encoding="utf-8", newline="\n" ) as individual_import_file: for line in individual_import_file: combined_import_file.write(line) shutil.move( individual_import_filename, archive_directory / individual_import_filename.name, ) def process_files(file): with open(file) as source_file: csv_reader = csv.reader(source_file) sales_order = SalesOrder() for num, row in enumerate(csv_reader): if num == 0: continue #skip header lines # pprint.pprint(row) if num == 1: #gather header information order_id = row[5] order_date = row[8] customer_name = row[9] # shipadd1 = row[9] # address information is not stored in X3 # shipadd2 = row[10] # shipcity = row[11] # shipstate = row[12] # shipzip = row[13] tracking = row[14] weight = row[16] ship_charge = row[20] taxes = "?" #TODO fixme ship_site = "?" #TODO fixme discount = "?" #TODO fixme sales_order.header.cusordref = order_id sales_order.header.orddat = datetime.datetime.strptime(order_date,'%m/%d/%Y').strftime('%Y%m%d') #TODO strftim this sales_order.header.stofcy = ship_site sales_order.header.bpdnam = customer_name sales_order.header.invdtaamt_5 = ship_charge sales_order.header.invdtaamt_7 = '0.33' #discount sales_order.header.invdtaamt_8 = '0.51'#taxes #gather line data line_product = row[0] line_qty = row[2] line_lot = row[3] line_price = row[19] sales_order.append( SalesOrderDetail( itmref=line_product, qty=line_qty, gropri=line_price ) ) time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") with yamamotoyama.x3_imports.open_import_file( SOH_IMPORT_DIRECTORY / f"ZSHPORD_{time_stamp}.dat" ) as import_file: sales_order.output(import_file) @dataclasses.dataclass class SalesOrderDetail: """ Information that goes on ann order detail line, taken from ZSHPORD template. """ itmref: str = "" itmrefbpc: str = "" itmdes: str = "" qty: int = 0 gropri: decimal.Decimal = decimal.Decimal() discrgval_1: decimal.Decimal = decimal.Decimal() zamaztax: decimal.Decimal = decimal.Decimal() star91: str = "" star92: str = "" def convert_to_strings(self) -> typing.List[str]: """ Convert to strings for X3 import writing. """ #self.qty = self.check_subdetail_qty() return yamamotoyama.x3_imports.convert_to_strings( [ "D", self.itmref, self.itmrefbpc, self.qty, self.gropri, self.discrgval_1, self.zamaztax, self.star91, self.star92, ] ) # def __eq__(self, item: typing.Any) -> bool: # """ # Test for equality # """ # if isinstance(item, str): # return self.itmref == item # if isinstance(item, SalesOrderDetail): # return self.itmref == item.itmref # return False # def fill(self): # """ # Set soplin & itmdes from itmref & sohnum # """ # def get() -> records.Record: # with yamamotoyama.get_connection() as database: # how_many = ( # database.query( # """ # select # count(*) as [how_many] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # """, # sohnum=self.sohnum, # itmref=self.itmref, # ) # .first() # .how_many # ) # if how_many == 1: # return database.query( # """ # select top 1 # [SOP].[SOPLIN_0] # ,[SOP].[ITMDES1_0] # ,[SOP].[SAU_0] # from [PROD].[SORDERP] as [SOP] # where # [SOP].[SOHNUM_0] = :sohnum # and [SOP].[ITMREF_0] = :itmref # order by # [SOP].[SOPLIN_0] # """, # sohnum=self.sohnum, # itmref=self.itmref, # ).first() # else: # emailtext = str(self.sohnum +' '+str(self.itmref)) # msg.attach(MIMEText(emailtext, 'plain')) # with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: # smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n typing.List[str]: """ Convert to X3 import line """ return yamamotoyama.x3_imports.convert_to_strings( [ "H", self.sohnum, self.sohtyp, self.bpcord, self.bpcinv, self.bpcpyr, self.bpaadd, self.orddat, self.cusordref, self.cur, self.alltyp, self.salfcy, self.stofcy, self.pte, self.vacbpr, self.dlvpio, self.mdl, self.yshppaymth, self.bpcnam, self.bpdnam, self.bpdaddlig_0, self.bpdaddlig_1, self.bpdaddlig_2, self.bpdcty, self.bpdsat, self.bpdposcod, self.bpdcry, self.ybpdweb, self.ybpdtel, self.ybpcweb, self.yamaorder, self.ygiftwrap, self.invdtaamt_5, self.invdtaamt_7, self.invdtaamt_8, self.yimport, self.pjt, self.yedinotes ] ) class SalesOrderDetailList: """ List of shipment details """ _details: typing.List[SalesOrderDetail] _item_set: typing.Set[str] def __init__(self): self._details = [] self._item_set = set() def append( self, salesorder_detail: SalesOrderDetail, ): """ Append """ itmref = salesorder_detail.itmref # if itmref in self._item_set: # for detail in self._details: # if detail == itmref: # detail.subdetails.append(shipment_subdetail) # return self._item_set.add(itmref) #salesorder_detail.fill() #salesorder_detail.append(shipment_subdetail) self._details.append(salesorder_detail) def __iter__(self): return iter(self._details) class SalesOrder: """ sales order both header & details """ header: SalesOrderHeader details: SalesOrderDetailList def __init__(self): self.header = SalesOrderHeader() self.details = SalesOrderDetailList() def append( self, salesorder_detail: SalesOrderDetail, ): """ Add detail information. """ self.details.append(salesorder_detail) def output(self, import_file: typing.TextIO): """ Output entire order to import_file. """ output = functools.partial( yamamotoyama.x3_imports.output_with_file, import_file ) output(self.header.convert_to_strings()) for detail in self.details: output(detail.convert_to_strings()) if __name__ == "__main__": main()