YansTool/main.py
mxr612 ec3782fbdd feat: 增加工龄调整津贴计算逻辑
- 在add_history函数中新增工龄调整津贴的计算逻辑,基于参加工作时间每五年进行津贴调整
- 优化历史记录的字段更新,确保包含最新的变动后时间和变动原因
2025-06-14 06:40:39 +08:00

502 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from openpyxl.utils import get_column_letter
from openpyxl import load_workbook
from datetime import datetime
from dateutil.relativedelta import relativedelta
import logging, os
# 配置日志记录
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 全局变量
## 常量
P_LIMIT = 6 # 最大晋升次数
P_START = 10 # 晋升记录开始行
H_START = 15 + P_LIMIT # 历史记录开始行
NOWTIME = datetime.now()
## 通过函数读取
BaseData = pd.DataFrame()
Promote = pd.DataFrame()
Rule_Role = []
Rule_Level = []
Rule_RoleName = []
Level_Limit = pd.DataFrame()
Promote_Level = pd.DataFrame()
Promote_verify = pd.DataFrame()
Allowance = []
## 统计量
max_promote = 0
max_history = 0
# 工具函数
def custom_date_parser(x):
try:
return datetime.strptime(x, '%Y-%m-%d')
except:
return x
def format_time(dt,info):
try:
return dt.strftime("%Y.%m")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def format_time_ymd(dt,info):
try:
return dt.strftime("%Y.%m.%d")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def to_int(x):
try:
return int(x)
except:
return 0
def fallback(x):
for i in x:
if pd.notna(i) and i != '':
return i
return ''
def split_level(level:str):
try:
parts = level.split('-')
return (int(parts[0]), int(parts[1]))
except:
raise Exception(f"职级[{level}]格式错误")
def calculate_seniority(row, year):
return year - row["参加工作时间"].year + row["工龄调增"] - row["工龄调减"] + 1
# 读取信息
def read_base_data(): # 读取员工数据
global BaseData
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
BaseData[col] = BaseData[col].apply(custom_date_parser)
for col in ["晋档起始", "晋级起始"]:
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
BaseData["Latest_Role"] = None
BaseData["Latest_Prom"] = None
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
def read_promote(): # 读取晋升记录
global Promote
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动")
for col in ["任职时间","工资执行时间"]:
Promote[col] = Promote[col].apply(custom_date_parser)
def read_rule_role(): # 读取职位规则
global Rule_Role
col = 4
while True:
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Role.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"])
})
col += 2
except:
break
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
def read_rule_level(): # 读取职级规则
global Rule_Level
col = 1
while True: # 职级规则
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Level.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
def read_rule_role_name(): # 读取名称变化规则
global Rule_RoleName
col = 1
while True: # 名称变化
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_RoleName.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2)
})
col += 2
except:
break
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
def read_level_limit(): # 读取职位对应的级别限制
global Level_Limit, Promote_Level
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
Level_Limit = {}
Promote_Level = {}
for rule in Rule_Role:
for index, row in rule["rule"].iterrows():
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
def read_promote_verify(): # 读取晋升校验
global Promote_verify
Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
def read_allowance(): # 读取津贴
global Allowance
col = 1
while True:
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Allowance.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Allowance = sorted(Allowance, key=lambda x: x['start'])
def load_people():
read_base_data()
read_promote()
logging.info("人员信息加载完成")
def load_rule():
read_rule_role()
read_rule_level()
read_rule_role_name()
read_level_limit()
read_promote_verify()
read_allowance()
logging.info("规则加载完成")
# 获取配置类函数
def role_salary(role:str, time):
for rule in Rule_Role:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0]
return tmp["salary"]
except:
if role == "":
logging.error("空职级")
else:
logging.warning(f"职位[{role}]在[{time}]时不存在工资规则")
logging.warning(f"时间[{time}]时不存在职位工资规则")
return 0
def level_salary(level:str, time):
for rule in Rule_Level:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0]
return tmp["salary"]
except:
logging.warning(f"职级[{level}]在[{time}]时不存在工资规则")
logging.warning(f"时间[{time}]时不存在职级工资规则")
return 0
def role_limit(role:str):
if role in Level_Limit.keys():
return Level_Limit[role]
else:
logging.warning(f"职位[{role}]不存在职级上限规则")
return -1
def allowance(role:str, level:int, time):
for rule in Allowance:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["level"] == f"{role}-{level}"].iloc[0]
return tmp["salary"]
except:
logging.warning(f"组合[{role}-{level}]在[{time}]时不存在津贴规则")
return 0
logging.warning(f"时间[{time}]时不存在津贴规则")
return 0
# 填充类辅助函数
def fill_basic_info(ws, row):# 填充基本信息
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
ws.cell(row=3, column=2, value=row["姓名"])
ws.cell(row=3, column=4, value=row["性别"])
ws.cell(row=3, column=6, value=row["民族"])
ws.cell(row=5, column=2, value=row["现职"])
ws.cell(row=5, column=6, value=row["任职年限"])
ws.cell(row=6, column=2, value=row["原职"])
ws.cell(row=6, column=6, value=row["原职年限"])
ws.cell(row=7, column=2, value=row["学历"])
ws.cell(row=7, column=4, value=row["学龄"])
ws.cell(row=7, column=6, value=row["套改年限"])
ws.cell(row=7, column=7, value=row["职务工资"])
ws.cell(row=7, column=8, value=row["职务工资金额"])
ws.cell(row=7, column=9, value=row["级别工资"])
ws.cell(row=7, column=10, value=row["职务工资金额"])
ws.cell(row=17, column=1, value=row["个人评价结果"])
ws.cell(row=3, column=8, value=format_time(row["出生年月"],"出生年月"))
ws.cell(row=3, column=10, value=format_time(row["参加工作时间"],"参加工作时间"))
ws.cell(row=5, column=4, value=format_time(row["任职年月"],"任职年月"))
ws.cell(row=6, column=4, value=format_time(row["原职时间"],"原职时间"))
def fill_prompt_info(ws, promote):# 填充晋升信息
for index, prow in promote.iterrows():
if index > P_LIMIT-1:
logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。")
max_promote = max(max_promote, promote.shape[0])
break
ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"],"晋升时间"))
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
ws.cell(row=P_START+index, column=3, value=""+prow["新职务"])
def add_history(History_pd, row, promote):
# 添加入职记录
History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
for index, prow in promote.iterrows(): # 添加晋升记录
History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [
prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day),
prow["新职务"],"晋升",f"{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
try:
# 添加晋档记录
calctime=row["晋档起始"] + relativedelta(minute=1)
while True:
calctime += relativedelta(years=row["晋档间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
calctime,"两年晋档"]
calctime=row["晋级起始"]
# 添加晋级记录
while True:
calctime += relativedelta(years=row["晋级间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
calctime,"五年晋级"]
except:
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
# 工资调标
for rule in Rule_Level:
if row["入职时间"] < rule["start"]:
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
calctime = row["参加工作时间"]
while True:
calctime += relativedelta(years=1)
if calctime > NOWTIME:
break
elif int(calculate_seniority(row,calctime.year)) % 5 == 0:
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = \
[calctime,"调整津贴"]
History_pd["身份证号码"] = row["身份证号码"]
History_pd["姓名"] = row["姓名"]
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1)
History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1)
History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1)
def calc_history(History_pd, row):
# 复杂数据计算
for index, hrow in History_pd.iterrows():
# 继承上一条复杂计算数据
if index > 0:
History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"]
History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"]
History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"]
History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"]
History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"]
History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"]
History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"]
# 继承名称
if pd.isna(hrow["变动后职务"]):
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
# 名称变化
for rule in Rule_RoleName:
if rule["start"] <= hrow["变动后时间"] <= rule["end"]:
if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values:
History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0]
# 级别档次
if index > 0:
jb, dc = split_level(History_pd.at[index,"变动前级别档次"])
if hrow["变动原因"] == "两年晋档":
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif hrow["变动原因"] == "五年晋级":
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]):
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
else:
History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
elif hrow["变动原因"] == "工资调标":
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
elif hrow["变动原因"] == "晋升":
role = History_pd.iloc[index]["变动后职务"]
if role in Promote_Level.keys():
new_jb = jb + Promote_Level[role][0]
new_dc = dc + Promote_Level[role][1]
if pd.isna(new_jb) or pd.isna(new_dc):
raise Exception(f"级别档次计算出现NaN值[{new_jb}]-[{new_dc}]({role})")
else:
new_jb = int(new_jb)
new_dc = int(new_dc)
if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and
role in Promote_verify.iloc[:,1].values):
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]")
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
elif new_jb < role_limit(role):
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif new_jb < 1 or new_dc < 1:
raise Exception(f"级别档次小于0[{new_jb}]-[{new_dc}]")
else:
History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}"
else:
logging.warning(f"职位[{role}]不存在职级上限规则")
else:
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
# 计算工资
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.at[index,"变动后职务"], hrow["变动后时间"])
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.at[index,"变动后级别档次"], hrow["变动后时间"])
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.at[index,"变动后职务工资"]) + to_int(History_pd.at[index,"变动后级别工资"])
History_pd.at[index, "变动后津贴工资"] = allowance(History_pd.at[index,"变动后职务"], History_pd.at[index,"工龄"], History_pd.at[index,"变动后时间"])
def fill_history_info(ws, History_pd):# 填充历史记录
for index, hrow in History_pd.iterrows(): # 打印
for col in range(1, 11): # 复制样式
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
try:
ws.cell(row=H_START+index, column=1, value=format_time(hrow["变动后时间"],"历史时间"))
except:
logging.warning(f"历史时间格式错误:{hrow['时间']}")
ws.cell(row=H_START+index, column=2, value=hrow["变动后职务"])
ws.cell(row=H_START+index, column=3, value=hrow["变动后职务工资"])
ws.cell(row=H_START+index, column=4, value=hrow["变动后级别档次"])
ws.cell(row=H_START+index, column=5, value=hrow["变动后级别工资"])
ws.cell(row=H_START+index, column=6, value=hrow["变动后工资合计"])
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
# ws.cell(row=H_START+index, column=8, value=index) # Debug
def fill_roster(): # 填充花名册
wb = load_workbook("模板/汇总名册.xlsx")
ws = wb.active
for index, row in BaseData.iterrows(): # 汇总
try:
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
for col in range(1,16):
ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style
ws.cell(row=6+index, column=1, value=index+1)
ws.cell(row=6+index, column=2, value=row["姓名"])
ws.cell(row=6+index, column=3, value=row["性别"])
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月"))
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间"))
ws.cell(row=6+index, column=6, value=fallback([row["现学历"],row["学历"]]))
ws.cell(row=6+index, column=7, value=row['工龄']+row["学龄"])
ws.cell(row=6+index, column=8, value=row['工龄'])
ws.cell(row=6+index, column=9, value=row["学龄"])
ws.cell(row=6+index, column=10, value=row["工龄调减"])
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"], "Latest_Prom"))
ws.cell(row=6+index, column=13, value=row["职务2"])
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
wb.save("汇总名册.xlsx") # 保存汇总
def main():
load_people()
load_rule()
# 创建一个空的DataFrame来存储所有历史记录
all_history = pd.DataFrame(columns=[
"身份证号码", "姓名", "工龄","变动原因", "晋升备注",
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资",
"变动后时间", "变动后职务", "变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资",
"五年1级年份", "两年1档年份"])
for index, row in BaseData.iterrows():
try:
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
BaseData.at[index, "Latest_Role"] = row["初始职务"]
BaseData.at[index, "Latest_Prom"] = row["入职时间"]
wb = load_workbook("模板/个人台账.xlsx")
ws = wb.active
fill_basic_info(ws, row)# 填充基本信息
# 查找晋升信息
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
if not promote.empty:
promote = promote.sort_values(by=["工资执行时间", "任职时间"], ascending=[False, False]).reset_index(drop=True)
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
# 把原职务取出来
if promote.shape[0] > 1:
BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"]
BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"]
else:
BaseData.at[index, "职务2"] = row["初始职务"]
BaseData.at[index, "日期2"] = row["入职时间"]
promote = promote.sort_values(by=["工资执行时间", "任职时间"]).reset_index(drop=True)
fill_prompt_info(ws, promote)# 填充晋升信息
# 根据规则匹配职级薪资
History_pd = pd.DataFrame(columns=[
"身份证号码", "姓名", # 统一填入
"变动后时间", "变动后职务", "变动原因", "晋升备注", # 直接填入
"工龄", "五年1级年份", "两年1档年份", # 简单计算更新
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新
"变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新
add_history(History_pd, row, promote)
History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True)
if History_pd.at[0,"变动后时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})")
calc_history(History_pd, row)
fill_history_info(ws, History_pd)# 填充历史记录
# 将当前人员的历史记录添加到总表中
all_history = pd.concat([all_history, History_pd], ignore_index=True)
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
# 保存所有历史记录到Excel文件
all_history["变动后时间"] = all_history["变动后时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history["变动前时间"] = all_history["变动前时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history.to_excel("所有人员历史记录.xlsx", index=False)
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")
fill_roster()
if max_promote > 0:
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
if max_history > 0:
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
if __name__ == "__main__":
main()