import pandas as pd from openpyxl.utils import get_column_letter from openpyxl import load_workbook from datetime import datetime from dateutil.relativedelta import relativedelta import logging, os # 配置日志记录 logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 全局变量 ## 常量 P_LIMIT = 6 # 最大晋升次数 P_START = 10 # 晋升记录开始行 H_START = 15 + P_LIMIT # 历史记录开始行 NOWTIME = datetime.now() ## 通过函数读取 BaseData = pd.DataFrame() Promote = pd.DataFrame() Rule_Role = [] Rule_Level = [] Rule_RoleName = [] Level_Limit = pd.DataFrame() Promote_Level = pd.DataFrame() Promote_verify = pd.DataFrame() Allowance = [] ## 统计量 max_promote = 0 max_history = 0 # 工具函数 def custom_date_parser(x): try: return datetime.strptime(x, '%Y-%m-%d') except: return x def format_time(dt,info): try: return dt.strftime("%Y.%m") except: logging.warning(f"[{info}]时间格式错误:{dt}") return dt def format_time_ymd(dt,info): try: return dt.strftime("%Y.%m.%d") except: logging.warning(f"[{info}]时间格式错误:{dt}") return dt def to_int(x): try: return int(x) except: return 0 def fallback(x): for i in x: if pd.notna(i) and i != '': return i return '' def split_level(level:str): try: parts = level.split('-') return (int(parts[0]), int(parts[1])) except: raise Exception(f"职级[{level}]格式错误") def calculate_seniority(row, year): return year - row["参加工作时间"].year + row["工龄调增"] - row["工龄调减"] + 1 # 读取信息 def read_base_data(): # 读取员工数据 global BaseData BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息") for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]: BaseData[col] = BaseData[col].apply(custom_date_parser) for col in ["晋档起始", "晋级起始"]: BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x) BaseData["Latest_Role"] = None BaseData["Latest_Prom"] = None BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int) BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int) BaseData["学龄"] = BaseData["学龄"].apply(to_int) BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1) def read_promote(): # 读取晋升记录 global Promote Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动") for col in ["任职时间","工资执行时间"]: Promote[col] = Promote[col].apply(custom_date_parser) def read_rule_role(): # 读取职位规则 global Rule_Role col = 4 while True: try: rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Rule_Role.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"]) }) col += 2 except: break Rule_Role = sorted(Rule_Role, key=lambda x: x['start']) def read_rule_level(): # 读取职级规则 global Rule_Level col = 1 while True: # 职级规则 try: rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Rule_Level.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"]) }) col += 2 except: break Rule_Level = sorted(Rule_Level, key=lambda x: x['start']) def read_rule_role_name(): # 读取名称变化规则 global Rule_RoleName col = 1 while True: # 名称变化 try: rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Rule_RoleName.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2) }) col += 2 except: break Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start']) def read_level_limit(): # 读取职位对应的级别限制 global Level_Limit, Promote_Level Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"]) Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"]) Level_Limit = {} Promote_Level = {} for rule in Rule_Role: for index, row in rule["rule"].iterrows(): Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"] Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"]) def read_promote_verify(): # 读取晋升校验 global Promote_verify Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B") def read_allowance(): # 读取津贴 global Allowance col = 1 while True: try: rule = pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None) Allowance.append({ "start":rule.iloc[0,1], "end":rule.iloc[1,1], "rule":pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"]) }) col += 2 except: break Allowance = sorted(Allowance, key=lambda x: x['start']) def load_people(): read_base_data() read_promote() logging.info("人员信息加载完成") def load_rule(): read_rule_role() read_rule_level() read_rule_role_name() read_level_limit() read_promote_verify() read_allowance() logging.info("规则加载完成") # 获取配置类函数 def role_salary(role:str, time): for rule in Rule_Role: if rule["start"] <= time <= rule["end"]: try: tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0] return tmp["salary"] except: if role == "": logging.error("空职级") else: logging.warning(f"职位[{role}]在[{time}]时不存在工资规则") logging.warning(f"时间[{time}]时不存在职位工资规则") return 0 def level_salary(level:str, time): for rule in Rule_Level: if rule["start"] <= time <= rule["end"]: try: tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0] return tmp["salary"] except: logging.warning(f"职级[{level}]在[{time}]时不存在工资规则") logging.warning(f"时间[{time}]时不存在职级工资规则") return 0 def role_limit(role:str): if role in Level_Limit.keys(): return Level_Limit[role] else: logging.warning(f"职位[{role}]不存在职级上限规则") return -1 def allowance(role:str, level:int, time): for rule in Allowance: if rule["start"] <= time <= rule["end"]: try: tmp = rule["rule"][rule["rule"]["level"] == f"{role}-{level}"].iloc[0] return tmp["salary"] except: logging.warning(f"组合[{role}-{level}]在[{time}]时不存在津贴规则") return 0 logging.warning(f"时间[{time}]时不存在津贴规则") return 0 # 填充类辅助函数 def fill_basic_info(ws, row):# 填充基本信息 ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}") ws.cell(row=3, column=2, value=row["姓名"]) ws.cell(row=3, column=4, value=row["性别"]) ws.cell(row=3, column=6, value=row["民族"]) ws.cell(row=5, column=2, value=row["现职"]) ws.cell(row=5, column=6, value=row["任职年限"]) ws.cell(row=6, column=2, value=row["原职"]) ws.cell(row=6, column=6, value=row["原职年限"]) ws.cell(row=7, column=2, value=row["学历"]) ws.cell(row=7, column=4, value=row["学龄"]) ws.cell(row=7, column=6, value=row["套改年限"]) ws.cell(row=7, column=7, value=row["职务工资"]) ws.cell(row=7, column=8, value=row["职务工资金额"]) ws.cell(row=7, column=9, value=row["级别工资"]) ws.cell(row=7, column=10, value=row["职务工资金额"]) ws.cell(row=17, column=1, value=row["个人评价结果"]) ws.cell(row=3, column=8, value=format_time(row["出生年月"],"出生年月")) ws.cell(row=3, column=10, value=format_time(row["参加工作时间"],"参加工作时间")) ws.cell(row=5, column=4, value=format_time(row["任职年月"],"任职年月")) ws.cell(row=6, column=4, value=format_time(row["原职时间"],"原职时间")) def fill_prompt_info(ws, promote):# 填充晋升信息 for index, prow in promote.iterrows(): if index > P_LIMIT-1: logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。") max_promote = max(max_promote, promote.shape[0]) break ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"],"晋升时间")) ws.cell(row=P_START+index, column=2, value=prow["变动批注"]) ws.cell(row=P_START+index, column=3, value="任"+prow["新职务"]) def add_history(History_pd, row, promote): # 添加入职记录 History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [ row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]] for index, prow in promote.iterrows(): # 添加晋升记录 History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [ prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day), prow["新职务"],"晋升",f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"] try: # 添加晋档记录 calctime=row["晋档起始"] + relativedelta(minute=1) while True: calctime += relativedelta(years=row["晋档间隔"]) if calctime > NOWTIME: break History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [ calctime,"两年晋档"] calctime=row["晋级起始"] # 添加晋级记录 while True: calctime += relativedelta(years=row["晋级间隔"]) if calctime > NOWTIME: break History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [ calctime,"五年晋级"] except: raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}") # 工资调标 for rule in Rule_Level: if row["入职时间"] < rule["start"]: History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"] calctime = row["入职时间"] while True: calctime += relativedelta(years=1) if calctime > NOWTIME: break elif int(calculate_seniority(row,calctime.year)) % 5 == 0: History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = \ [calctime,"调整津贴"] History_pd["身份证号码"] = row["身份证号码"] History_pd["姓名"] = row["姓名"] History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1) History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1) History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1) def calc_history(History_pd, row): # 复杂数据计算 for index, hrow in History_pd.iterrows(): # 继承上一条复杂计算数据 if index > 0: History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"] History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"] History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"] History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"] History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"] History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"] History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"] # 继承名称 if pd.isna(hrow["变动后职务"]): History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"] # 名称变化 for rule in Rule_RoleName: if rule["start"] <= hrow["变动后时间"] <= rule["end"]: if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values: History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0] # 级别档次 if index > 0: jb, dc = split_level(History_pd.at[index,"变动前级别档次"]) if hrow["变动原因"] == "两年晋档": History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" elif hrow["变动原因"] == "五年晋级": if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]): History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" else: History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}" elif hrow["变动原因"] == "工资调标": History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}" elif hrow["变动原因"] == "晋升": role = History_pd.iloc[index]["变动后职务"] if role in Promote_Level.keys(): new_jb = jb + Promote_Level[role][0] new_dc = dc + Promote_Level[role][1] if pd.isna(new_jb) or pd.isna(new_dc): raise Exception(f"级别档次计算出现NaN值:[{new_jb}]-[{new_dc}]({role})") else: new_jb = int(new_jb) new_dc = int(new_dc) if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and role in Promote_verify.iloc[:,1].values): logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]") History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}" elif new_jb < role_limit(role): History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" elif new_jb < 1 or new_dc < 1: raise Exception(f"级别档次小于0:[{new_jb}]-[{new_dc}]") else: History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}" else: logging.warning(f"职位[{role}]不存在职级上限规则") else: History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"] # 计算工资 History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.at[index,"变动后职务"], hrow["变动后时间"]) History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.at[index,"变动后级别档次"], hrow["变动后时间"]) History_pd.at[index, "变动后工资合计"] = to_int(History_pd.at[index,"变动后职务工资"]) + to_int(History_pd.at[index,"变动后级别工资"]) History_pd.at[index, "变动后津贴工资"] = allowance(History_pd.at[index,"变动后职务"], History_pd.at[index,"工龄"], History_pd.at[index,"变动后时间"]) def fill_history_info(ws, History_pd):# 填充历史记录 for index, hrow in History_pd.iterrows(): # 打印 for col in range(1, 11): # 复制样式 ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style try: ws.cell(row=H_START+index, column=1, value=format_time(hrow["变动后时间"],"历史时间")) except: logging.warning(f"历史时间格式错误:{hrow['时间']}") ws.cell(row=H_START+index, column=2, value=hrow["变动后职务"]) ws.cell(row=H_START+index, column=3, value=hrow["变动后职务工资"]) ws.cell(row=H_START+index, column=4, value=hrow["变动后级别档次"]) ws.cell(row=H_START+index, column=5, value=hrow["变动后级别工资"]) ws.cell(row=H_START+index, column=6, value=hrow["变动后工资合计"]) ws.cell(row=H_START+index, column=7, value=hrow["变动原因"]) # ws.cell(row=H_START+index, column=8, value=index) # Debug def fill_roster(): # 填充花名册 wb = load_workbook("模板/汇总名册.xlsx") ws = wb.active for index, row in BaseData.iterrows(): # 汇总 try: logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]") for col in range(1,16): ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style ws.cell(row=6+index, column=1, value=index+1) ws.cell(row=6+index, column=2, value=row["姓名"]) ws.cell(row=6+index, column=3, value=row["性别"]) ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月")) ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间")) ws.cell(row=6+index, column=6, value=fallback([row["现学历"],row["学历"]])) ws.cell(row=6+index, column=7, value=row['工龄']+row["学龄"]) ws.cell(row=6+index, column=8, value=row['工龄']) ws.cell(row=6+index, column=9, value=row["学龄"]) ws.cell(row=6+index, column=10, value=row["工龄调减"]) ws.cell(row=6+index, column=11, value=row["Latest_Role"]) ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"], "Latest_Prom")) ws.cell(row=6+index, column=13, value=row["职务2"]) ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2")) except Exception as e: logging.error(f"{row['身份证号码']}:{e}") wb.save("汇总名册.xlsx") # 保存汇总 def main(): load_people() load_rule() # 创建一个空的DataFrame来存储所有历史记录 all_history = pd.DataFrame(columns=[ "身份证号码", "姓名", "工龄","变动原因", "晋升备注", "变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动后时间", "变动后职务", "变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "五年1级年份", "两年1档年份"]) for index, row in BaseData.iterrows(): try: logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]") BaseData.at[index, "Latest_Role"] = row["初始职务"] BaseData.at[index, "Latest_Prom"] = row["入职时间"] wb = load_workbook("模板/个人台账.xlsx") ws = wb.active fill_basic_info(ws, row)# 填充基本信息 # 查找晋升信息 promote = Promote[Promote["身份证号"] == row["身份证号码"]] if not promote.empty: promote = promote.sort_values(by=["工资执行时间", "任职时间"], ascending=[False, False]).reset_index(drop=True) BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"] BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"] # 把原职务取出来 if promote.shape[0] > 1: BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"] BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"] else: BaseData.at[index, "职务2"] = row["初始职务"] BaseData.at[index, "日期2"] = row["入职时间"] promote = promote.sort_values(by=["工资执行时间", "任职时间"]).reset_index(drop=True) fill_prompt_info(ws, promote)# 填充晋升信息 # 根据规则匹配职级薪资 History_pd = pd.DataFrame(columns=[ "身份证号码", "姓名", # 统一填入 "变动后时间", "变动后职务", "变动原因", "晋升备注", # 直接填入 "工龄", "五年1级年份", "两年1档年份", # 简单计算更新 "变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新 "变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新 add_history(History_pd, row, promote) History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True) if History_pd.at[0,"变动后时间"] != row["入职时间"]: raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})") calc_history(History_pd, row) fill_history_info(ws, History_pd)# 填充历史记录 # 将当前人员的历史记录添加到总表中 all_history = pd.concat([all_history, History_pd], ignore_index=True) wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx") except Exception as e: logging.error(f"{row['身份证号码']}:{e}") # 保存所有历史记录到Excel文件 all_history["变动后时间"] = all_history["变动后时间"].apply(lambda x: format_time_ymd(x, "历史记录时间")) all_history["变动前时间"] = all_history["变动前时间"].apply(lambda x: format_time_ymd(x, "历史记录时间")) all_history.to_excel("所有人员历史记录.xlsx", index=False) logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'") fill_roster() if max_promote > 0: logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。") if max_history > 0: logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。") if __name__ == "__main__": main()