source_logistics_ecommerce_.../source_ecommerce_load_shipp...

142 lines
5.4 KiB
Python
Raw Normal View History

2024-04-19 10:17:30 -07:00
#!/usr/bin/env python3
"""
Pull shipment files from the Stash AWS SFTP server and
load the shipping data into the analytics database.
"""
import csv
import pprint
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import paramiko
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
# Directory containing this script; every working path below is relative to it.
THIS_DIRECTORY = pathlib.Path(__file__).parent
# SFTP endpoint and login used to fetch the shipment files.
SFTP_HOST = "s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com"
SFTP_USERNAME = "yumiddleware2023"
# Holds the known_hosts file and the private key used for the SFTP login.
SSH_DIRECTORY = THIS_DIRECTORY / "ssh" #TODO fixme
SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts")
SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519")
# Download target of retrieve_x12_edi_files(); clean_lines() also writes its
# "clean_" copies here and main() scans it for *.csv files.
INCOMING_DIRECTORY = THIS_DIRECTORY / "new_files_from_ftp"
# process_files() writes its "sorted_" copies here.
ORDER_DIRECTORY = THIS_DIRECTORY / "incoming_orders"
# main() moves each handled file here.
ARCHIVE_DIRECTORY = THIS_DIRECTORY / "incoming_shipments" / "archive"
def main():
    """
    Download new shipment files, then clean and load them.

    The incoming directory is scanned twice.  The first pass runs
    clean_lines() on every ``.csv`` file, the second pass runs
    process_files() on every ``.csv`` file that is present afterwards.
    In both passes each handled file is moved to ARCHIVE_DIRECTORY.
    """
    retrieve_x12_edi_files()
    # One full directory scan per handler: all files are cleaned before any
    # file is loaded.
    for handle_csv in (clean_lines, process_files):
        for entry in INCOMING_DIRECTORY.iterdir():
            if not entry.name.endswith('.csv'):
                continue
            handle_csv(entry)
            shutil.move(entry, ARCHIVE_DIRECTORY / entry.name)
def sftp_server() -> typing.Iterator[paramiko.SFTPClient]:
    """
    Yield a single SFTP session on SFTP_HOST.

    This is a generator function: calling it only creates the generator, and
    the SSH connection is made when the generator is first advanced.  The
    SFTP session and the SSH client are closed by their ``with`` blocks once
    the generator is resumed or closed.

    NOTE(review): the body has the shape expected by
    ``contextlib.contextmanager`` but is not decorated with it, so the
    function cannot be used directly in a ``with`` statement -- confirm how
    callers are meant to use it.

    Yields:
        The open SFTP client.
    """
    with paramiko.SSHClient() as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
        # Refuse servers whose host key is not already known.
        ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
        ssh_client.connect(
            hostname=SFTP_HOST, username=SFTP_USERNAME, key_filename=SSH_KEY_FILENAME
        )
        with ssh_client.open_sftp() as sftp_connection:
            yield sftp_connection
def retrieve_x12_edi_files():
    """
    Download everything waiting in the remote inbound directory.

    Opens an SFTP session on SFTP_HOST, copies each entry listed in the
    ``ecomm-inbound`` directory into INCOMING_DIRECTORY, and then renames
    the remote entry into the ``ecomm-processed`` directory.
    """
    inbound_directory = "/yu-edi-transfer/source-logi/prod/ecomm-inbound"
    with paramiko.SSHClient() as client:
        client.load_system_host_keys()
        client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
        # Refuse servers whose host key is not already known.
        client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
        client.connect(
            hostname=SFTP_HOST,
            username=SFTP_USERNAME,
            key_filename=SSH_KEY_FILENAME,
        )
        with client.open_sftp() as sftp:
            sftp.chdir(inbound_directory)
            # TODO fixme: no filename filter is applied yet, so every listed
            # entry is downloaded.
            for remote_name in sftp.listdir():
                local_target = INCOMING_DIRECTORY / remote_name
                processed_target = f"/yu-edi-transfer/source-logi/prod/ecomm-processed/{remote_name}"
                sftp.get(remote_name, local_target)
                sftp.rename(remote_name, processed_target)
