118 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			118 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Pull shipment files from the Stash AWS FTP.
 | |
| load shipping data into analytics database
 | |
| """
 | |
| import csv
 | |
| import pprint
 | |
| import dataclasses
 | |
| import datetime
 | |
| import decimal
 | |
| import functools
 | |
| import pathlib
 | |
| import re
 | |
| import shutil
 | |
| import typing
 | |
| import paramiko
 | |
| 
 | |
| import records  # type: ignore
 | |
| 
 | |
| import yamamotoyama  # type: ignore
 | |
| import yamamotoyama.x3_imports  # type: ignore
 | |
| 
 | |
| THIS_DIRECTORY = pathlib.Path(__file__).parent
 | |
| 
 | |
| SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com"
 | |
| SFTP_USERNAME = "yumiddleware2023"
 | |
| SSH_DIRECTORY = THIS_DIRECTORY / "ssh" #TODO fixme
 | |
| SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts")
 | |
| SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519")
 | |
| 
 | |
| INCOMING_DIRECTORY = THIS_DIRECTORY / "new_files_from_ftp"
 | |
| ORDER_DIRECTORY = THIS_DIRECTORY / "incoming_orders"
 | |
| ARCHIVE_DIRECTORY = THIS_DIRECTORY / "incoming_shipments" / "archive"
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     retrieve_x12_edi_files()#TODO uncomment
 | |
|     for file in INCOMING_DIRECTORY.iterdir():
 | |
|         if file.name[-4:] != '.csv':
 | |
|             continue
 | |
|         else:
 | |
|             process_files(file)
 | |
|             shutil.move(file, ARCHIVE_DIRECTORY / file.name)
 | |
| 
 | |
| 
 | |
| def sftp_server() -> paramiko.SFTPClient:
 | |
|     with paramiko.SSHClient() as ssh_client:
 | |
|         ssh_client.load_system_host_keys()
 | |
|         ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
 | |
|         ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
 | |
|         ssh_client.connect(
 | |
|             hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
 | |
|         )
 | |
|         with ssh_client.open_sftp() as sftp_connection:
 | |
|             yield sftp_connection
 | |
| 
 | |
| def retrieve_x12_edi_files():
 | |
|     """
 | |
|     Connect to S3 bucket & pull down files.
 | |
|     """
 | |
|     with paramiko.SSHClient() as ssh_client:
 | |
|         ssh_client.load_system_host_keys()
 | |
|         ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
 | |
|         ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
 | |
|         ssh_client.connect(
 | |
|             hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
 | |
|         )
 | |
|         with ssh_client.open_sftp() as sftp_connection:
 | |
|             sftp_connection.chdir("/yu-edi-transfer/source-logi/prod/ecomm-inbound")
 | |
|             for filename in sftp_connection.listdir():
 | |
|                 #if edi_945.SOURCE_945_FILENAME_RE.match(filename):#TODO fixme
 | |
|                 sftp_connection.get(filename, INCOMING_DIRECTORY / filename)
 | |
|                 new_filename = f"/yu-edi-transfer/source-logi/prod/ecomm-processed/{filename}"
 | |
|                 sftp_connection.rename(filename, new_filename)
 | |
| 
 | |
| 
 | |
| def process_files(file):
 | |
|     with open(file, encoding='utf8') as source_file:
 | |
|         with yamamotoyama.get_connection() as db_connection:
 | |
|             with db_connection.transaction() as _:
 | |
|                 with open(ORDER_DIRECTORY / f'sorted_{file.name}', 'w',newline='',encoding='utf8') as output:
 | |
|                     csv_reader = csv.reader(source_file)
 | |
|                     csv_writer = csv.writer(output)
 | |
|                     csv_writer.writerow(next(csv_reader, None)) #skip header
 | |
|                     csv_sort = sorted(csv_reader, key=lambda row: row[6], reverse=False)
 | |
|                     for row in csv_sort:
 | |
|                         csv_writer.writerow(row)
 | |
|                         site = row[0]
 | |
|                         item = row[1]
 | |
|                         des = row[2]
 | |
|                         qty = row[3]
 | |
|                         lot = row[4]
 | |
|                         order_id = row[6]
 | |
|                         tracking = row[16]
 | |
|                         ship_date = datetime.datetime.strptime(row[17],'%m/%d/%Y %I:%M:%S %p').strftime('%m/%d/%Y')# what comes from SL
 | |
|                         #ship_date = datetime.datetime.strptime(row[17],'%m/%d/%Y %H:%M').strftime('%m/%d/%Y')#default when we sort in Excel
 | |
|                         
 | |
|                         db_connection.query(
 | |
|                             """
 | |
|                             INSERT INTO [analytics].[dbo].[SL_ECOMM]
 | |
|                                 ([site],[item],[des],[qty],[lot],[order_id],[ship_date],[tracking])
 | |
|                             VALUES
 | |
|                                 (:site,:item,:des,:qty,:lot,:order_id,:ship_date,:tracking)
 | |
|                             """,
 | |
|                             site=site,
 | |
|                             item=item,
 | |
|                             des=des,
 | |
|                             qty=qty,
 | |
|                             lot=lot,
 | |
|                             order_id=order_id,
 | |
|                             ship_date=ship_date,
 | |
|                             tracking=tracking
 | |
|                         )
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | |
| 
 |