Compare commits

..

No commits in common. "master" and "main" have entirely different histories.
master ... main

18 changed files with 1 additions and 5435 deletions

View File

@ -1,285 +0,0 @@
#!/usr/bin/env python3
"""
Consume a 846 file from 3PLs, and translate into a
inventory comparison report
For Shadex we also need to reply with a 997
"""
import datetime
import pathlib
import re
import shutil
import typing
import smtplib
import pprint
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
EDI_846_ATTACHMENTS = THIS_DIRECTORY / "846_reports"
EDI_846_ATTACHMENTS_ARCHIVE = EDI_846_ATTACHMENTS / "archive"
SHANDEX_846_FILENAME_RE = re.compile(
r"\A 846_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
SHANDEX_STATUS = {
'33' : 'A',
'20' : 'R',
'QH' : 'Q'
}
def main():
"""
Do it!
"""
#read in the information from Shandex and store it
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_846_FILENAME_RE.match(edi_filename.name):
shandex_inventory=process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
#get stock information about WON and store it
#pass date from EDI so we can subtract newer stock movements?
x3_won_inventory=get_x3_won_inventory()
#write out an excel file with the stock from Shandex with X3 next to it, then include anything missing
compare_inventory(shandex_inventory, x3_won_inventory)
stock_count_alert()
def compare_inventory(shandex_inventory, x3_inventory):
today = datetime.datetime.today()
today = today.strftime('%Y-%m-%d')
with open(EDI_846_ATTACHMENTS / f'inventory_comparison_{today}.csv', 'w', newline='') as outfile:
outfile.write(','.join(['Site','Item','Description','Lot','X3 qty','Shandex qty']))#header
outfile.write('\n')
for record in x3_inventory:
site = record["STOFCY_0"]
item = record["ITMREF_0"]
des = record["ITMDES1_0"]
lot = record["LOT_0"]
qty = str(record["QTY"])
outfile.write(','.join([site, item, des, lot, qty]))
outfile.write(',')
if item in shandex_inventory:
if lot in shandex_inventory[item]:
outfile.write(str(shandex_inventory[item][lot]))
# pprint.pprint(shandex_inventory[item])
del shandex_inventory[item][lot]
# pprint.pprint(shandex_inventory[item])
if len(shandex_inventory[item]) == 0:
# pprint.pprint('entire del')
# pprint.pprint(shandex_inventory[item])
del shandex_inventory[item]
# pprint.pprint(shandex_inventory[item])
else:
outfile.write('0')#lot not found
else:
outfile.write('0')#item not found
outfile.write('\n')
#write the rest of shandex inventory
if len(shandex_inventory) > 0:
outfile.write('Shandex only')
outfile.write('\n')
for item in shandex_inventory:
# pprint.pprint(item)
for lot in shandex_inventory[item]:
# pprint.pprint(lot)
qty = str(shandex_inventory[item][lot])
outfile.write(','.join([item, lot, qty]))
outfile.write('\n')
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS"
"ST",
"RED",
"GE",
"IEA"
}:
continue
yield fields
def get_x3_won_inventory():
#TODO correct the dates used in stock_issues?
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
with stock_issues as (
select
STJ.STOFCY_0,
STJ.ITMREF_0,
STJ.LOT_0,
sum(STJ.QTYSTU_0) [QTYSTU_0]
from PROD.STOJOU STJ
where
STJ.STOFCY_0 = 'WON'
and STJ.IPTDAT_0 between getdate()-1 and getdate()
group by
STJ.STOFCY_0,
STJ.ITMREF_0,
STJ.LOT_0
)
select
SLF.STOFCY_0,
SLF.ITMREF_0,
ITM.ITMDES1_0,
SLF.LOT_0,
cast(sum(SLF.AAACUMQTY_0 + SLF.QQQCUMQTY_0 + SLF.RRRCUMQTY_0 - coalesce(stock_issues.QTYSTU_0,0)) as integer) as QTY,
SLF.AVC_0
from PROD.STOLOTFCY SLF
join PROD.ITMMASTER ITM on
SLF.ITMREF_0 = ITM.ITMREF_0
left join stock_issues
on SLF.ITMREF_0 = stock_issues.ITMREF_0
and SLF.STOFCY_0 = stock_issues.STOFCY_0
and SLF.LOT_0 = stock_issues.LOT_0
where
SLF.STOFCY_0 = 'WON'
group by
SLF.STOFCY_0,
SLF.ITMREF_0,
ITM.ITMDES1_0,
SLF.LOT_0,
SLF.AVC_0
order by 1, 2
""",
#startdate=edi_date,
).all()
def gtin_lookup(gtin):
with yamamotoyama.get_connection() as db_connection:
itmref = db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
replace([ITM].[ZCASEUPC_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()
if itmref is None:
itmref = db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
replace([ITM].[EANCOD_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
else:
itmref = itmref["ITMREF_0"]
return itmref
def stock_count_alert():
msg = MIMEMultipart()
msg['Subject'] = 'New Stock Count from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'isenn@yamamotoyama.com,vgomez@yamamotoyama.com'
msg['Cc'] = 'bleeson@stashtea.com'
emailtext = f'Attached.'
msg.attach(MIMEText(emailtext, 'plain'))
for file in EDI_846_ATTACHMENTS.iterdir():
if file.name.endswith('.csv'):
part = MIMEApplication(open(file, 'rb').read())
part['Content-Disposition'] = f'attachment; filename="{file.name}"'
msg.attach(part)
shutil.move(file, EDI_846_ATTACHMENTS_ARCHIVE / file.name)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
shandex_inventory = {} #all inventory
product = ''
lot = ''
qty = 0
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "BIA":
advice_date = fields[4]
if fields[0] == 'LIN':
if product != '': #check loop entry
if product not in shandex_inventory: #if we haven't seen the product yet add it
# pprint.pprint('product was not found')
shandex_inventory[product] = {lot : qty}
# pprint.pprint(shandex_inventory)
else: #we've seen this product, have we seen the lot
if lot not in shandex_inventory[product]:#if not, add it
shandex_inventory[product][lot] = qty
else:
shandex_inventory[product][lot] += qty #if we have add to it
# pprint.pprint('product: ' + product)
# pprint.pprint('lot: ' + lot)
# pprint.pprint('qty: ' + str(qty))
#LIN**SK*077652972160*LT*31052026A
gtin = fields[3]
lot = fields[5]
product = gtin_lookup(gtin)
qty = 0
if fields[0] == "QTY":#product should already exist
# QTY*33*0
# QTY*20*16
# QTY*QH*0
qty += int(fields[2])
if fields[0] == "SE":#end of file
# pprint.pprint('final add')
# pprint.pprint(shandex_inventory)
# pprint.pprint('product: ' + product)
# pprint.pprint('lot: ' + lot)
# pprint.pprint('qty: ' + str(qty))
if product is not None: #check loop entry
if product not in shandex_inventory: #if we haven't seen the product yet add it
shandex_inventory[product] = {lot : qty}
else: #we've seen this product, have we seen the lot
if lot not in shandex_inventory[product]:#if not, add it
shandex_inventory[product][lot] = qty
else:
shandex_inventory[product][lot] += qty #if we have add to it
return shandex_inventory
if __name__ == "__main__":
main()

View File

@ -1,860 +0,0 @@
#!/usr/bin/env python3
"""
Consume a 867 file from Shandex, and translate into a Sage X3
readable file-ZSHIP867.
New changes, need to bring in under ship to customer whenever possible.
Build a mapping file of known customers by matching their address to x3 codes
need to not import a shipment if a customer mapping doesn't exist.
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import pprint
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
SOURCE_867_FILENAME_RE = re.compile(
r"\A 867_STASH-YAMAMOTOYAMA_ .* [.]edi \Z", re.X | re.M | re.S
)
# Not needed, Shandex stores everything how they want so we need to look up in X3
# UOM_MAPPING = {
# "CA" : "CS",
# "EC" : "EA"
#}
#NAME_ADDRESS_CITY_TERRITORY_POSTAL : X3 Customer Code
X3_CUSTOMER_MAPPING = {
'AVRI1000_AVRIQC' : 'AVRI0001',
'BULK1000_BULKAU' : 'BULK0001',
'COOP2000_190148' : 'FEDE0006',
'COOP2000_190149' : 'FEDE0005',
'COOP2000_607S' : 'FEDE0003',
'COOP2000_CAL' : 'FEDE0007',
'HORI1000_HORIBC' : 'HORI0001',
'LOND1000_190005' : 'LOND0001',
'NATI1000_28' : 'LOBL0002',
'NATI1000_34' : 'LOBL0006',
'NATI1000_D022' : 'LOBL0001',
'ONTA1100_ONTAON' : 'ONTA0002',
'OVER1000_5111' : 'OVER0002',
'OVER1000_A24' : 'OVER0004',
'PARA1100_PARABC' : 'PARA0004',
'PSCN1000_PSCBC' : 'PSCN0002',
'PURE1000_PUREON' : 'PURE0004',
'PURI1000_PURION' : 'PURI0002',
'SATA1000_SATAQC' : 'SATA0002',
'UNFI1000_UNFIBC' : 'UNFI0011',
'UNFI1000_UNFION' : 'UNFI0004',
'SAMP1000_0000' : 'YARI0001',
'PURI1000_PURIQC' : 'PURI0005',
'JIVA1000_JIVABC' : 'JIVA0002',
'WELL1000_WELL' : 'WELL0002',
'WELL1000_WELCAL' : 'WELL0003',
'AMAZ1200_YYC4' : 'AMAZ0210',
'AMAZ1200_YVR2' : 'AMAZ0014',
'AMAZ1200_YYZ4' : 'AMAZ0049',
'AMAZ1200_YXU1' : 'AMAZ0189',
'AMAZ1200_YOW3' : 'AMAZ0176',
'PARA1100_0000' : 'PARA0004',
'AMAZ1200_YVR4' : 'AMAZ0099',
'AMAZ1200_YHM1' : 'AMAZ0169',
'AMAZ1200_YYZ7' :'AMAZ0100',
'PURI1000_PURIBC' : 'PURI0003',
'PURI1000_PURIAB' : 'PURI0004',
'AMAZ1200_YEG2' : 'AMAZ0179',
}
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SOURCE_867_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
shutil.copy(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
shutil.move(edi_filename, THIS_DIRECTORY / "processed_867s" / edi_filename.name) #They go in here so we can use them in the dashboard script
combine_zship867s()
def missing_customer_alert(customer_key):
msg = MIMEMultipart()
msg['Subject'] = 'Shandex 867 - Missing X3 Customer'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'technical-contact@stashtea.com'
emailtext = f'Missing value: {customer_key}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def combine_zship867s():
"""
Collect all ZSHIP867 imports into a single file for easy import.
"""
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZSHIP867.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZSHIP867_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS",
"ST",
"BPT",
}:
continue
yield fields
def get_product_from_gtin(gtin):
#pprint.pprint(gtin)
with yamamotoyama.get_connection() as database:
result = database.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0],
[ITM].[STU_0]
from PROD.ITMMASTER ITM
join PROD.ITMFACILIT ITF
on ITM.ITMREF_0 = ITF.ITMREF_0
and ITF.STOFCY_0 = 'WON'
where
replace([ITM].[ZCASEUPC_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()
if result is None:
result = database.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0],
[ITM].[STU_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
replace([ITM].[EANCOD_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()
return result
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
shipping_date = ''
previous_picking_number = ''
po_number = ''
cust_po_number = ''
warehouse_shipment = WarehouseShipment()
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "DTM":
shipping_date = fields[2]
if fields[0] == "PTD" and len(fields) > 2:#There is one PTD in the header that is not used
picking_number = fields[5]
warehouse_shipment.header.ylicplate = f'{previous_picking_number}'
if po_number != '':
warehouse_shipment.header.yclippership = cust_po_number
warehouse_shipment.header.ylicplate = f'{po_number}'
if picking_number != previous_picking_number and previous_picking_number != '':
if warehouse_shipment.header.bpdnam != 'Shandex Group':
#pprint.pprint(warehouse_shipment.header.ylicplate)
warehouse_shipment.header.shidat = datetime.datetime.strptime(
shipping_date, "%Y%m%d")
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSHIP867_{warehouse_shipment.header.ylicplate}_{time_stamp}.dat"
) as import_file:
warehouse_shipment.output(import_file)
warehouse_shipment = WarehouseShipment()
po_number = ''
warehouse_shipment.header.ylicplate = f'{picking_number}'
previous_picking_number = picking_number
if fields[0] =='REF' and fields[1] == 'PO':
cust_po_number = fields[2]
if fields[0] =='REF' and fields[1] == 'IL':
po_number = fields[2]
if fields[0] == "N1" and fields[1] == 'ST':
ship_to_customer = fields[2]
shandex_code_part1 = fields[4]
warehouse_shipment.header.bpdnam = ship_to_customer
if fields[0] == "N1" and fields[1] == 'BY':
shandex_code_part2 = fields[4]
if fields[0] == "N3":
ship_to_address = fields[1]
warehouse_shipment.header.bpdaddlig = ship_to_address
if fields[0] == "N4":
ship_to_city = fields[1]
ship_to_province = fields[2]
ship_to_zip = fields[3]
warehouse_shipment.header.bpdposcod = ship_to_zip
warehouse_shipment.header.bpdcty = ship_to_city
warehouse_shipment.header.bpdsat = ship_to_province
customer_key = warehouse_shipment.create_customer_key(shandex_code_part2, shandex_code_part1)
if customer_key == 'SAMP1000_0000': #flag sample orders better
warehouse_shipment.header.bpdnam = 'SMP: ' + warehouse_shipment.header.bpdnam
if customer_key not in X3_CUSTOMER_MAPPING.keys():
pprint.pprint(customer_key + ' not found.')
missing_customer_alert(customer_key)
raise NotImplementedError
else:
warehouse_shipment.header.bpcord = X3_CUSTOMER_MAPPING[customer_key]
if fields[0] == "QTY":
#QTY*39*10*CA
_, _, qty_str, uom = fields[:4]
#warehouse_shipment.sohnum = sohnum
if fields[0] == "LIN":
#LIN**VN*10077652082224*LT*09032026C#
_, _, _, gtin, _, lot = fields[:6]
if fields[0] == "AMT":
#AMT*LP*53.90
_, _, price = fields[:3]
lookup_values = get_product_from_gtin(gtin)
itmref = lookup_values['ITMREF_0']
itmdes = lookup_values['ITMDES1_0']
sau = lookup_values['STU_0']
#pprint.pprint(itmdes)
subdetail = WarehouseShipmentSubDetail(
qtypcu=-1 * int(qty_str),
lot=lot,
)
warehouse_shipment.append(
WarehouseShipmentDetail(
#sohnum=warehouse_shipment.sohnum,
itmref=itmref,
itmdes=itmdes,
qty=int(qty_str),
gropri=price,
sau=sau
),
subdetail,
)
#pprint.pprint(warehouse_shipment.header.ylicplate)
warehouse_shipment.header.shidat = datetime.datetime.strptime(
shipping_date, "%Y%m%d")
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSHIP867_{picking_number}_{time_stamp}.dat"
) as import_file:
warehouse_shipment.output(import_file)
@dataclasses.dataclass
class WarehouseShipmentSubDetail:
"""
Information that goes onto a shipment sub-detail line, taken from ZSHIP867 template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
sernum: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.loc,
self.lot,
self.sernum,
]
)
@dataclasses.dataclass
class WarehouseShipmentDetail:
"""
Information that goes on a shipment detail line, taken from ZSHIP867 template.
"""
sohnum: str = ""
soplin: int = 0
itmref: str = ""
itmdes: str = ""
sau: str = ""
qty: int = 0
gropri: decimal.Decimal = decimal.Decimal()
star91: str = ""
star92: str = ""
subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: WarehouseShipmentSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.sau
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases += subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sohnum,
self.soplin,
self.itmref,
self.itmdes,
self.sau,
self.qty,
self.gropri,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, WarehouseShipmentDetail):
return self.itmref == item.itmref
return False
def fill(self):
"""
Set soplin & itmdes from itmref & sohnum
"""
def get() -> records.Record:
with yamamotoyama.get_connection() as database:
how_many = (
database.query(
"""
select
count(*) as [how_many]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
""",
sohnum=self.sohnum,
itmref=self.itmref,
)
.first()
.how_many
)
if how_many == 1:
return database.query(
"""
select top 1
[SOP].[SOPLIN_0]
,[SOP].[ITMDES1_0]
,[SOP].[SAU_0]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
order by
[SOP].[SOPLIN_0]
""",
sohnum=self.sohnum,
itmref=self.itmref,
).first()
else:
emailtext = str(self.sohnum +' '+str(self.itmref))
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
raise NotImplementedError # TODO
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class WarehouseShipmentHeader:
"""
Information that goes on a shipment header, taken from ZSHIP867 template.
"""
salfcy: str = "STC"
stofcy: str = "WON"
sdhnum: str = ""
bpcinv: str = "SHAN0001"
bpcord: str = "SHAN0001"
bpaadd: str = "BL001"
cur: str = "CAD"
shidat: datetime.date = datetime.date(1753, 1, 1)
cfmflg: int = 1
pjt: str = ""
bptnum: str = ""
ylicplate: str = "SHANDEX"
yclippership: str = ""
invdtaamt_2: decimal.Decimal = decimal.Decimal()
invdtaamt_3: decimal.Decimal = decimal.Decimal()
invdtaamt_4: decimal.Decimal = decimal.Decimal()
invdtaamt_5: decimal.Decimal = decimal.Decimal()
invdtaamt_6: decimal.Decimal = decimal.Decimal()
invdtaamt_7: decimal.Decimal = decimal.Decimal()
invdtaamt_8: decimal.Decimal = decimal.Decimal()
invdtaamt_9: decimal.Decimal = decimal.Decimal()
die: str = "" #TODO consider adding dimension codes?
die_1: str = ""
die_2: str = ""
die_3: str = ""
die_4: str = ""
die_5: str = ""
die_6: str = ""
die_7: str = ""
die_8: str = ""
die_9: str = ""
die_10: str = ""
die_11: str = ""
die_12: str = ""
die_13: str = ""
die_14: str = ""
die_15: str = ""
die_16: str = ""
die_17: str = ""
die_18: str = ""
die_19: str = ""
cce: str = ""
cce_1: str = ""
cce_2: str = ""
cce_3: str = ""
cce_4: str = ""
cce_5: str = ""
cce_6: str = ""
cce_7: str = ""
cce_8: str = ""
cce_9: str = ""
cce_10: str = ""
cce_11: str = ""
cce_12: str = ""
cce_13: str = ""
cce_14: str = ""
cce_15: str = ""
cce_16: str = ""
cce_17: str = ""
cce_18: str = ""
cce_19: str = ""
bpdnam: str = "Shandex Group"
bpdaddlig: str = ""
bpdaddlig_1: str = ""
bpdaddlig_2: str = ""
bpdposcod: str = ""
bpdcty: str = ""
bpdsat: str = ""
bpdcry: str = "CA"
bpdcrynam: str = "Canada"
sdhtyp: str = "SDN"
growei: decimal.Decimal = decimal.Decimal()#TODO consider gross weight?
pacnbr: int = 0
star71: str = ""
star72: str = ""
star81: str = ""
star82: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"H",
self.salfcy,
self.stofcy,
self.sdhnum,
self.bpcinv,
self.bpcord,
self.bpaadd,
self.cur,
self.shidat.strftime("%Y%m%d"),
self.cfmflg,
self.pjt,
self.bptnum,
self.ylicplate,
self.yclippership,
self.invdtaamt_2,
self.invdtaamt_3,
self.invdtaamt_4,
self.invdtaamt_5,
self.invdtaamt_6,
self.invdtaamt_7,
self.invdtaamt_8,
self.invdtaamt_9,
self.die,
self.die_1,
self.die_2,
self.die_3,
self.die_4,
self.die_5,
self.die_6,
self.die_7,
self.die_8,
self.die_9,
self.die_10,
self.die_11,
self.die_12,
self.die_13,
self.die_14,
self.die_15,
self.die_16,
self.die_17,
self.die_18,
self.die_19,
self.cce,
self.cce_1,
self.cce_2,
self.cce_3,
self.cce_4,
self.cce_5,
self.cce_6,
self.cce_7,
self.cce_8,
self.cce_9,
self.cce_10,
self.cce_11,
self.cce_12,
self.cce_13,
self.cce_14,
self.cce_15,
self.cce_16,
self.cce_17,
self.cce_18,
self.cce_19,
self.bpdnam,
self.bpdaddlig,
self.bpdaddlig_1,
self.bpdaddlig_2,
self.bpdposcod,
self.bpdcty,
self.bpdsat,
self.bpdcry,
self.bpdcrynam,
self.sdhtyp,
self.growei,
self.pacnbr,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class WarehouseShipmentDetailList:
"""
List of shipment details
"""
_details: typing.List[WarehouseShipmentDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Append
"""
itmref = shipment_detail.itmref
# if itmref in self._item_set:
# for detail in self._details:
# if detail == itmref:
# detail.subdetails.append(shipment_subdetail)
# return
self._item_set.add(itmref)
#shipment_detail.fill()
shipment_detail.append(shipment_subdetail)
self._details.append(shipment_detail)
def __iter__(self):
return iter(self._details)
class WarehouseShipment:
"""
Warehosue shipment, both header & details
"""
header: WarehouseShipmentHeader
details: WarehouseShipmentDetailList
_sohnum: str
def __init__(self):
self.header = WarehouseShipmentHeader()
self._sohnum = ""
self.details = WarehouseShipmentDetailList()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Add detail information.
"""
self.details.append(shipment_detail, shipment_subdetail)
@property
def sohnum(self):
"""
Sales order number
"""
return self._sohnum
@sohnum.setter
def sohnum(self, value: str):
if self._sohnum != value:
self._sohnum = value
if value:
self._fill_info_from_so()
def create_customer_key(self, part1, part2):
key = (part1 + '_' + part2).replace(' ', '_')
return key
def _get_so_from_x3(self) -> records.Record:
"""
Fetch sales order from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SOH].[SALFCY_0]
,[SOH].[STOFCY_0]
,[SOH].[BPCORD_0]
,[SOH].[BPAADD_0]
,[SOH].[CUR_0]
,[SOH].[INVDTAAMT_2]
,[SOH].[INVDTAAMT_3]
,[SOH].[INVDTAAMT_4]
,[SOH].[INVDTAAMT_5]
,[SOH].[INVDTAAMT_6]
,[SOH].[INVDTAAMT_7]
,[SOH].[INVDTAAMT_8]
,[SOH].[INVDTAAMT_9]
,[SOH].[DIE_0]
,[SOH].[DIE_1]
,[SOH].[DIE_2]
,[SOH].[DIE_3]
,[SOH].[DIE_4]
,[SOH].[DIE_5]
,[SOH].[DIE_6]
,[SOH].[DIE_7]
,[SOH].[DIE_8]
,[SOH].[DIE_9]
,[SOH].[DIE_10]
,[SOH].[DIE_11]
,[SOH].[DIE_12]
,[SOH].[DIE_13]
,[SOH].[DIE_14]
,[SOH].[DIE_15]
,[SOH].[DIE_16]
,[SOH].[DIE_17]
,[SOH].[DIE_18]
,[SOH].[DIE_19]
,[SOH].[CCE_0]
,[SOH].[CCE_1]
,[SOH].[CCE_2]
,[SOH].[CCE_3]
,[SOH].[CCE_4]
,[SOH].[CCE_5]
,[SOH].[CCE_6]
,[SOH].[CCE_7]
,[SOH].[CCE_8]
,[SOH].[CCE_9]
,[SOH].[CCE_10]
,[SOH].[CCE_11]
,[SOH].[CCE_12]
,[SOH].[CCE_13]
,[SOH].[CCE_14]
,[SOH].[CCE_15]
,[SOH].[CCE_16]
,[SOH].[CCE_17]
,[SOH].[CCE_18]
,[SOH].[CCE_19]
,[SOH].[BPDNAM_0]
,[SOH].[BPDADDLIG_0]
,[SOH].[BPDADDLIG_1]
,[SOH].[BPDADDLIG_2]
,[SOH].[BPDPOSCOD_0]
,[SOH].[BPDCTY_0]
,[SOH].[BPDSAT_0]
,[SOH].[BPDCRY_0]
,[SOH].[BPDCRYNAM_0]
from [PROD].[SORDER] as [SOH]
where
[SOH].[SOHNUM_0] = :order
""",
order=self.sohnum,
).first()
def _copy_accounting_codes(self, result: records.Record):
"""
Fill in all the accounting codes
"""
self.header.die = result.DIE_0
self.header.die_1 = result.DIE_1
self.header.die_2 = result.DIE_2
self.header.die_3 = result.DIE_3
self.header.die_4 = result.DIE_4
self.header.die_5 = result.DIE_5
self.header.die_6 = result.DIE_6
self.header.die_7 = result.DIE_7
self.header.die_8 = result.DIE_8
self.header.die_9 = result.DIE_9
self.header.die_10 = result.DIE_10
self.header.die_11 = result.DIE_11
self.header.die_12 = result.DIE_12
self.header.die_13 = result.DIE_13
self.header.die_14 = result.DIE_14
self.header.die_15 = result.DIE_15
self.header.die_16 = result.DIE_16
self.header.die_17 = result.DIE_17
self.header.die_18 = result.DIE_18
self.header.die_19 = result.DIE_19
self.header.cce = result.CCE_0
self.header.cce_1 = result.CCE_1
self.header.cce_2 = result.CCE_2
self.header.cce_3 = result.CCE_3
self.header.cce_4 = result.CCE_4
self.header.cce_5 = result.CCE_5
self.header.cce_6 = result.CCE_6
self.header.cce_7 = result.CCE_7
self.header.cce_8 = result.CCE_8
self.header.cce_9 = result.CCE_9
self.header.cce_10 = result.CCE_10
self.header.cce_11 = result.CCE_11
self.header.cce_12 = result.CCE_12
self.header.cce_13 = result.CCE_13
self.header.cce_14 = result.CCE_14
self.header.cce_15 = result.CCE_15
self.header.cce_16 = result.CCE_16
self.header.cce_17 = result.CCE_17
self.header.cce_18 = result.CCE_18
self.header.cce_19 = result.CCE_19
def _fill_info_from_so(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_so_from_x3()
self.header.salfcy = result.SALFCY_0
self.header.stofcy = result.STOFCY_0
self.header.bpcord = result.BPCORD_0
self.header.bpaadd = result.BPAADD_0
self.header.cur = result.CUR_0
self.header.invdtaamt_2 = result.INVDTAAMT_2
self.header.invdtaamt_3 = result.INVDTAAMT_3
self.header.invdtaamt_4 = result.INVDTAAMT_4
self.header.invdtaamt_5 = result.INVDTAAMT_5
self.header.invdtaamt_6 = result.INVDTAAMT_6
self.header.invdtaamt_7 = result.INVDTAAMT_7
self.header.invdtaamt_8 = result.INVDTAAMT_8
self.header.invdtaamt_9 = result.INVDTAAMT_9
self._copy_accounting_codes(result)
self.header.bpdnam = result.BPDNAM_0
self.header.bpdaddlig = result.BPDADDLIG_0
self.header.bpdaddlig_1 = result.BPDADDLIG_1
self.header.bpdaddlig_2 = result.BPDADDLIG_2
self.header.bpdposcod = result.BPDPOSCOD_0
self.header.bpdcty = result.BPDCTY_0
self.header.bpdsat = result.BPDSAT_0
self.header.bpdcry = result.BPDCRY_0
self.header.bpdcrynam = result.BPDCRYNAM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
output(subdetail.convert_to_strings())
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@ -1,571 +0,0 @@
#!/usr/bin/env python3
"""
Make a 943 in "X12" format without using the Javascript Middleware
developed by Tech4Biz.
A 943 is a replenishment shipping notice: we notify our 3PL that we have
sent them resupplies of our inventory.
"""
import datetime
import dataclasses
import typing
import pathlib
import pprint
import records
import yamamotoyama # type:ignore
SITE_MAPPING = {
'WNJ' : 'SOURCELOGISTICS',
'WCA' : 'SOURCELOGISTICS',
'WON' : 'SHANDEX ' #Testing value is SHANDEXTEST with spaces to 15 characters
}
SHIPPING_CODE_MAPPING = {
'' : 'LT', #Default to LTL
'AIR' : 'AP', #Air package carrier
'DEL' : 'LT', #LTL and the default if mode is not entered
'GRN' : 'D', #Parcel post
'OUR' : 'SR', #Supplier truck
'P/U' : 'CE', #Pickup
'WCALL' : 'CE', #Pickup
}
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SHANDEX = THIS_DIRECTORY / "outgoing"
def main():
"""
Do it!
"""
with yamamotoyama.get_connection() as database:
shipments = list(get_shipments(database))
for shipment in shipments:
write_943(database, shipment)
class X12:
"""
X12 format parent class.
"""
@staticmethod
def line(items: typing.List[str]) -> str:
"""
Return X12 EDI line with * and ~
"""
def sanitize(thing: str) -> str:
for bad_character in ("*", "/", "&", "#", ","):
thing = thing.replace(bad_character, "")
return thing
return "*".join([sanitize(item) for item in items]) + "~"
def x12(self) -> str:
"""
X12 format.
"""
raise NotImplementedError
@staticmethod
def control_number() -> int:
"""
Next EDI serial number
"""
filepath = pathlib.Path(__file__).with_suffix(".remember")
encoding = "utf-8"
newline = "\n"
try:
with filepath.open(
"r", encoding=encoding, newline=newline
) as remember_file:
number = int(remember_file.readline().rstrip("\n"))
except (OSError, ValueError):
number = 0
number += 1
with filepath.open("w", encoding=encoding, newline=newline) as remember_file:
remember_file.write(f"{number}\n")
return number
def write_943(database: records.Connection, shipment: str):
"""
Write out a 943 to a file
"""
now = datetime.datetime.now()
site = str(get_shipment_destination(database, shipment))
datestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")
with (X12_SHANDEX / f"{site}-{shipment}-{datestamp_string}-943.edi").open(
"w", encoding="utf-8", newline="\n"
) as x12_file:
output = x12_file.write
is_header_output = False
header = None
detail_count = 0
package_count = 0
for shipment_database_row in get_shipment(database, shipment):
ship_to_site = shipment_database_row.BPCORD_0
mdl = shipment_database_row.MDL_0
x12_ship_to = SITE_MAPPING[ship_to_site]
x12_mdl = SHIPPING_CODE_MAPPING[mdl]
header = ShipmentHeader(shipment_database_row)
detail = ShipmentDetail(shipment_database_row)
if not is_header_output:
output(header.x12(x12_ship_to,x12_mdl))
is_header_output = True
output(detail.x12())
detail_count += 1
package_count += detail.qtystu_0
if header:
output(header.footer(package_count, detail_count))
with database.transaction() as _:
database.query(
"""
update [PROD].[SDELIVERY]
set [XX4S_943RDY_0] = 1
where [SOHNUM_0] = :shipment
""",
shipment=shipment,
)
order = get_order_for_shipment(database, shipment)
database.query(
"""
update [PROD].[SORDER]
set [XX4S_UDF2_0] = :sent_message
where [SOHNUM_0] = :order
""",
order=order,
sent_message=f"943 Sent {datetime.date.today().isoformat()}",
)
def get_shipment_destination(database: records.Connection, shipment: str) -> str:
"""
Get the destination site
"""
return (
database.query(
"""
select
[SDH].[BPCORD_0]
from [PROD].[SDELIVERY] as [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=shipment,
)
.first()
.BPCORD_0
)
def get_order_for_shipment(database: records.Connection, shipment: str) -> str:
"""
What is the order for this shipment?
"""
return (
database.query(
"""
select
[SDH].[SOHNUM_0]
from [PROD].[SDELIVERY] as [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=shipment,
)
.first()
.SOHNUM_0
)
def get_shipments(database: records.Connection) -> typing.Iterator[str]:
"""
What have we shipped? Fetch from X3.
"""
for shipment_result in database.query(
"""
select
[SDH].[SDHNUM_0]
from [PROD].[SDELIVERY] as [SDH]
join [PROD].[SORDER] as [SOH]
on [SOH].[SOHNUM_0] = [SDH].[SOHNUM_0]
where
(
[SDH].[STOFCY_0] in ('PMW','WCA','WNJ')
and [SDH].[BPCORD_0] in ('WON')
and nullif([SDH].[MDL_0],'') is not null
and nullif([SDH].[BPTNUM_0],'') is not null
)
and [SDH].[SHIDAT_0] >= {d'2023-10-09'}
and (
[SOH].[XX4S_UDF2_0] not like '943%'
or [SDH].[XX4S_943RDY_0] = 2
)
and (
[SDH].[CFMFLG_0] = 2
or [SDH].[YLICPLATE_0] <> ''
or [SDH].[XX4S_943RDY_0] = 2
)
"""
):
yield shipment_result.SDHNUM_0
def get_shipment(
database: records.Connection, shipment: str
) -> typing.Iterator[records.Record]:
"""
Get shipment information from X3 database.
"""
yield from database.query(
"""
select
[SDHNUM_0]
,[STOFCY_0]
,[STOFCY]
,[SDHCAT_0]
,[SALFCY_0]
,[SALFCY]
,[SOHNUM_0]
,[CUSORDREF_0]
,[BPCORD_0]
,[BPDNAM_0]
,[BPDADDLIG_0]
,[BPDADDLIG_1]
,[BPDADDLIG_2]
,[CTY_0]
,[SAT_0]
,[POSCOD_0]
,[CRY_0]
,[CRYNAM_0]
,[BPCINV_0]
,[BPINAM_0]
,[BPTNUM_0]
,[BPTNAM_0]
,[MDL_0]
,[SCAC_0]
,[YLICPLATE_0]
,[DLVDAT_0]
,[CREDAT_0]
,[UPDDAT_0]
,[CCE_0]
,[SOPLIN_0]
,[ITMREF_0]
,[ITMDES1_0]
,[ZCASEUPC_0]
,[EANCOD_0]
,[STU_0]
,[QTYSTU_0]
,[LotQty]
,[LOT_0]
,[NETWEI_0]
,[GROWEI_0]
,[CFMFLG_0]
,[SHIDAT_0]
from [PROD].[zyumiddleware_shipment_shandex] as [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=shipment,
)
@dataclasses.dataclass
class ShipmentHeader(X12):
"""
Header
"""
sdhnum: str
stofcy: str
stofcy: str
sdhcat: str
salfcy: str
salfcy: str
sohnum: str
cusordref: str
bpcord: str
bpdnam: str
bpdaddlig: str
bpdaddlig_1: str
bpdaddlig_2: str
cty: str
sat: str
poscod: str
cry: str
crynam: str
bpcinv: str
bpinam: str
bptnum: str
bptnam: str
scac: str
ylicplate: str
dlvdat: datetime.date
credat: datetime.date
upddat: datetime.date
cce: str
short_control_number: str
interchange_control_number: str
header_segments: int
footer_segments: int
def __init__(self, database_row: records.Record):
self.sdhnum = database_row.SDHNUM_0
self.stofcy = database_row.STOFCY_0
self.sdhcat = database_row.SDHCAT_0
self.salfcy = database_row.SALFCY_0
self.salfcy = database_row.SALFCY_0
self.sohnum = database_row.SOHNUM_0
self.cusordref = database_row.CUSORDREF_0
self.bpcord = database_row.BPCORD_0
self.bpdnam = database_row.BPDNAM_0
self.bpdaddlig = database_row.BPDADDLIG_0
self.bpdaddlig_1 = database_row.BPDADDLIG_1
self.bpdaddlig_2 = database_row.BPDADDLIG_2
self.cty = database_row.CTY_0
self.sat = database_row.SAT_0
self.poscod = database_row.POSCOD_0
self.cry = database_row.CRY_0
self.crynam = database_row.CRYNAM_0
self.bpcinv = database_row.BPCINV_0
self.bpinam = database_row.BPINAM_0
self.bptnum = database_row.BPTNUM_0
self.bptnam = database_row.BPTNAM_0
self.scac = database_row.SCAC_0
self.ylicplate = database_row.YLICPLATE_0
self.dlvdat = database_row.DLVDAT_0
self.credat = database_row.CREDAT_0
self.upddat = database_row.UPDDAT_0
self.cce = database_row.CCE_0
self.shidat = database_row.SHIDAT_0
raw_control_number = self.control_number()
self.short_control_number = f"{raw_control_number:04}"
self.interchange_control_number = (
f"{raw_control_number:09}" # Format to 9 characters
)
self.now = datetime.date.today()
self.date = self.now.strftime("%y%m%d")
self.long_date = self.now.strftime("%Y%m%d")
self.time = self.now.strftime("%H%m")
self.header_segments = 10
self.footer_segments = 2
def x12(self, receiver_id, mdl) -> str:
return "".join(
[
f"ISA*00* *00* *ZZ*YAMAMOTOYAMA *ZZ*{receiver_id}*",
self.date,
"*",
self.time,
"*U*00401*",
self.interchange_control_number,
"*0*P*>~",
self.line(
[
"GS",
"OW",#should this be AR? Shandex okayed "OW"
"YAMAMOTOYAMA",
f"{receiver_id}",
self.long_date,
self.time,
self.interchange_control_number,
"X",
"004010",
]
),
self.line(
[
"ST",
"943",
self.short_control_number,
]
),
self.line(
[
"W06",
"N",
self.sohnum,
self.shidat.strftime("%Y%m%d"),
self.sdhnum,
"",
"",
"",
"",
"",
"",
"AS",
]
),
"N1*RE*Pomona Wholesale*1*008930755~",
"N4*Pomona*CA*91768*US~",
self.line(
[
"N1",
"ST", # Ship to
self.bpdnam,
"53", # Building
self.bpcord,
]
),
self.line(
[
"N3",
self.bpdaddlig,
]
),
self.line(
[
"N4",
self.cty,
self.sat,
self.poscod,
self.cry,
]
),
self.line(
[
"N9",
"PO",
self.cusordref,
]
),
self.line(
[
"G62",
"17",
self.dlvdat.strftime("%Y%m%d"),
]
),
self.line(
[
"W27",
f"{mdl}",
self.scac,
]
),
]
)
def footer(self, package_count: int, detail_count: int):
"""
End footer
"""
segment_count = self.header_segments + (detail_count * 3) + self.footer_segments
return "".join(
[
self.line(
[
"W03",
str(int(package_count)),
]
),
self.line(
[
"SE",
str(segment_count),
self.short_control_number,
]
),
self.line(
[
"GE",
"1",
self.interchange_control_number,
]
),
self.line(
[
"IEA",
"1",
self.interchange_control_number,
]
),
]
)
@dataclasses.dataclass
class ShipmentDetail(X12):
"""
Shipment detail.
"""
soplin_0: str
itmref_0: str
itmdes1_0: str
stu_0: str
qtystu_0: int
lot_0: str
stofcy_0: str
zcaseupc_0: str
eancod_0: str
gtin_or_upc_code: str
gtin_or_upc_marker: str
def __init__(self, db_record: records.Record):
self.soplin_0 = str(db_record.SOPLIN_0)
self.itmref_0 = db_record.ITMREF_0
self.itmdes1_0 = db_record.ITMDES1_0
self.stu_0 = db_record.STU_0
self.qtystu_0 = db_record.LotQty
self.lot_0 = db_record.LOT_0
self.stofcy_0 = db_record.STOFCY_0
if self.stofcy_0 == 'WON' and self.stu_0 == 'CS': #Shadex requires CA for cases
self.stu_0 = 'CA'
self.zcaseupc_0 = db_record.ZCASEUPC_0
self.eancod_0 = db_record.EANCOD_0
self.gtin_or_upc_marker = 'UK'
if self.stu_0 == 'CS':
self.gtin_or_upc_code = self.zcaseupc_0
else:
self.gtin_or_upc_code = self.eancod_0
def x12(self) -> str:
"""
Format in X12
"""
return "".join(
[
self.line(
[
"W04",
str(int(self.qtystu_0)),
self.stu_0,
"",
"VN", # Vendor's (Seller's) Item Number
self.itmref_0,
"LT", # Lot number
self.lot_0,
"",
"",
"",
"",
"",
"",
self.gtin_or_upc_marker,
self.gtin_or_upc_code.replace(' ',''),#W04-15
]
),
self.line(
[
"G69",
self.itmdes1_0,
]
),
self.line(
[
"N9",
"LI",
self.soplin_0,
]
),
]
)
if __name__ == "__main__":
main()

View File

@ -1,557 +0,0 @@
#!/usr/bin/env python3
"""
Consume a generic 944 file from 3PLs, and translate into a Sage X3
readable file - import template ZPTHI.
For Shandex we also need to reply with a 997
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import pprint
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
SHANDEX_944_FILENAME_RE = re.compile(
r"\A 944_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_944_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
combine_zpthis()
def new_944_alert(sdhnum, pohnum, rcpdat):
msg = MIMEMultipart()
msg['Subject'] = 'New Receipt from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'isenn@yamamotoyama.com'
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'Delivery: {sdhnum}\nPO: {pohnum}\nDate: {rcpdat}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def validation_alert(sdhnum):
msg = MIMEMultipart()
msg['Subject'] = 'New Receipt from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'isenn@yamamotoyama.com'
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'A Shandex receipt for {sdhnum} could not be loaded into X3 because the shipment is not validated.'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def combine_zpthis():
"""
Collect all ZPTHI imports into a single file for easy import.
"""
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZPTHI.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZPTHI_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"ST",
"N2",
"N3",
"N4",
"LX",
}:
continue
yield fields
def find_shipment_line(sdhnum, itmref):
with yamamotoyama.get_connection() as database:
result = database.query(
"""
select
SDDLIN_0
from PROD.SDELIVERYD
where
SDHNUM_0 = :sdhnum
and ITMREF_0 = :itmref
""",
sdhnum=sdhnum,
itmref=itmref
).first()['SDDLIN_0']
return result
def check_shipment_status(delivery):
with yamamotoyama.get_connection() as database:
result = database.query(
"""
select
SDH.SDHNUM_0,
CFMFLG_0
from PROD.SDELIVERY SDH
where SDH.SDHNUM_0 = :sdhnum
""",
sdhnum=delivery
).first()['CFMFLG_0']
if result == 2:
return True
return False
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
warehouse_receipt = Receipt()
pohnum = ''
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "W17":
_, _, rcpdat, _, sohnum, sdhnum = fields[:6]
warehouse_receipt.sdhnum = sdhnum
validated = check_shipment_status(sdhnum)
if not validated:
validation_alert(sdhnum)
break
warehouse_receipt.header.rcpdat = datetime.datetime.strptime(
rcpdat, "%Y%m%d"
).date() # 20230922
if fields[0] == "N9" and fields[1] == "PO":
pohnum = fields[2]
if fields[0] == "W07":
# W07*1023*CA**PN*C08249*LT*07032026A***UK*10077652082491
# N9*LI*1000
_, qty_str, uom, _, _, itmref, _, lot = fields[:8]
subdetail = ReceiptSubDetail(
qtypcu=int(qty_str),
lot=lot,
)
if fields[0] == 'N9' and fields[1] == 'LI':
# N9*LI*1000
#line = fields[2] #This line isn't the line number from X3, it needs to be looked up
line = find_shipment_line(warehouse_receipt.sdhnum, itmref)
warehouse_receipt.append(
ReceiptDetail(
sdhnum=warehouse_receipt.sdhnum,
itmref=itmref,
qtyuom=int(qty_str),
poplin=int(line),
uom=uom
),
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
new_944_alert(sdhnum, pohnum, warehouse_receipt.header.rcpdat)
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZPTHI_{warehouse_receipt.sdhnum}_{time_stamp}.dat"
) as import_file:
warehouse_receipt.output(import_file)
@dataclasses.dataclass
class ReceiptSubDetail:
"""
Information that goes onto a receipt sub-detail line, taken from ZPTHI template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
bpslot: str = ""
sernum: str = ""
def stojous(self, shipment, item) -> typing.List[str]:
"""
Convert grouped lot quantities into individual STOJOU records to fit on receipt
"""
with yamamotoyama.get_connection() as database:
details = (
database.query(
"""
select
'S' [Code],
'A' [STA_0],
[STJ].[PCU_0],
cast(cast(-1*[STJ].[QTYSTU_0] as int) as nvarchar) [QTYPCU_0],
[STJ].[LOT_0],
'' [BPSLOT_0],
'' [SERNUM_0]
from [PROD].[STOJOU] [STJ]
where
[STJ].[VCRNUM_0] = :sdhnum
and [STJ].[ITMREF_0] = :itmref
and [STJ].[LOT_0] = :lot
and [STJ].[TRSTYP_0] = 4
""",
sdhnum=shipment,
itmref=item,
lot=self.lot,
)
.all()
)
return details
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.lot,
self.bpslot,
self.sernum,
]
)
@dataclasses.dataclass
class ReceiptDetail:
"""
Information that goes on a receipt detail line, taken from ZPTHI template.
"""
sdhnum: str = ""
poplin: int = 0
itmref: str = ""
itmdes: str = ""
uom: str = ""
qtyuom: int = 0
pjt: str = ""
star65: str = ""
star91: str = ""
star92: str = ""
subdetails: typing.List[ReceiptSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: ReceiptSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.uom
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases -= subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sdhnum,
self.poplin,
self.itmref,
fix_uom(self.uom),
self.qty,
self.star65,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, ReceiptDetail):
return self.itmref == item.itmref
return False
# def fill(self):#not needed for receipts
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class ReceiptHeader:
"""
Information that goes on a receipt header, taken from ZPTHI template.
"""
stofcy: str = ""
bpcord: str = ""
prhfcy: str = ""
rcpdat: datetime.date = datetime.date(1753, 1, 1)
pthnum: str = ""
bpsnum: str = ""
cur: str = "USD"
star71 = ""
star72 = ""
star81 = ""
star82 = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"E",
self.bpcord,
self.rcpdat.strftime("%Y%m%d"),
self.pthnum,
self.stofcy,
self.cur,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class ReceiptDetailList:
"""
List of receipt details
"""
_details: typing.List[ReceiptDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
receipt_detail: ReceiptDetail,
receipt_subdetail: ReceiptSubDetail,
):
"""
Append
"""
itmref = receipt_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(receipt_subdetail)
return
self._item_set.add(itmref)
#receipt_detail.fill()
receipt_detail.append(receipt_subdetail)
self._details.append(receipt_detail)
def __iter__(self):
return iter(self._details)
class Receipt:
"""
Warehouse receipt, both header & details
"""
header: ReceiptHeader
details: ReceiptDetailList
_sdhnum: str
def __init__(self):
self.header = ReceiptHeader()
self._sdhnum = ""
self.details = ReceiptDetailList()
def append(
self,
receipt_detail: ReceiptDetail,
receipt_subdetail: ReceiptSubDetail,
):
"""
Add detail information.
"""
self.details.append(receipt_detail, receipt_subdetail)
@property
def sdhnum(self):
"""
shipment number
"""
return self._sdhnum
@sdhnum.setter
def sdhnum(self, value: str):
if self._sdhnum != value:
self._sdhnum = value
if value:
self._fill_info_from_shipment()
def _get_shipment_from_x3(self) -> records.Record:
"""
Fetch shipment from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SDH].[STOFCY_0],
[SDH].[SDHNUM_0],
[SDH].[SALFCY_0],
[SDH].[BPCORD_0],
[SDH].[CUR_0],
[SDH].[SOHNUM_0]
from [PROD].[SDELIVERY] [SDH]
where
[SDH].[SDHNUM_0] = :shipment
""",
shipment=self.sdhnum,
).first()
def _fill_info_from_shipment(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_shipment_from_x3()
self.header.stofcy = result.STOFCY_0
self.header.sdhnum = result.SDHNUM_0
self.header.salfcy = result.SALFCY_0
self.header.bpcord = result.BPCORD_0
self.header.cur = result.CUR_0
self.header.sohnum = result.SOHNUM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
shipment = detail.sdhnum
item = detail.itmref
for record in subdetail.stojous(shipment, item):
record_list = [
record['Code'],
record['STA_0'],
record['PCU_0'],
record['QTYPCU_0'],
record['LOT_0'],
record['BPSLOT_0'],
record['SERNUM_0']
]
#pprint.pprint(record_list)
output(record_list)
if __name__ == "__main__":
main()

View File

@ -1,672 +0,0 @@
#!/usr/bin/env python3
"""
Consume a 945 file from Source Logistics, and translate into a Sage X3
readable file.
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
SOURCE_945_FILENAME_RE = re.compile(
r"\A Yamamotoyama_945_ .* [.]edi \Z", re.X | re.M | re.S
)
msg = MIMEMultipart()
msg['Subject'] = '945 processing error: Possible duplicate order lines?'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'bleeson@stashtea.com'
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SOURCE_945_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
shutil.move(edi_filename, X12_DIRECTORY / "archive" / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS",
"ST",
"N1",
"N2",
"N3",
"N4",
"G62",
"W27",
"W10",
"LX",
"MAN",
}:
continue
yield fields
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
tracking_number_not_found = True
warehouse_shipment = WarehouseShipment()
warehouse_shipment.header.ylicplate = '' #if we don't find a tracking number, submit a blank
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "W06":
_, _, sohnum, shidat_str = fields[:4]
warehouse_shipment.sohnum = sohnum
warehouse_shipment.header.shidat = datetime.datetime.strptime(
shidat_str, "%Y%m%d"
).date() # 20230922
if fields[0] == "N9" and fields[1] == "2I" and len(fields) > 2 and tracking_number_not_found:
warehouse_shipment.header.ylicplate = fields[2]
tracking_number_not_found = False
if fields[0] == "W12":
# W12*CC*32*32*0*CA**VN*08279*01112025C~
_, _, qty_str, det_qty, _, _, _, _, itmref, lot = fields[:10]
subdetail = WarehouseShipmentSubDetail(
qtypcu=-1 * int(det_qty),
lot=lot,
)
warehouse_shipment.append(
WarehouseShipmentDetail(
sohnum=warehouse_shipment.sohnum,
itmref=itmref,
qty=int(qty_str),
),
subdetail,
)
if fields[0] == "W03":
_, _, weight, _ = fields
warehouse_shipment.header.growei = weight
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSHIP945S_{warehouse_shipment.sohnum}_{time_stamp}.dat"
) as import_file:
warehouse_shipment.output(import_file)
@dataclasses.dataclass
class WarehouseShipmentSubDetail:
"""
Information that goes onto a shipment sub-detail line, taken from ZSHIP945 template.
"""
sta: str = "A"
pcu: str = ""
qtypcu: int = 0
loc: str = ""
lot: str = ""
sernum: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
self.sta,
self.pcu,
self.qtypcu,
self.loc,
self.lot,
self.sernum,
]
)
@dataclasses.dataclass
class WarehouseShipmentDetail:
"""
Information that goes on a shipment detail line, taken from ZSHIP945 template.
"""
sohnum: str = ""
soplin: int = 0
itmref: str = ""
itmdes: str = ""
sau: str = ""
qty: int = 0
star91: str = ""
star92: str = ""
subdetails: typing.List[WarehouseShipmentSubDetail] = dataclasses.field(
default_factory=list
)
def append(self, subdetail: WarehouseShipmentSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.sau
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases += subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
self.qty = self.check_subdetail_qty()
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.sohnum,
self.soplin,
self.itmref,
self.itmdes,
self.sau,
self.qty,
self.star91,
self.star92,
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, WarehouseShipmentDetail):
return self.itmref == item.itmref
return False
def fill(self):
"""
Set soplin & itmdes from itmref & sohnum
"""
def get() -> records.Record:
with yamamotoyama.get_connection() as database:
how_many = (
database.query(
"""
select
count(*) as [how_many]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
""",
sohnum=self.sohnum,
itmref=self.itmref,
)
.first()
.how_many
)
if how_many == 1:
return database.query(
"""
select top 1
[SOP].[SOPLIN_0]
,[SOP].[ITMDES1_0]
,[SOP].[SAU_0]
from [PROD].[SORDERP] as [SOP]
where
[SOP].[SOHNUM_0] = :sohnum
and [SOP].[ITMREF_0] = :itmref
order by
[SOP].[SOPLIN_0]
""",
sohnum=self.sohnum,
itmref=self.itmref,
).first()
else:
emailtext = str(self.sohnum +' '+str(self.itmref))
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
raise NotImplementedError # TODO
result = get()
self.soplin = result.SOPLIN_0
self.itmdes = result.ITMDES1_0
self.sau = result.SAU_0
@dataclasses.dataclass
class WarehouseShipmentHeader:
"""
Information that goes on a shipment header, taken from ZSHIP945 template.
"""
salfcy: str = "STC"
stofcy: str = ""
sdhnum: str = ""
bpcord: str = ""
bpaadd: str = "SH001"
cur: str = "USD"
shidat: datetime.date = datetime.date(1753, 1, 1)
cfmflg: int = 1
pjt: str = ""
bptnum: str = ""
ylicplate: str = ""
invdtaamt_2: decimal.Decimal = decimal.Decimal()
invdtaamt_3: decimal.Decimal = decimal.Decimal()
invdtaamt_4: decimal.Decimal = decimal.Decimal()
invdtaamt_5: decimal.Decimal = decimal.Decimal()
invdtaamt_6: decimal.Decimal = decimal.Decimal()
invdtaamt_7: decimal.Decimal = decimal.Decimal()
invdtaamt_8: decimal.Decimal = decimal.Decimal()
invdtaamt_9: decimal.Decimal = decimal.Decimal()
die: str = ""
die_1: str = ""
die_2: str = ""
die_3: str = ""
die_4: str = ""
die_5: str = ""
die_6: str = ""
die_7: str = ""
die_8: str = ""
die_9: str = ""
die_10: str = ""
die_11: str = ""
die_12: str = ""
die_13: str = ""
die_14: str = ""
die_15: str = ""
die_16: str = ""
die_17: str = ""
die_18: str = ""
die_19: str = ""
cce: str = ""
cce_1: str = ""
cce_2: str = ""
cce_3: str = ""
cce_4: str = ""
cce_5: str = ""
cce_6: str = ""
cce_7: str = ""
cce_8: str = ""
cce_9: str = ""
cce_10: str = ""
cce_11: str = ""
cce_12: str = ""
cce_13: str = ""
cce_14: str = ""
cce_15: str = ""
cce_16: str = ""
cce_17: str = ""
cce_18: str = ""
cce_19: str = ""
bpdnam: str = ""
bpdaddlig: str = ""
bpdaddlig_1: str = ""
bpdaddlig_2: str = ""
bpdposcod: str = ""
bpdcty: str = ""
bpdsat: str = ""
bpdcry: str = ""
bpdcrynam: str = ""
sdhtyp: str = "SDN"
growei: decimal.Decimal = decimal.Decimal()
pacnbr: int = 0
star71: str = ""
star72: str = ""
star81: str = ""
star82: str = ""
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"H",
self.salfcy,
self.stofcy,
self.sdhnum,
self.bpcord,
self.bpaadd,
self.cur,
self.shidat.strftime("%Y%m%d"),
self.cfmflg,
self.pjt,
self.bptnum,
self.ylicplate,
self.invdtaamt_2,
self.invdtaamt_3,
self.invdtaamt_4,
self.invdtaamt_5,
self.invdtaamt_6,
self.invdtaamt_7,
self.invdtaamt_8,
self.invdtaamt_9,
self.die,
self.die_1,
self.die_2,
self.die_3,
self.die_4,
self.die_5,
self.die_6,
self.die_7,
self.die_8,
self.die_9,
self.die_10,
self.die_11,
self.die_12,
self.die_13,
self.die_14,
self.die_15,
self.die_16,
self.die_17,
self.die_18,
self.die_19,
self.cce,
self.cce_1,
self.cce_2,
self.cce_3,
self.cce_4,
self.cce_5,
self.cce_6,
self.cce_7,
self.cce_8,
self.cce_9,
self.cce_10,
self.cce_11,
self.cce_12,
self.cce_13,
self.cce_14,
self.cce_15,
self.cce_16,
self.cce_17,
self.cce_18,
self.cce_19,
self.bpdnam,
self.bpdaddlig,
self.bpdaddlig_1,
self.bpdaddlig_2,
self.bpdposcod,
self.bpdcty,
self.bpdsat,
self.bpdcry,
self.bpdcrynam,
self.sdhtyp,
self.growei,
self.pacnbr,
self.star71,
self.star72,
self.star81,
self.star82,
]
)
class WarehouseShipmentDetailList:
"""
List of shipment details
"""
_details: typing.List[WarehouseShipmentDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Append
"""
itmref = shipment_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(shipment_subdetail)
return
self._item_set.add(itmref)
shipment_detail.fill()
shipment_detail.append(shipment_subdetail)
self._details.append(shipment_detail)
def __iter__(self):
return iter(self._details)
class WarehouseShipment:
"""
Warehosue shipment, both header & details
"""
header: WarehouseShipmentHeader
details: WarehouseShipmentDetailList
_sohnum: str
def __init__(self):
self.header = WarehouseShipmentHeader()
self._sohnum = ""
self.details = WarehouseShipmentDetailList()
def append(
self,
shipment_detail: WarehouseShipmentDetail,
shipment_subdetail: WarehouseShipmentSubDetail,
):
"""
Add detail information.
"""
self.details.append(shipment_detail, shipment_subdetail)
@property
def sohnum(self):
"""
Sales order number
"""
return self._sohnum
@sohnum.setter
def sohnum(self, value: str):
if self._sohnum != value:
self._sohnum = value
if value:
self._fill_info_from_so()
def _get_so_from_x3(self) -> records.Record:
"""
Fetch sales order from X3 database.
"""
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[SOH].[SALFCY_0]
,[SOH].[STOFCY_0]
,[SOH].[BPCORD_0]
,[SOH].[BPAADD_0]
,[SOH].[CUR_0]
,[SOH].[INVDTAAMT_2]
,[SOH].[INVDTAAMT_3]
,[SOH].[INVDTAAMT_4]
,[SOH].[INVDTAAMT_5]
,[SOH].[INVDTAAMT_6]
,[SOH].[INVDTAAMT_7]
,[SOH].[INVDTAAMT_8]
,[SOH].[INVDTAAMT_9]
,[SOH].[DIE_0]
,[SOH].[DIE_1]
,[SOH].[DIE_2]
,[SOH].[DIE_3]
,[SOH].[DIE_4]
,[SOH].[DIE_5]
,[SOH].[DIE_6]
,[SOH].[DIE_7]
,[SOH].[DIE_8]
,[SOH].[DIE_9]
,[SOH].[DIE_10]
,[SOH].[DIE_11]
,[SOH].[DIE_12]
,[SOH].[DIE_13]
,[SOH].[DIE_14]
,[SOH].[DIE_15]
,[SOH].[DIE_16]
,[SOH].[DIE_17]
,[SOH].[DIE_18]
,[SOH].[DIE_19]
,[SOH].[CCE_0]
,[SOH].[CCE_1]
,[SOH].[CCE_2]
,[SOH].[CCE_3]
,[SOH].[CCE_4]
,[SOH].[CCE_5]
,[SOH].[CCE_6]
,[SOH].[CCE_7]
,[SOH].[CCE_8]
,[SOH].[CCE_9]
,[SOH].[CCE_10]
,[SOH].[CCE_11]
,[SOH].[CCE_12]
,[SOH].[CCE_13]
,[SOH].[CCE_14]
,[SOH].[CCE_15]
,[SOH].[CCE_16]
,[SOH].[CCE_17]
,[SOH].[CCE_18]
,[SOH].[CCE_19]
,[SOH].[BPDNAM_0]
,[SOH].[BPDADDLIG_0]
,[SOH].[BPDADDLIG_1]
,[SOH].[BPDADDLIG_2]
,[SOH].[BPDPOSCOD_0]
,[SOH].[BPDCTY_0]
,[SOH].[BPDSAT_0]
,[SOH].[BPDCRY_0]
,[SOH].[BPDCRYNAM_0]
from [PROD].[SORDER] as [SOH]
where
[SOH].[SOHNUM_0] = :order
""",
order=self.sohnum,
).first()
def _copy_accounting_codes(self, result: records.Record):
"""
Fill in all the accounting codes
"""
self.header.die = result.DIE_0
self.header.die_1 = result.DIE_1
self.header.die_2 = result.DIE_2
self.header.die_3 = result.DIE_3
self.header.die_4 = result.DIE_4
self.header.die_5 = result.DIE_5
self.header.die_6 = result.DIE_6
self.header.die_7 = result.DIE_7
self.header.die_8 = result.DIE_8
self.header.die_9 = result.DIE_9
self.header.die_10 = result.DIE_10
self.header.die_11 = result.DIE_11
self.header.die_12 = result.DIE_12
self.header.die_13 = result.DIE_13
self.header.die_14 = result.DIE_14
self.header.die_15 = result.DIE_15
self.header.die_16 = result.DIE_16
self.header.die_17 = result.DIE_17
self.header.die_18 = result.DIE_18
self.header.die_19 = result.DIE_19
self.header.cce = result.CCE_0
self.header.cce_1 = result.CCE_1
self.header.cce_2 = result.CCE_2
self.header.cce_3 = result.CCE_3
self.header.cce_4 = result.CCE_4
self.header.cce_5 = result.CCE_5
self.header.cce_6 = result.CCE_6
self.header.cce_7 = result.CCE_7
self.header.cce_8 = result.CCE_8
self.header.cce_9 = result.CCE_9
self.header.cce_10 = result.CCE_10
self.header.cce_11 = result.CCE_11
self.header.cce_12 = result.CCE_12
self.header.cce_13 = result.CCE_13
self.header.cce_14 = result.CCE_14
self.header.cce_15 = result.CCE_15
self.header.cce_16 = result.CCE_16
self.header.cce_17 = result.CCE_17
self.header.cce_18 = result.CCE_18
self.header.cce_19 = result.CCE_19
def _fill_info_from_so(self):
"""
When we learn the SOHNUM, we can copy information from the sales order.
"""
result = self._get_so_from_x3()
self.header.salfcy = result.SALFCY_0
self.header.stofcy = result.STOFCY_0
self.header.bpcord = result.BPCORD_0
self.header.bpaadd = result.BPAADD_0
self.header.cur = result.CUR_0
self.header.invdtaamt_2 = result.INVDTAAMT_2
self.header.invdtaamt_3 = result.INVDTAAMT_3
self.header.invdtaamt_4 = result.INVDTAAMT_4
self.header.invdtaamt_5 = result.INVDTAAMT_5
self.header.invdtaamt_6 = result.INVDTAAMT_6
self.header.invdtaamt_7 = result.INVDTAAMT_7
self.header.invdtaamt_8 = result.INVDTAAMT_8
self.header.invdtaamt_9 = result.INVDTAAMT_9
self._copy_accounting_codes(result)
self.header.bpdnam = result.BPDNAM_0
self.header.bpdaddlig = result.BPDADDLIG_0
self.header.bpdaddlig_1 = result.BPDADDLIG_1
self.header.bpdaddlig_2 = result.BPDADDLIG_2
self.header.bpdposcod = result.BPDPOSCOD_0
self.header.bpdcty = result.BPDCTY_0
self.header.bpdsat = result.BPDSAT_0
self.header.bpdcry = result.BPDCRY_0
self.header.bpdcrynam = result.BPDCRYNAM_0
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
output(subdetail.convert_to_strings())
if __name__ == "__main__":
main()

View File

@ -1,555 +0,0 @@
#!/usr/bin/env python3
"""
Consume a generic 947 file from 3PLs, and translate into a Sage X3
readable file - import template ZSCS.
For Shadex we also need to reply with a 997
947 is warehouse advice, to alert us of damages or amount changes from something
like a count
"""
#what about serial numbers?
#status on L line?
#remove negative line, needs to look up stocou? not sure.
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import smtplib
import pprint
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
IMPORTS_DIRECTORY = THIS_DIRECTORY / "x3_imports"
EDI_997_DIRECTORY = THIS_DIRECTORY / "997_processing"
SHANDEX_947_FILENAME_RE = re.compile(
r"\A 947_QTY_STASH-YAMAMOTOYAMA_ \S+ [.]edi \Z", re.X | re.M | re.S
)
DAMAGE_CODE_MAPPING = {
"07" : 'RD',#Product Dumped or Destroyed
"AV" : 'RD',#Damaged in Transit
"55" : 'A',#Product Taken Off Hold
"AA" : 'A',#Physical Count
"05" : 'Q'#Product Put on Hold
}
DAMAGE_CODE_DESCRIPTIONS_MAPPING = {
"07" : "Product Dumped or Destroyed",
"AV" : "Damaged in Transit",
"55" : "Product Taken Off Hold",
"AA" : "Physical Count",
"05" : "Product Put on Hold"
}
#This transaction can also be used for inventory counts, which we will report on but not process
EMAIL_ONLY_CODES = ['AA']
#When we receive an EDI to change status, it will either be A or Q, the reverse of the earlier code
DAMAGE_CODE_SOURCE_MAPPING = {
"07" : 'A',#Product Dumped or Destroyed
"AV" : 'A',#Damaged in Transit
"55" : 'Q',#Product Taken Off Hold
"AA" : 'A',#Physical Count
"05" : 'A'#Product Put on Hold
}
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_947_FILENAME_RE.match(edi_filename.name):
process_file(edi_filename)
# file moved to 997 processing folder to be sent later
shutil.move(edi_filename, EDI_997_DIRECTORY / edi_filename.name)
combine_zscs()
def combine_zscs():
"""
Collect all ZSCS imports into a single file for easy import.
"""
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZSCS.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZSCS_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS"
"N1",
"N9",
"SE",
"GE",
"IEA"
}:
continue
yield fields
def gtin_lookup(gtin):
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
replace([ITM].[ZCASEUPC_0],' ','') = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
def stock_movement_alert(itmref, qty, lot, status):
msg = MIMEMultipart()
msg['Subject'] = 'New Stock Change from Shandex'
msg['Precedence'] = 'bulk'
msg['From'] = 'x3report@stashtea.com'
msg['To'] = 'isenn@yamamotoyama.com, vgomez@yamamotoyama.com'
msg['CC'] = 'bleeson@stashtea.com'
emailtext = f'Item: {itmref}\nQty: {qty}\nLot: {lot}\nStatus: {DAMAGE_CODE_MAPPING[status]}\nReason: {DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]}'
msg.attach(MIMEText(emailtext, 'plain'))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(user='x3reportmk2@yamamotoyama.com', password=r'n</W<7fr"VD~\2&[pZc5')
smtp.send_message(msg)
def process_file(edi_filename: pathlib.Path):
"""
Convert a specific EDI file into an import file.
"""
warehouse_stockchange = StockChange()
vcrlin = 0
for fields in tokens_from_edi_file(edi_filename):
if fields[0] == "G62":
iptdat = fields[2]
warehouse_stockchange.header.iptdat = datetime.datetime.strptime(
iptdat, "%Y%m%d"
).date()
if fields[0] == 'W15':
transaction_number = fields[2]
if fields[0] == "W19":
vcrlin += 1000
# W19*AV*35*CA**UK*10077652082651***03022026C
_, status, qty, uom, _, _, gtin, _, _, lot = fields[:10]
product = gtin_lookup(gtin)
stock_movement_alert(product, qty, lot, status)
if status in EMAIL_ONLY_CODES:
continue
warehouse_stockchange.header.vcrdes = DAMAGE_CODE_DESCRIPTIONS_MAPPING[status]
subdetail = StockChangeSubDetail(
qtypcu=int(qty),
qtystu=int(qty),
sta=DAMAGE_CODE_MAPPING[status],
pcu=uom
)
detail_line = StockChangeDetail(
vcrlin=vcrlin,
itmref=product,
pcu=uom,
sta=DAMAGE_CODE_SOURCE_MAPPING[status],
lot=lot
)
warehouse_stockchange.append(
detail_line,
subdetail,
)
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with yamamotoyama.x3_imports.open_import_file(
IMPORTS_DIRECTORY / f"ZSCS_{transaction_number}_{time_stamp}.dat"
) as import_file:
warehouse_stockchange.output(import_file)
@dataclasses.dataclass
class StockChangeSubDetail:
"""
Information that goes onto a stockchange sub-detail line, taken from ZPTHI template.
"""
pcu: str = ""
qtypcu: int = 0
qtystu: int = 0
loc: str = ""
sta: str = "A"
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
return yamamotoyama.x3_imports.convert_to_strings(
[
"S",
fix_uom(self.pcu),
self.qtypcu,
self.qtystu,
self.loc,
self.sta,
]
)
@dataclasses.dataclass
class StockChangeDetail:
"""
Information that goes on a stockchange detail line, taken from ZPTHI template.
"""
vcrlin: int = 0
itmref: str = ""
pcu: str = ""
pcustucoe: int = 1 #does this need a lookup?
sta: str = "A"
loctyp: str = ""
loc: str = ""
lot: str = ""
slo: str = ""
sernum: str = ""
palnum: str = ""
ctrnum: str = ""
qlyctldem: str= ""
owner: str = "WON"
subdetails: typing.List[StockChangeSubDetail] = dataclasses.field(
default_factory=list
)
def palnum_lookup(self, itmref, lot, status):#TODO prevent the crash when we don't have the lot supplied in X3
"""
Pick a palnum from X3 using the lot, location, and status
It matters which one we use, best attempt is to get the largest one available
"""
with yamamotoyama.get_connection() as db_connection:
result = db_connection.query(
"""
select top 1
[STO].[STOFCY_0],
[STO].[ITMREF_0],
[STO].[LOT_0],
[STO].[PALNUM_0],
[STO].[QTYSTU_0]
from [PROD].[STOCK] [STO]
where
[STO].[ITMREF_0] = :itmref
and [STO].[STOFCY_0] = 'WON'
and [STO].[LOT_0] = :lot
and [STO].[STA_0] = :status
order by
[STO].[QTYSTU_0] desc
""",
itmref=itmref,
lot=lot,
status=status
).first()
if result:
return result["PALNUM_0"]
else:
raise NotImplementedError
def gtin_lookup(self, gtin):
with yamamotoyama.get_connection() as db_connection:
return db_connection.query(
"""
select
[ITM].[ITMREF_0],
[ITM].[ITMDES1_0],
[ITM].[EANCOD_0],
[ITM].[ZCASEUPC_0]
from [PROD].[ITMMASTER] [ITM]
join [PROD].[ITMFACILIT] [ITF]
on [ITM].[ITMREF_0] = [ITF].[ITMREF_0]
and [ITF].[STOFCY_0] = 'WON'
where
[ITM].[ZCASEUPC_0] = :zcaseupc
""",
zcaseupc=gtin,
).first()["ITMREF_0"]
def append(self, subdetail: StockChangeSubDetail):
"""
Add subdetail
"""
subdetail.pcu = self.pcu
self.subdetails.append(subdetail)
def check_subdetail_qty(self):
"""
Check for shortages by totaling up subdetail quantities.
"""
total_cases = 0
for subdetail in self.subdetails:
total_cases -= subdetail.qtypcu
return abs(total_cases)
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to strings for X3 import writing.
"""
def fix_uom(uom):
x3_uom = ''
if uom == 'CA':
x3_uom = 'CS'
else:
x3_uom = uom
return x3_uom
self.qty = self.check_subdetail_qty()
self.palnum = self.palnum_lookup(self.itmref, self.lot, self.sta)
return yamamotoyama.x3_imports.convert_to_strings(
[
"L",
self.vcrlin,
self.itmref,
fix_uom(self.pcu),
self.pcustucoe,
self.sta,
self.loctyp,
self.loc,
self.lot,
self.slo,
self.sernum,
self.palnum,
self.ctrnum,
self.qlyctldem,
self.owner
]
)
def __eq__(self, item: typing.Any) -> bool:
"""
Test for equality
"""
if isinstance(item, str):
return self.itmref == item
if isinstance(item, StockChangeDetail):
return self.itmref == item.itmref
return False
# def fill(self):#not needed for stockchanges
# """
# Set soplin & itmdes from itmref & sohnum
# """
# def get() -> records.Record:
# with yamamotoyama.get_connection() as database:
# how_many = (
# database.query(
# """
# select
# count(*) as [how_many]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# )
# .first()
# .how_many
# )
# if how_many == 1:
# return database.query(
# """
# select top 1
# [SOP].[SOPLIN_0]
# ,[SOP].[ITMDES1_0]
# ,[SOP].[SAU_0]
# from [PROD].[SORDERP] as [SOP]
# where
# [SOP].[SOHNUM_0] = :sohnum
# and [SOP].[ITMREF_0] = :itmref
# order by
# [SOP].[SOPLIN_0]
# """,
# sohnum=self.sohnum,
# itmref=self.itmref,
# ).first()
# result = get()
# self.soplin = result.SOPLIN_0
# self.itmdes = result.ITMDES1_0
# self.sau = result.SAU_0
@dataclasses.dataclass
class StockChangeHeader:
"""
Information that goes on a stockchange header, taken from ZSCS template.
"""
vcrnum: str = ""
stofcy: str = "WON"
iptdat: datetime.date = datetime.date(1753, 1, 1)
vcrdes: str = ""
pjt: str = ""
trsfam: str = "CHX"
def convert_to_strings(self) -> typing.List[str]:
"""
Convert to X3 import line
"""
return yamamotoyama.x3_imports.convert_to_strings(
[
"E",
self.vcrnum,
self.stofcy,
self.iptdat.strftime("%Y%m%d"),
self.vcrdes,
self.pjt,
self.trsfam,
]
)
class StockChangeDetailList:
"""
List of stockchange details
"""
_details: typing.List[StockChangeDetail]
_item_set: typing.Set[str]
def __init__(self):
self._details = []
self._item_set = set()
def append(
self,
stockchange_detail: StockChangeDetail,
stockchange_subdetail: StockChangeSubDetail,
):
"""
Append
"""
itmref = stockchange_detail.itmref
if itmref in self._item_set:
for detail in self._details:
if detail == itmref:
detail.subdetails.append(stockchange_subdetail)
return
self._item_set.add(itmref)
#stockchange_detail.fill()
stockchange_detail.append(stockchange_subdetail)
self._details.append(stockchange_detail)
def __iter__(self):
return iter(self._details)
class StockChange:
"""
Warehouse stockchange, both header & details
"""
header: StockChangeHeader
details: StockChangeDetailList
_sdhnum: str
def __init__(self):
self.header = StockChangeHeader()
self._sdhnum = ""
self.details = StockChangeDetailList()
def append(
self,
stockchange_detail: StockChangeDetail,
stockchange_subdetail: StockChangeSubDetail,
):
"""
Add detail information.
"""
self.details.append(stockchange_detail, stockchange_subdetail)
@property
def sdhnum(self):
"""
shipment number
"""
return self._sdhnum
@sdhnum.setter
def sdhnum(self, value: str):
if self._sdhnum != value:
self._sdhnum = value
if value:
self._fill_info_from_shipment()
def output(self, import_file: typing.TextIO):
"""
Output entire order to import_file.
"""
output = functools.partial(
yamamotoyama.x3_imports.output_with_file, import_file
)
output(self.header.convert_to_strings())
for detail in self.details:
output(detail.convert_to_strings())
for subdetail in detail.subdetails:
#shipment = detail.sdhnum
#item = detail.itmref
output(subdetail.convert_to_strings())
# for record in subdetail.stojous(shipment, item):
# output(subdetail.convert_to_strings())
# output(record)
if __name__ == "__main__":
main()

View File

@ -1,111 +0,0 @@
#!/usr/bin/env python3
"""
Consume a generic 997 file from 3PLs
Functional Acknowledgment
We don't do anything with these, code is holding old skeleton
"""
# pylint: disable=too-many-instance-attributes
import dataclasses
import datetime
import decimal
import functools
import pathlib
import re
import shutil
import typing
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "incoming"
SHANDEX_997_FILENAME_RE = re.compile(
r"\A 997_YAMAMOTOYAMA_ .* [.]edi \Z", re.X | re.M | re.S
)
def main():
"""
Do it!
"""
for edi_filename in X12_DIRECTORY.iterdir():
if SHANDEX_997_FILENAME_RE.match(edi_filename.name):
#process_file(edi_filename)
shutil.move(edi_filename, X12_DIRECTORY / "archive" / edi_filename.name)
def tokens_from_edi_file(
edi_filename: pathlib.Path,
) -> typing.Iterator[typing.List[str]]:
"""
Read tokens from EDI file
"""
with edi_filename.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] in {
"ISA",
"GS",
"ST",
"N1",
"N2",
"N3",
"N4",
"G62",
"W27",
"W10",
"LX",
"MAN",
}:
continue
yield fields
def process_file(edi_filename: pathlib.Path):
"""
997 is a functional acknowledgment, we don't need to do anything with it in X3
Do we need to send it somewhere for reporting purposes?
"""
# warehouse_receipt = Receipt()
# warehouse_receipt.header.ylicplate = '' #if we don't find a tracking number, submit a blank
# for fields in tokens_from_edi_file(edi_filename):
# if fields[0] == "W06":
# _, _, sohnum, shidat_str = fields[:4]
# warehouse_receipt.sohnum = sohnum
# warehouse_receipt.header.shidat = datetime.datetime.strptime(
# shidat_str, "%Y%m%d"
# ).date() # 20230922
# if fields[0] == "N9" and fields[1] == "2I" and len(fields) > 2 and tracking_number_not_found:
# warehouse_receipt.header.ylicplate = fields[2]
# tracking_number_not_found = False
# if fields[0] == "W12":
# W12*CC*32*32*0*CA**VN*08279*01112025C~
# _, _, qty_str, det_qty, _, _, _, _, itmref, lot = fields[:10]
# subdetail = ReceiptSubDetail(
# qtypcu=-1 * int(det_qty),
# lot=lot,
# )
# warehouse_receipt.append(
# ReceiptDetail(
# sohnum=warehouse_receipt.sohnum,
# itmref=itmref,
# qty=int(qty_str),
# ),
# subdetail,
# )
# if fields[0] == "W03":
# _, _, weight, _ = fields
# warehouse_receipt.header.growei = weight
# time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# with yamamotoyama.x3_imports.open_import_file(
# IMPORTS_DIRECTORY / f"ZSHIP945S_{warehouse_receipt.sohnum}_{time_stamp}.dat"
# ) as import_file:
# warehouse_receipt.output(import_file)
return
if __name__ == "__main__":
main()