def clean_lines(file):
    """
    Write a repaired copy of *file* to ``INCOMING_DIRECTORY / clean_<name>``.

    A row with fewer fields than the header is taken to have been split by a
    newline inside one of its values; it is joined with the row that follows
    it.  The header row is copied to the output so that later steps that skip
    the first row do not discard a data row.

    Args:
        file: path object of the CSV to repair; its first row is the header.

    An empty input produces an empty output file.  A short row at the very
    end of the file has nothing to be joined with and is written unchanged.
    """
    cleaned_path = INCOMING_DIRECTORY / f'clean_{file.name}'
    with open(file, 'r', encoding='utf8') as source_file, \
            open(cleaned_path, 'w', newline='', encoding='utf8') as output:
        csv_reader = csv.reader(source_file)
        csv_writer = csv.writer(output)
        headers = next(csv_reader, None)
        if headers is None:
            return
        csv_writer.writerow(headers)
        for row in csv_reader:
            if len(row) < len(headers):
                # Pull the continuation straight from the reader so it is
                # consumed here and not visited again by the loop.
                continuation = next(csv_reader, None)
                if continuation is not None:
                    # NOTE(review): the fragments are concatenated field by
                    # field; if the break fell in the middle of a value the
                    # joined row has one field more than the header --
                    # confirm against real files.
                    row = row + continuation
            csv_writer.writerow(row)
2024-04-19 10:17:30 -07:00
def _shipment_record(row):
    """Map one CSV row to the named parameters of the SL_ECOMM insert."""
    # The export carries the ship date as e.g. "04/19/2024 10:17:30 AM"; only
    # the date part is stored.  Files re-saved from Excel use
    # '%m/%d/%Y %H:%M' instead and will not parse with this format.
    shipped = datetime.datetime.strptime(row[17], '%m/%d/%Y %I:%M:%S %p')
    return {
        "site": row[0],
        "item": row[1],
        "des": row[2],
        "qty": row[3],
        "lot": row[4],
        "order_id": row[6],
        "ship_date": shipped.strftime('%m/%d/%Y'),
        "tracking": row[16],
    }


def process_files(file):
    """
    Load one shipment CSV into [analytics].[dbo].[SL_ECOMM].

    The first row is treated as a header.  The remaining rows are sorted by
    column 6 (the order id, compared as text), written together with the
    header to ``ORDER_DIRECTORY / sorted_<name>``, and inserted one at a time
    inside a single database transaction.

    Args:
        file: path object of the CSV to load.

    A file with no rows at all leaves an empty sorted file and inserts
    nothing.
    """
    insert_statement = """
        INSERT INTO [analytics].[dbo].[SL_ECOMM]
        ([site],[item],[des],[qty],[lot],[order_id],[ship_date],[tracking])
        VALUES
        (:site,:item,:des,:qty,:lot,:order_id,:ship_date,:tracking)
    """
    with open(file, encoding='utf8') as source_file:
        with yamamotoyama.get_connection() as db_connection:
            with db_connection.transaction():
                sorted_path = ORDER_DIRECTORY / f'sorted_{file.name}'
                with open(sorted_path, 'w', newline='', encoding='utf8') as output:
                    csv_reader = csv.reader(source_file)
                    csv_writer = csv.writer(output)
                    header = next(csv_reader, None)
                    if header is None:
                        # Nothing to load; writerow(None) would raise.
                        return
                    csv_writer.writerow(header)
                    for row in sorted(csv_reader, key=lambda fields: fields[6]):
                        csv_writer.writerow(row)
                        db_connection.query(insert_statement, **_shipment_record(row))
# Run the download / clean / load pipeline only when executed as a script.
if __name__ == "__main__":
    main()