import pandas as pd from openpyxl.utils import get_column_letter from openpyxl import load_workbook from datetime import datetime from dateutil.relativedelta import relativedelta import logging from datetime import datetime # 配置日志记录 logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') P_LIMIT = 6 # 最大晋升次数 P_START = 10 # 晋升记录开始行 H_START = 15 + P_LIMIT # 历史记录开始行 # 自定义日期解析函数 def custom_date_parser(x): try: return datetime.strptime(x, '%Y-%m-%d') except: return x BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息") Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动") # Level_Limit = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols="A:B", skiprows=2, names=["limit","role"]) BaseData = BaseData.drop(0) BaseData = BaseData.reset_index(drop=True) Promote = Promote.drop(0) Promote = Promote.reset_index(drop=True) for index, row in BaseData.iterrows(): for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "二档起始", "五档起始", "日期2"]: BaseData.at[index, col] = custom_date_parser(row[col]) for index, row in Promote.iterrows(): for col in ["任职时间","工资执行时间"]: Promote.at[index, col] = custom_date_parser(row[col]) logging.info("人员信息加载完成") Rule_Role = [] col = 2 while True: # 职位规则 try: rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Rule_Role.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"]) }) col += 2 except: break Rule_Level = [] col = 1 while True: # 职级规则 try: rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Rule_Level.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"]) }) col += 2 except: break Rule_RoleName = [] col = 1 while True: # 名称变化 try: rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Rule_RoleName.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2) }) col += 2 except: break logging.info("规则加载完成") Rule_Role = sorted(Rule_Role, key=lambda x: x['start']) Rule_Level = sorted(Rule_Level, key=lambda x: x['start']) Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start']) nowtime = datetime.now() def split_level(level:str): try: parts = level.split('-') return (int(parts[0]), int(parts[1])) except: logging.warning(f"职级[{level}]格式错误") return (0, 0) def role_salary(role:str, time): for rule in Rule_Role: if rule["start"] <= time <= rule["end"]: try: tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0] return tmp["salary"] except: if role == "": logging.error("空职级") else: logging.warning(f"职位[{role}]在[{time}]时不存在工资规则") return 0 def level_salary(level:str, time): for rule in Rule_Level: if rule["start"] <= time <= rule["end"]: try: tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0] return tmp["salary"] except: logging.warning(f"职级[{level}]在[{time}]时不存在工资规则") return 0 def role_limit(role:str): try: tmp = Level_Limit[Level_Limit["role"] == role].iloc[0] return tmp["limit"] except: logging.warning(f"职位[{role}]不存在职级上限规则") return -1 def format_time(dt): try: return dt.strftime("%Y.%m") except: return dt def to_int(x): try: return int(x) except: return 0 BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int) BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int) max_promote = 0 max_history = 0 def fill_basic_info(ws, row):# 填充基本信息 ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}") ws.cell(row=3, column=2, value=row["姓名"]) ws.cell(row=3, column=4, value=row["性别"]) ws.cell(row=3, column=6, value=row["民族"]) ws.cell(row=5, column=2, value=row["现职"]) ws.cell(row=5, column=6, value=row["任职年限"]) ws.cell(row=6, column=2, value=row["原职"]) ws.cell(row=6, column=6, value=row["原职年限"]) ws.cell(row=7, column=2, value=row["学历"]) ws.cell(row=7, column=4, value=row["学龄"]) ws.cell(row=7, column=6, value=row["套改年限"]) ws.cell(row=7, column=7, value=row["职务工资"]) ws.cell(row=7, column=8, value=row["职务工资金额"]) ws.cell(row=7, column=9, value=row["级别工资"]) ws.cell(row=7, column=10, value=row["职务工资金额"]) ws.cell(row=17, column=1, value=row["个人评价结果"]) ws.cell(row=3, column=8, value=format_time(row["出生年月"])) ws.cell(row=3, column=10, value=format_time(row["参加工作时间"])) ws.cell(row=5, column=4, value=format_time(row["任职年月"])) ws.cell(row=6, column=4, value=format_time(row["原职时间"])) def fill_prompt_info(ws, promote):# 填充晋升信息 for index, prow in promote.iterrows(): if index > P_LIMIT-1: logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。") max_promote = max(max_promote, promote.shape[0]) break try: ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"])) except: logging.warning(f"晋升时间格式错误:{prow['任职时间']}") ws.cell(row=P_START+index, column=2, value=prow["变动批注"]) ws.cell(row=P_START+index, column=3, value="任"+prow["新职务"]) def fill_history_info(ws, History_pd):# 填充历史记录 for index, hrow in History_pd.iterrows(): # 打印 for col in range(1, 11): # 复制样式 ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style try: ws.cell(row=H_START+index, column=1, value=hrow["时间"].strftime("%Y.%m")) except: logging.warning(f"历史时间格式错误:{hrow['时间']}") ws.cell(row=H_START+index, column=2, value=hrow["职务"]) ws.cell(row=H_START+index, column=3, value=hrow["工资额1"]) ws.cell(row=H_START+index, column=4, value=hrow["级别档次"]) ws.cell(row=H_START+index, column=5, value=hrow["工资额2"]) ws.cell(row=H_START+index, column=6, value=hrow["工资合计"]) ws.cell(row=H_START+index, column=7, value=hrow["变动原因"]) # ws.cell(row=H_START+index, column=8, value=index) # Debug BaseData["Latest_Role"] = None BaseData["Latest_Prom"] = None for index, row in BaseData.iterrows(): try: logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]") BaseData.at[index, "Latest_Role"] = row["初始职务"] BaseData.at[index, "Latest_Prom"] = row["入职时间"] wb = load_workbook("模板/个人台账.xlsx") ws = wb.active fill_basic_info(ws, row)# 填充基本信息 # 查找晋升信息 promote = Promote[Promote["身份证号"] == row["身份证号码"]] if not promote.empty: promote = promote.sort_values(by="任职时间", ascending=False).reset_index(drop=True) BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"] BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"] if promote.shape[0] > 1: BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"] BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"] promote = promote.sort_values(by="任职时间").reset_index(drop=True) fill_prompt_info(ws, promote)# 填充晋升信息 # 根据规则匹配职级薪资 History_pd = pd.DataFrame(columns=["时间", "职务", "工资额1", "级别档次", "工资额2", "工资合计", "变动原因"]) # 添加入职记录 History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级"] for index, prow in promote.iterrows(): # 添加晋升记录 History_pd.loc[len(History_pd)] = [prow["工资执行时间"], prow["新职务"], "", "", "", "", "晋升"] try: calctime=row["二档起始"] + relativedelta(minute=1) while True: # 添加二级记录 calctime += relativedelta(years=2) if calctime > nowtime: break History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档"] calctime=row["五档起始"] while True: # 添加五级记录 calctime += relativedelta(years=5) if calctime > nowtime: break History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级"] except: raise Exception(f"二、五档起始时间格式错误:{row['二档起始']}或{row['五档起始']}") for rule in Rule_Level: # 工资调标 if row["入职时间"] < rule["start"]: History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标"] History_pd = History_pd.sort_values(by="时间").reset_index(drop=True) if History_pd.at[0,"时间"] != row["入职时间"]: raise Exception(f"入职时间晚于其他计算的时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})") for index, hrow in History_pd.iterrows(): # 数据计算 # 调整职务职级 if index > 0 and hrow["职务"] == "": History_pd.at[index, "职务"] = History_pd.iloc[index-1]["职务"] for rule in Rule_RoleName: # 名称变化 if rule["start"] <= hrow["时间"] <= rule["end"]: if History_pd.iloc[index]["职务"] in rule["rule"]["原名称"].values: History_pd.at[index, "职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.iloc[index]["职务"]]["现名称"].values[0] if index > 0 and hrow["级别档次"] == "": jb, dc = split_level(History_pd.iloc[index-1]["级别档次"]) if hrow["变动原因"] == "两年晋档": History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}" elif hrow["变动原因"] == "五年晋级": History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}" elif hrow["变动原因"] == "工资调标": History_pd.at[index, "级别档次"] = f"{jb}-{dc}" elif hrow["变动原因"] == "晋升": if role_limit(History_pd.iloc[index]["职务"]) == 99: # 99tag History_pd.at[index, "级别档次"] = f"{jb}-{dc}" elif jb == role_limit(History_pd.iloc[index]["职务"]): History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}" else: History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}" # 计算工资 History_pd.at[index, "工资额1"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"]) History_pd.at[index, "工资额2"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"]) History_pd.at[index, "工资合计"] = to_int(History_pd.iloc[index]["工资额1"]) + to_int(History_pd.iloc[index]["工资额2"]) fill_history_info(ws, History_pd)# 填充历史记录 wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx") except Exception as e: logging.error(f"{row['身份证号码']}:{e}") wb = load_workbook("模板/汇总名册.xlsx") ws = wb.active for index, row in BaseData.iterrows(): # 汇总 try: logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]") for col in range(1,16): ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style ws.cell(row=6+index, column=1, value=index+1) ws.cell(row=6+index, column=2, value=row["姓名"]) ws.cell(row=6+index, column=3, value=row["性别"]) ws.cell(row=6+index, column=4, value=format_time(row["出生年月"])) ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"])) ws.cell(row=6+index, column=6, value=row["学历"]) ws.cell(row=6+index, column=7, value=relativedelta(nowtime, row["入职时间"]).years+row["工龄调增"]-row["工龄调减"]+1) ws.cell(row=6+index, column=8, value=relativedelta(nowtime, row["入职时间"]).years) ws.cell(row=6+index, column=9, value=row["工龄调增"]) ws.cell(row=6+index, column=10, value=row["工龄调减"]) ws.cell(row=6+index, column=11, value=row["Latest_Role"]) ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"])) ws.cell(row=6+index, column=13, value=row["职务2"]) ws.cell(row=6+index, column=14, value=format_time(row["日期2"])) except Exception as e: logging.error(f"{row['身份证号码']}:{e}") wb.save("汇总名册.xlsx") # 保存汇总 if max_promote > 0: logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。") if max_history > 0: logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")