View File

@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
Create a 997 and load it into the proper outgoing folder
A 997 is a functional acknowledgment, we received the EDI tranmission contorl number and reply back with it
"""
import dataclasses
import datetime
import io
import pathlib
import typing
import shutil
import pprint
import records # type: ignore
import yamamotoyama # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_DIRECTORY = THIS_DIRECTORY / "outgoing"
INCOMING_ARCHIVE_DIRECTORY = THIS_DIRECTORY / "incoming"
ACK_DIRECTORY = THIS_DIRECTORY / "997_processing"
AK1_MAPPING = {
"944" : "RE",
"945" : "SW",
"947" : "AW",
"846" : "IB",
"867" : "PT",
}
def main():
"""
Do it!
"""
for edi_filename in ACK_DIRECTORY.iterdir():
write_997(edi_filename)
shutil.move(edi_filename, INCOMING_ARCHIVE_DIRECTORY / "archive" / edi_filename.name)
def write_997(edi_filename: pathlib.Path):
"""
Write out a 997 to a file
"""
group_control_number = ''
transaction_set_control_number = ''
company = ''
edi_type = ''
with open(edi_filename, 'r') as edi_file:
for line in edi_file:
line = line.split("~")
for field in line:
fields = field.split("*")
if fields[0] == 'ISA':
group_control_number = fields[13]
company_longname = fields[6]
elif fields[0] == 'GS':
company = fields[2]
transaction_set_control_number = fields[6]
elif fields[0] == 'ST':
edi_type = fields[1]
now = datetime.datetime.now()
datestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")
with (X12_DIRECTORY / f"{company}-{transaction_set_control_number}-{datestamp_string}-997.edi").open(
"w", encoding="utf-8", newline="\n"
) as x12_file:
raw_control_number = control_number()
output = x12_file.write
header = write_997_header(raw_control_number,company_longname,company,edi_type,group_control_number)
lines = write_997_lines(edi_type,transaction_set_control_number)
footer = write_997_footer(raw_control_number,group_control_number)
output(header)
output(lines)
output(footer)
def write_997_header(raw_control_number,company_longname,company,edi_type,group_control_number):
now = datetime.datetime.now()
date = now.strftime("%y%m%d")
longdate = now.strftime("%Y%m%d")
time = now.strftime("%H%M")
short_control_number = f"{raw_control_number:04}"
interchange_control_number = (
f"{raw_control_number:09}" # Format to 9 characters
)
AK1 = AK1_MAPPING[edi_type]
header_string = ''.join([
f"ISA*00* *00* *ZZ*YAMAMOTOYAMA *ZZ*{company_longname}*",
date,
"*",
time,
"*U*00401*",
interchange_control_number,
"*0*P*>~",
"GS*",
"FA*",
"YAMAMOTOYAMA*",
company+"*",
longdate+"*",
time+"*",
short_control_number+"*",
"X*",
"004010~",
"ST*",
"997*0001~",
"AK1*",
AK1+"*",
group_control_number+"~",
]
)
return header_string
def write_997_lines(edi_type,transaction_set_control_number):
#short_control_number = f"{group_control_number:04}"
detail_string = ''.join([
"AK2*",
edi_type+"*",
transaction_set_control_number+"~"
"AK5*",
"A~",
"AK9*A*1*1*1~",
])
return detail_string
def write_997_footer(raw_control_number,group_control_number):
interchange_control_number = (
f"{raw_control_number:09}" # Format to 9 characters
)
short_control_number = f"{raw_control_number:04}"
footer_string = ''.join([
"SE*6*0001~",
"GE*1*",
str(short_control_number)+"~",
"IEA*1*",
str(interchange_control_number)
])
return footer_string
def control_number() -> int:
"""
Next EDI serial number
"""
filepath = pathlib.Path(__file__).with_suffix(".remember")
encoding = "utf-8"
newline = "\n"
try:
with filepath.open(
"r", encoding=encoding, newline=newline
) as remember_file:
number = int(remember_file.readline().rstrip("\n"))
except (OSError, ValueError):
number = 0
number += 1
with filepath.open("w", encoding=encoding, newline=newline) as remember_file:
remember_file.write(f"{number}\n")
return number
if __name__ == "__main__":
main()

View File

@ -1,72 +0,0 @@
#!/usr/bin/env python3
"""
Simple checker for Shandex shipments in X3.
Looks for order numbers in X3's Delivery tracking number field.
Shipments sometimes cannot be brought in due to inventory, or they might
be in with a different tracking number.
"""
from tkinter import filedialog
import pprint
import openpyxl
import yamamotoyama
def main():
filecleanup()
file = select_file()
data = get_shipment_information(file)
statuses = shipment_check(data[2:])
write_to_file(statuses)
def write_to_file(statuses):
saveto = filedialog.asksaveasfilename(initialfile='report.txt')
with open(saveto, 'w', encoding='utf8') as outfile:
for status in statuses:
outfile.write(status)
outfile.write('\n')
def shipment_check(order_list):
reportables = []
with yamamotoyama.get_connection() as db_connection:
for shandex_order in order_list:
result = db_connection.query(
"""
select
SDHNUM_0,
YLICPLATE_0
from PROD.SDELIVERY
where YLICPLATE_0 = :order
""",
order=shandex_order,
).all()
if len(result) == 0:
reportables.append(shandex_order + ' was not found in X3.')
elif len(result) > 1:
reportables.append(shandex_order + ' was found multiple times.')
else:
reportables.append(shandex_order +' was found! ' + result[0]['SDHNUM_0']+ '.')
reportables.append('Orders might not be in X3 due to inventory, or missing information in EDI transmissions. Contact IT.')
return reportables
def get_shipment_information(file):
seen_orders = []
wb = openpyxl.load_workbook(file)
ws = wb['Shipped Orders']
for row in ws.iter_rows():
if row[1].value not in seen_orders:
seen_orders.append(row[1].value)
return seen_orders
def filecleanup():
return True
def select_file():
filename = filedialog.askopenfilename(title='Choose Excel file from Shandex')
return filename
if __name__ == "__main__":
main()

View File

@ -1,268 +0,0 @@
#!/usr/bin/env python3
"""
Process entries in the staging database put there by
edi_867 and prepare them for X3 batch importing
"""
import pathlib
import datetime
import pprint
import shutil
import records # type: ignore
import yamamotoyama # type: ignore
import yamamotoyama.x3_imports # type: ignore
THIS_DIRECTORY = pathlib.Path(__file__).parent
IMPORTS_DIRECTORY = THIS_DIRECTORY / 'x3_imports'
SELECT_STATEMENT = """
select
PO
from staging.dbo.shandex_shipments SDH
where
SDH.is_sent = 0
and SDH.bpcord <> ''
"""
SELECT_STATEMENT_TESTING = """
select
PO
from staging.dbo.shandex_shipments SDH
where
PO = '4542_O0216777'
"""
HEADER_STATEMENT = """
select
[H]
,[salfcy]
,[stofcy]
,[sdhnum]
,[bpcinv]
,[bpcord]
,[bpaadd]
,[cur]
,[shidat]
,[cfmflg]
,[pjt]
,[bptnum]
,[ylicplate]
,[yclippership]
,[invdtaamt_2]
,[invdtaamt_3]
,[invdtaamt_4]
,[invdtaamt_5]
,[invdtaamt_6]
,[invdtaamt_7]
,[invdtaamt_8]
,[invdtaamt_9]
,[die]
,[die_1]
,[die_2]
,[die_3]
,[die_4]
,[die_5]
,[die_6]
,[die_7]
,[die_8]
,[die_9]
,[die_10]
,[die_11]
,[die_12]
,[die_13]
,[die_14]
,[die_15]
,[die_16]
,[die_17]
,[die_18]
,[die_19]
,[cce]
,[cce_1]
,[cce_2]
,[cce_3]
,[cce_4]
,[cce_5]
,[cce_6]
,[cce_7]
,[cce_8]
,[cce_9]
,[cce_10]
,[cce_11]
,[cce_12]
,[cce_13]
,[cce_14]
,[cce_15]
,[cce_16]
,[cce_17]
,[cce_18]
,[cce_19]
,[bpdnam]
,[bpdaddlig]
,[bpdaddlig_1]
,[bpdaddlig_2]
,[bpdposcod]
,[bpdcty]
,[bpdsat]
,[bpdcry]
,[bpdcrynam]
,[sdhtyp]
,[growei]
,[pacnbr]
,[star71]
,[star72]
,[star81]
,[star82]
from staging.dbo.shandex_shipments SDH
where SDH.PO = :po
"""
DETAIL_STATEMENT = """
select
[L]
,[sohnum]
,[soplin]
,[itmref]
,[itmdes]
,[sau]
,cast(sum(cast([qty] as int)) as nvarchar)[qty]
,[gropri]
,[star91]
,[star92]
from staging.dbo.shandex_shipment_details SDD
where SDD.PO = :po
group by
[L]
,[sohnum]
,[soplin]
,[itmref]
,[itmdes]
,[sau]
,[gropri]
,[star91]
,[star92]
"""
SUBDETAIL_STATEMENT = """
select distinct
[S]
,[sta]
,[pcu]
,[qtypcu]
,[loc]
,[lot]
,[sernum]
from staging.dbo.shandex_shipment_details SDD
where SDD.PO = :po
and SDD.itmref = :itmref
"""
UPDATE_STATEMENT = """
update staging.dbo.shandex_shipments
set is_sent = 1
where PO = :po
"""
HEADER_NAMES = ['H','salfcy','stofcy','sdhnum','bpcinv','bpcord',
'bpaadd','cur','shidat','cfmflg','pjt','bptnum','ylicplate','yclippership',
'invdtaamt_2','invdtaamt_3','invdtaamt_4','invdtaamt_5','invdtaamt_6','invdtaamt_7',
'invdtaamt_8','invdtaamt_9','die','die_1','die_2','die_3','die_4','die_5','die_6','die_7',
'die_8','die_9','die_10','die_11','die_12','die_13','die_14','die_15','die_16','die_17',
'die_18','die_19','cce','cce_1','cce_2','cce_3','cce_4','cce_5','cce_6','cce_7','cce_8','cce_9',
'cce_10','cce_11','cce_12','cce_13','cce_14','cce_15','cce_16','cce_17','cce_18','cce_19','bpdnam',
'bpdaddlig','bpdaddlig_1','bpdaddlig_2','bpdposcod','bpdcty','bpdsat','bpdcry','bpdcrynam','sdhtyp',
'growei','pacnbr','star71','star72','star81','star82']
DETAIL_NAMES = ['L','sohnum','soplin','itmref','itmdes','sau','qty','gropri','star91','star92']
SUBDETAIL_NAMES = ['S','sta','pcu','qtypcu','loc','lot','sernum']
def get_shipments(database):
with database.transaction():
result = database.query(SELECT_STATEMENT_TESTING).all()#TODO REMOVE TESTING
return result
def get_shipment_headers(database, po):
result = database.query(
HEADER_STATEMENT,
po=po
).first()
return result
def get_shipment_details(database, po):
result = database.query(
DETAIL_STATEMENT,
po=po
).all()
return result
def get_shipment_subdetails(database, po, itmref):
result = database.query(
SUBDETAIL_STATEMENT,
po=po,
itmref=itmref
).all()
return result
def create_imports(shipments, database):
for shipment in shipments:
time_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
with open(
IMPORTS_DIRECTORY / f"ZSHIP867_{shipment['PO']}_{time_stamp}.dat", 'w', encoding='utf-8', newline='\n'
) as import_file:
headers = get_shipment_headers(database, shipment['PO'])
details = get_shipment_details(database, shipment['PO'])
for name in HEADER_NAMES:
import_file.write(headers[name])
import_file.write(chr(31))
import_file.write('\n')
for record in details:
for name in DETAIL_NAMES:
import_file.write(record[name])
import_file.write(chr(31))
import_file.write('\n')
subdetails = get_shipment_subdetails(database, shipment['PO'], record['itmref'])
for subrecord in subdetails:
for name in SUBDETAIL_NAMES:
import_file.write(subrecord[name])
import_file.write(chr(31))
import_file.write('\n')
def combine_imports():
archive_directory = IMPORTS_DIRECTORY / "archive"
archive_directory.mkdir(exist_ok=True)
with (IMPORTS_DIRECTORY / "ZSHIP867.dat").open(
"w", encoding="utf-8", newline="\n"
) as combined_import_file:
for individual_import_filename in IMPORTS_DIRECTORY.glob(
"ZSHIP867_*.dat"
):
with individual_import_filename.open(
"r", encoding="utf-8", newline="\n"
) as individual_import_file:
for line in individual_import_file:
combined_import_file.write(line)
shutil.move(
individual_import_filename,
archive_directory / individual_import_filename.name,
)
def mark_sent(database, shipments):
with database.transaction():
for shipment in shipments:
result = database.query(
UPDATE_STATEMENT,
po=shipment['PO']
)
def main():
with yamamotoyama.get_connection() as database:
#retrieve everything that has a valid customer and hasn't already been sent to X3
shipments = get_shipments(database)
#turn each shipment into a X3 import file
create_imports(shipments, database)
combine_imports()
#udate the is_sent field so they are not processed again
mark_sent(database, shipments)
if __name__ == "__main__":
main()

View File

@ -1,105 +0,0 @@
#!/usr/bin/env python3
"""
Control middleware
"""
import contextlib
import shutil
import pathlib
import paramiko # type: ignore
import pprint
import edi_997_inbound
import edi_944
import edi_947
import edi_846
import edi_867_to_table
import edi_997_outbound
import update_shandex_dashboard
import edi_943
import unprocessed_files_report
import import_867s
THIS_DIRECTORY = pathlib.Path(__file__).parent
X12_SHANDEX_OUTGOING = THIS_DIRECTORY / "outgoing"
X12_SHANDEX_INCOMING = THIS_DIRECTORY / "incoming"
def main():
"""
Do it!
"""
#pick up files from Shandex
retrieve_x12_edi_files_shandex()
#report on anything not handled
unprocessed_files_report.main()
#process all EDIs that started with Shandex
edi_997_inbound.main()
edi_944.main()
edi_947.main()
edi_846.main()
edi_867_to_table.main()
import_867s.main()
# process all EDIs that start with us
edi_943.main()
edi_997_outbound.main()
#send them to Shandex
send_x12_edi_files_shandex()
#update dashboard - no longer needed now that it has been moved to the staging database 2024-08-20
#update_shandex_dashboard.main()
#report on anything not handled
unprocessed_files_report.main()
SSH_DIRECTORY = THIS_DIRECTORY / "ssh"
SSH_KNOWN_HOSTS_FILE = str(SSH_DIRECTORY / "known_hosts")
SSH_KEY_FILENAME = str(SSH_DIRECTORY / "id_ed25519")
SHANDEX_SFTP_HOST = "ftp.shandex.com"
SHANDEX_SFTP_USERNAME = "Stash"
SHANDEX_SFTP_PASSWORD = "ST@Pass2024$$"
def send_x12_edi_files_shandex():
"""
Connect to FTP & send files.
"""
with paramiko.SSHClient() as ssh_client:
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
ssh_client.connect(
hostname=SHANDEX_SFTP_HOST, username=SHANDEX_SFTP_USERNAME, password=SHANDEX_SFTP_PASSWORD
)
with ssh_client.open_sftp() as sftp_connection:
sftp_connection.chdir("/Stash/Prod/ToShandex")
for filename in X12_SHANDEX_OUTGOING.glob("*.edi"):
sftp_connection.put(filename, str(filename.name))
shutil.move(filename, X12_SHANDEX_OUTGOING / "archive" / filename.name)
def retrieve_x12_edi_files_shandex():
with paramiko.SSHClient() as ssh_client:
ssh_client.load_system_host_keys()
ssh_client.load_host_keys(SSH_KNOWN_HOSTS_FILE)
ssh_client.set_missing_host_key_policy(paramiko.client.RejectPolicy)
ssh_client.connect(
hostname=SHANDEX_SFTP_HOST, username=SHANDEX_SFTP_USERNAME, password=SHANDEX_SFTP_PASSWORD
)
with ssh_client.open_sftp() as sftp_connection:
sftp_connection.chdir("/Stash/Prod/FromShandex")
for filename in sftp_connection.listdir():
if filename.endswith(".edi"):
sftp_connection.get(filename, X12_SHANDEX_INCOMING / filename)
new_filename = f"/Stash/Archive/{filename}"
sftp_connection.rename(filename, new_filename)
if __name__ == "__main__":
main()

View File

@ -1,25 +0,0 @@
Process EDI documents between YU and Shandex
997 Functional acknowledgment Tell Shandex we received an EDI
944 Stock transfer receipt Receive qty and lots from replenishment shipment
947 Inventory advice Tell Stash of Q and R status inventory
846 Inventory inquiry Inventory comparison
867 Resale report Tell Stash qty, price, and lot sold
943 Stock transfer Tell Shandex qty and lots being shipped
master_controller takes EDI files from the Shandex FTP, processes them, and loads our EDI files.
Individual edi_***.py files process specific types of transactions.
Sales come in via 867 and the shandex_dashboard script stores which files have been received.
The corresponding control numbers can be matched to X3 Sales Deliveries in the YLICPLATE_0 and YCLIPPERSHIP_0 fields.
-------------------
Issues
-------------------
867s will fail to process if a customer has not been mapped for them in the edi_867.py file.
These problems are logged in the staging database in staging.dbo.shandex_shipments. To reprocess,
add the customer code to the table and check it's is_sent field. Make sure to add the right code to
the dictionary in 867 processing as well.
944s need to be rerun in the edi_944.py script after validation, so that lots appear in the import file.

View File

@ -1,7 +0,0 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACBV8x5ikQyA6ZgAO2BokIZB13CCFjiQIU8bHf8BXrkVLAAAAJj90RvK/dEb
ygAAAAtzc2gtZWQyNTUxOQAAACBV8x5ikQyA6ZgAO2BokIZB13CCFjiQIU8bHf8BXrkVLA
AAAEBIPcJFMSUHOcXD0G85tKPaaSaUfXoYz/pgoffs+Y4ul1XzHmKRDIDpmAA7YGiQhkHX
cIIWOJAhTxsd/wFeuRUsAAAAEHl1bWlkZGxld2FyZTIwMjMBAgMEBQ==
-----END OPENSSH PRIVATE KEY-----

View File

@ -1 +0,0 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFXzHmKRDIDpmAA7YGiQhkHXcIIWOJAhTxsd/wFeuRUs yumiddleware2023

View File

@ -1,2 +0,0 @@
s-8ade4d252cc44c50b.server.transfer.us-west-1.amazonaws.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDIDQlEH17E193o9cOd0azlbI388A2GX4DzCWriLuj+BI+Jsi4Ij6oSKbsfKsqY0oThzdahxvafr1q1RxX4WN7yKGtQ+osOrXaSdSBOfejTJ9Wtr3DI4g6APoK4KX8luo7lYhmdVsNZtYdd2Wz7gIm4hsFtnSzrCyOvYMQ6mzvZQGGb+3V5Ce2wjYb0TjxdDdiacXXtbopVRuPAARqFz8hYMoKsZEyKuMekbErqiaC99ZZXtfmh9ZOJdSIF0N6loMWQaNtdLoyD1Xts3CDAcSg41wSfDYB3mtuIZEC/WNBj57RDuy93IsxH9z4Ak47cCrpChSpXp4pfajJS7W5g+Hyd
ftp.shandex.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDWht+ZlPHMwzohq4gnOTAAdtCt2u7BG/fdONXdvluMopWq8bwTKX0iKV5/7sRIIGimk1zIX19zDGR/5B9BbPFBRrxdKA915L830hj8omdo/ayA7pm/sDE7YdmSzUJ/akaO2KYDqBcpElr0Eb3gKaxy1oJGOR8zcLEffZpmYjMHKuCE6KqooCbn6326yRpl/fUhFK9QKLowIzBpeaQzGeNnGLON6j1bRPtObO0QYykdsb6mMF77ZKcf/kibnAtau2APC6xmDL3LDA6h5bwMs8nrC2Yg094dFPjvmC2FIbgiomtz8bfhLYsjDSE1JMNOUIbyoNvitWX5Zavtp70FnQcv

1
test Normal file
View File

@ -0,0 +1 @@
test

View File

@ -1,93 +0,0 @@
#!/usr/bin/env python3
"""
Update X3 with information from middleware
To reprocess a file, move it to the correct folder on the FTP
"""
import shutil
import yamamotoyama
import pathlib
import pprint
THIS_DIRECTORY = pathlib.Path(__file__).parent
RECEIVED_867_DIRECTORY = THIS_DIRECTORY / "processed_867s"
ARCHIVE = RECEIVED_867_DIRECTORY / "archive"
def main():
"""
Do it!
"""
process_received_867s()
remove_completed_867s()
def remove_completed_867s():
"""
Clean up all of the files received from the FTP
"""
for file in RECEIVED_867_DIRECTORY.iterdir():
if file.name.endswith('.edi'):
shutil.move(file,ARCHIVE / file.name)
def import_received_867s(data):
with yamamotoyama.get_connection() as data_base:
with data_base.transaction():
data_base.query(
"""
EXECUTE [PROD].[insert_shandex_867_data]
:ctrlnum,
:ylicplate,
:credat,
:customer_po,
:shandex_so
""",
ctrlnum=data['ctrlnum']+'-'+data['ylicplate'],
ylicplate=data['ylicplate'],
credat=data['credat'],
customer_po=data['customer_po'],
shandex_so=data['shandex_so'],
)
def process_received_867s():
#pull out the control number on the GS line and the BOL# on PTD
for file in RECEIVED_867_DIRECTORY.iterdir():
control_number = ''
transaction_date = ''
picking_number = ''
customer_po = ''
shandex_so = ''
data_set = []
if file.name.endswith('.edi'):
with file.open(encoding="utf-8", newline="") as edi_file:
for record in edi_file.read().split("~"):
fields = record.split("*")
if fields[0] == 'GS':
control_number = fields[6]
if fields[0] == 'BPT':
transaction_date = fields[3]
# REF*PO*3L49287E
if fields[0] == 'REF' and fields[1] == 'PO':
customer_po = fields[2]
# REF*IL*O0215276
if fields[0] == 'REF' and fields[1] == 'IL':
shandex_so = fields[2]
if fields[0] == "PTD" and len(fields) > 2:
picking_number = fields[5]
data = {
"ctrlnum":control_number,
"ylicplate":picking_number,
"credat":transaction_date,
"customer_po":customer_po,
"shandex_so":shandex_so,
}
if data not in data_set:
data_set.append(data)
import_received_867s(data)
if __name__ == "__main__":
main()