source_logistics_ecommerce_.../source_ecommerce_load_shipp...

116 lines
4.4 KiB
Python

#!/usr/bin/env python3
"""
Pull shipment files from the Stash AWS FTP.
Load shipping data into the analytics database.
"""
import csv
import pprint
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import paramiko
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
# Directory this script lives in; all working folders are siblings of it.
THIS_DIRECTORY = pathlib.Path(__file__).parent
# AWS Transfer Family SFTP endpoint for the Stash FTP drop.
SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com"
SFTP_USERNAME = "yumiddleware2023"
SSH_DIRECTORY = THIS_DIRECTORY / "ssh" #TODO fixme
# Pre-seeded known_hosts and private key used for host/key authentication.
SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts")
SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519")
# Files downloaded from the SFTP server land here before processing.
INCOMING_DIRECTORY = THIS_DIRECTORY / "new_files_from_ftp"
# Sorted copies of processed CSVs are written here.
ORDER_DIRECTORY = THIS_DIRECTORY / "incoming_orders"
# Successfully processed source files are moved here.
ARCHIVE_DIRECTORY = THIS_DIRECTORY / "incoming_shipments" / "archive"
def main():
    """
    Process every shipment CSV waiting in INCOMING_DIRECTORY, then move
    each processed file into ARCHIVE_DIRECTORY so it is not loaded twice.
    """
    # retrieve_x12_edi_files()  # TODO: uncomment once the FTP pull is re-enabled
    for file in INCOMING_DIRECTORY.iterdir():
        # Guard clause: only shipment CSVs are loaded; other files are
        # left in place. (endswith('.csv') is equivalent to the old
        # name[-4:] check, without the redundant else branch.)
        if not file.name.endswith(".csv"):
            continue
        process_files(file)
        shutil.move(file, ARCHIVE_DIRECTORY / file.name)
def sftp_server() -> typing.Iterator[paramiko.SFTPClient]:
    """
    Yield an authenticated SFTP client for the Stash AWS Transfer host.

    BUG FIX: this function contains ``yield`` and is therefore a
    generator; the previous ``-> paramiko.SFTPClient`` annotation was
    wrong. Callers must drive the generator (e.g. wrap it with
    ``contextlib.contextmanager``) rather than use the return value
    directly. Both the SFTP channel and the SSH session are closed when
    the generator is finalized.
    """
    with paramiko.SSHClient() as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
        # Refuse connections to hosts missing from known_hosts.
        ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
        ssh_client.connect(
            hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
        )
        with ssh_client.open_sftp() as sftp_connection:
            yield sftp_connection
def retrieve_x12_edi_files():
    """
    Connect to the Stash SFTP endpoint and pull down shipment files.

    Each file in the ``ecomm-inbound`` folder is downloaded into
    INCOMING_DIRECTORY and then renamed server-side into the
    ``ecomm-processed`` folder so it is not fetched again on the next run.
    """
    with paramiko.SSHClient() as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
        # Refuse connections to hosts missing from known_hosts.
        ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
        ssh_client.connect(
            hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
        )
        with ssh_client.open_sftp() as sftp_connection:
            sftp_connection.chdir("/yu-edi-transfer/source-logi/prod/ecomm-inbound")
            for filename in sftp_connection.listdir():
                # if edi_945.SOURCE_945_FILENAME_RE.match(filename):  # TODO fixme
                # paramiko's get() expects a string local path.
                sftp_connection.get(filename, str(INCOMING_DIRECTORY / filename))
                # BUG FIX: the destination must interpolate the actual
                # filename; the previous literal placeholder renamed every
                # file onto the same remote path, clobbering earlier files.
                new_filename = (
                    f"/yu-edi-transfer/source-logi/prod/ecomm-processed/{filename}"
                )
                sftp_connection.rename(filename, new_filename)
def process_files(file):
    """
    Load one shipment CSV into the analytics database.

    Writes a copy of *file* into ORDER_DIRECTORY sorted by order id
    (column 6), and inserts one row per shipment line into
    [analytics].[dbo].[SL_ECOMM] within a single transaction.

    :param file: pathlib.Path to the incoming CSV file.
    """
    # BUG FIX: csv files must be opened with newline="" so the csv module
    # handles line endings itself (per the csv module documentation).
    with open(file, newline="") as source_file:
        with yamamotoyama.get_connection() as db_connection:
            with db_connection.transaction() as _:
                with open(
                    ORDER_DIRECTORY / f"sorted_{file.name}", "w", newline=""
                ) as output:
                    csv_reader = csv.reader(source_file)
                    csv_writer = csv.writer(output)
                    header = next(csv_reader, None)
                    if header is None:
                        # BUG FIX: an empty file used to crash on
                        # writerow(None); nothing to load, so just return.
                        return
                    csv_writer.writerow(header)
                    # Sort detail rows by order id so the sorted copy and
                    # the inserts are grouped per order.
                    for row in sorted(csv_reader, key=lambda r: r[6]):
                        csv_writer.writerow(row)
                        site = row[0]
                        item = row[1]
                        des = row[2]
                        qty = row[3]
                        lot = row[4]
                        order_id = row[6]
                        # Column 17 arrives as '%m/%d/%Y %I:%M:%S %p' from SL;
                        # only the date portion is stored.
                        ship_date = datetime.datetime.strptime(
                            row[17], "%m/%d/%Y %I:%M:%S %p"
                        ).strftime("%m/%d/%Y")
                        # ship_date = datetime.datetime.strptime(row[17], '%m/%d/%Y %H:%M').strftime('%m/%d/%Y')  # default when we sort in Excel
                        db_connection.query(
                            """
                            INSERT INTO [analytics].[dbo].[SL_ECOMM]
                            ([site],[item],[des],[qty],[lot],[order_id],[ship_date])
                            VALUES
                            (:site,:item,:des,:qty,:lot,:order_id,:ship_date)
                            """,
                            site=site,
                            item=item,
                            des=des,
                            qty=qty,
                            lot=lot,
                            order_id=order_id,
                            ship_date=ship_date,
                        )
# Script entry point: run the loader only when executed directly.
if __name__ == "__main__":
    main()