486 lines
24 KiB
Python
486 lines
24 KiB
Python
import pandas as pd
|
||
from openpyxl.utils import get_column_letter
|
||
from openpyxl import load_workbook
|
||
from datetime import datetime
|
||
from dateutil.relativedelta import relativedelta
|
||
import logging, os
|
||
|
||
# 配置日志记录
|
||
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
# 全局变量
|
||
|
||
## 常量
|
||
|
||
P_LIMIT = 6 # 最大晋升次数
|
||
P_START = 10 # 晋升记录开始行
|
||
H_START = 15 + P_LIMIT # 历史记录开始行
|
||
|
||
NOWTIME = datetime.now()
|
||
|
||
## 通过函数读取
|
||
BaseData = pd.DataFrame()
|
||
Promote = pd.DataFrame()
|
||
Rule_Role = []
|
||
Rule_Level = []
|
||
Rule_RoleName = []
|
||
Level_Limit = pd.DataFrame()
|
||
Promote_Level = pd.DataFrame()
|
||
Promote_verify = pd.DataFrame()
|
||
Allowance = []
|
||
|
||
## 统计量
|
||
max_promote = 0
|
||
max_history = 0
|
||
|
||
# 工具函数
|
||
|
||
def custom_date_parser(x):
|
||
try:
|
||
return datetime.strptime(x, '%Y-%m-%d')
|
||
except:
|
||
return x
|
||
|
||
def format_time(dt,info):
|
||
try:
|
||
return dt.strftime("%Y.%m")
|
||
except:
|
||
logging.warning(f"[{info}]时间格式错误:{dt}")
|
||
return dt
|
||
|
||
def format_time_ymd(dt,info):
|
||
try:
|
||
return dt.strftime("%Y.%m.%d")
|
||
|
||
except:
|
||
logging.warning(f"[{info}]时间格式错误:{dt}")
|
||
return dt
|
||
|
||
def to_int(x):
|
||
try:
|
||
return int(x)
|
||
except:
|
||
return 0
|
||
|
||
def fallback(x):
|
||
for i in x:
|
||
if pd.notna(i) and i != '':
|
||
return i
|
||
return ''
|
||
|
||
def split_level(level:str):
|
||
try:
|
||
parts = level.split('-')
|
||
return (int(parts[0]), int(parts[1]))
|
||
except:
|
||
raise Exception(f"职级[{level}]格式错误")
|
||
|
||
def calculate_seniority(row, year):
|
||
return year - row["参加工作时间"].year + row["工龄调增"] - row["工龄调减"] + 1
|
||
|
||
# 读取信息
|
||
|
||
def read_base_data(): # 读取员工数据
|
||
global BaseData
|
||
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
|
||
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
|
||
BaseData[col] = BaseData[col].apply(custom_date_parser)
|
||
for col in ["晋档起始", "晋级起始"]:
|
||
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
|
||
BaseData["Latest_Role"] = None
|
||
BaseData["Latest_Prom"] = None
|
||
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
|
||
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
|
||
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
|
||
BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
|
||
|
||
def read_promote(): # 读取晋升记录
|
||
global Promote
|
||
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动")
|
||
for col in ["任职时间","工资执行时间"]:
|
||
Promote[col] = Promote[col].apply(custom_date_parser)
|
||
|
||
def read_rule_role(): # 读取职位规则
|
||
global Rule_Role
|
||
col = 4
|
||
while True:
|
||
try:
|
||
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||
Rule_Role.append({
|
||
"start":rule.iloc[0,1],
|
||
"end":rule.iloc[1,1],
|
||
"rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"])
|
||
})
|
||
col += 2
|
||
except:
|
||
break
|
||
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
|
||
|
||
def read_rule_level(): # 读取职级规则
|
||
global Rule_Level
|
||
col = 1
|
||
while True: # 职级规则
|
||
try:
|
||
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||
Rule_Level.append({
|
||
"start":rule.iloc[0,1],
|
||
"end":rule.iloc[1,1],
|
||
"rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
|
||
})
|
||
col += 2
|
||
except:
|
||
break
|
||
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
|
||
|
||
def read_rule_role_name(): # 读取名称变化规则
|
||
global Rule_RoleName
|
||
col = 1
|
||
while True: # 名称变化
|
||
try:
|
||
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||
Rule_RoleName.append({
|
||
"start":rule.iloc[0,1],
|
||
"end":rule.iloc[1,1],
|
||
"rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2)
|
||
})
|
||
col += 2
|
||
except:
|
||
break
|
||
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
|
||
|
||
def read_level_limit(): # 读取职位对应的级别限制
|
||
global Level_Limit, Promote_Level
|
||
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
|
||
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
|
||
Level_Limit = {}
|
||
Promote_Level = {}
|
||
for rule in Rule_Role:
|
||
for index, row in rule["rule"].iterrows():
|
||
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
|
||
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
|
||
|
||
def read_promote_verify(): # 读取晋升校验
|
||
global Promote_verify
|
||
Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
|
||
|
||
def read_allowance(): # 读取津贴
|
||
global Allowance
|
||
col = 1
|
||
while True:
|
||
try:
|
||
rule = pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||
Allowance.append({
|
||
"start":rule.iloc[0,1],
|
||
"end":rule.iloc[1,1],
|
||
"rule":pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
|
||
})
|
||
col += 2
|
||
except:
|
||
break
|
||
Allowance = sorted(Allowance, key=lambda x: x['start'])
|
||
|
||
def load_people():
|
||
read_base_data()
|
||
read_promote()
|
||
logging.info("人员信息加载完成")
|
||
|
||
def load_rule():
|
||
read_rule_role()
|
||
read_rule_level()
|
||
read_rule_role_name()
|
||
read_level_limit()
|
||
read_promote_verify()
|
||
read_allowance()
|
||
logging.info("规则加载完成")
|
||
|
||
# 获取配置类函数
|
||
|
||
def role_salary(role:str, time):
|
||
for rule in Rule_Role:
|
||
if rule["start"] <= time <= rule["end"]:
|
||
try:
|
||
tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0]
|
||
return tmp["salary"]
|
||
except:
|
||
if role == "":
|
||
logging.error("空职级")
|
||
else:
|
||
logging.warning(f"职位[{role}]在[{time}]时不存在工资规则")
|
||
logging.warning(f"时间[{time}]时不存在职位工资规则")
|
||
return 0
|
||
|
||
def level_salary(level:str, time):
|
||
for rule in Rule_Level:
|
||
if rule["start"] <= time <= rule["end"]:
|
||
try:
|
||
tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0]
|
||
return tmp["salary"]
|
||
except:
|
||
logging.warning(f"职级[{level}]在[{time}]时不存在工资规则")
|
||
logging.warning(f"时间[{time}]时不存在职级工资规则")
|
||
return 0
|
||
|
||
def role_limit(role:str):
|
||
if role in Level_Limit.keys():
|
||
return Level_Limit[role]
|
||
else:
|
||
logging.warning(f"职位[{role}]不存在职级上限规则")
|
||
return -1
|
||
|
||
def allowance(role:str, level:int, time):
|
||
for rule in Allowance:
|
||
if rule["start"] <= time <= rule["end"]:
|
||
try:
|
||
tmp = rule["rule"][rule["rule"]["level"] == f"{role}-{level}"].iloc[0]
|
||
return tmp["salary"]
|
||
except:
|
||
logging.warning(f"组合[{role}-{level}]在[{time}]时不存在工资规则")
|
||
logging.warning(f"时间[{time}]时不存在津贴规则")
|
||
raise 0
|
||
|
||
# 填充类辅助函数
|
||
|
||
def fill_basic_info(ws, row):# 填充基本信息
|
||
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
|
||
ws.cell(row=3, column=2, value=row["姓名"])
|
||
ws.cell(row=3, column=4, value=row["性别"])
|
||
ws.cell(row=3, column=6, value=row["民族"])
|
||
ws.cell(row=5, column=2, value=row["现职"])
|
||
ws.cell(row=5, column=6, value=row["任职年限"])
|
||
ws.cell(row=6, column=2, value=row["原职"])
|
||
ws.cell(row=6, column=6, value=row["原职年限"])
|
||
ws.cell(row=7, column=2, value=row["学历"])
|
||
ws.cell(row=7, column=4, value=row["学龄"])
|
||
ws.cell(row=7, column=6, value=row["套改年限"])
|
||
ws.cell(row=7, column=7, value=row["职务工资"])
|
||
ws.cell(row=7, column=8, value=row["职务工资金额"])
|
||
ws.cell(row=7, column=9, value=row["级别工资"])
|
||
ws.cell(row=7, column=10, value=row["职务工资金额"])
|
||
ws.cell(row=17, column=1, value=row["个人评价结果"])
|
||
ws.cell(row=3, column=8, value=format_time(row["出生年月"],"出生年月"))
|
||
ws.cell(row=3, column=10, value=format_time(row["参加工作时间"],"参加工作时间"))
|
||
ws.cell(row=5, column=4, value=format_time(row["任职年月"],"任职年月"))
|
||
ws.cell(row=6, column=4, value=format_time(row["原职时间"],"原职时间"))
|
||
|
||
def fill_prompt_info(ws, promote):# 填充晋升信息
|
||
for index, prow in promote.iterrows():
|
||
if index > P_LIMIT-1:
|
||
logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。")
|
||
max_promote = max(max_promote, promote.shape[0])
|
||
break
|
||
ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"],"晋升时间"))
|
||
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
|
||
ws.cell(row=P_START+index, column=3, value="任"+prow["新职务"])
|
||
|
||
def fill_history_info(ws, History_pd):# 填充历史记录
|
||
for index, hrow in History_pd.iterrows(): # 打印
|
||
for col in range(1, 11): # 复制样式
|
||
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
|
||
try:
|
||
ws.cell(row=H_START+index, column=1, value=format_time(hrow["变动后时间"],"历史时间"))
|
||
except:
|
||
logging.warning(f"历史时间格式错误:{hrow['时间']}")
|
||
ws.cell(row=H_START+index, column=2, value=hrow["变动后职务"])
|
||
ws.cell(row=H_START+index, column=3, value=hrow["变动后职务工资"])
|
||
ws.cell(row=H_START+index, column=4, value=hrow["变动后级别档次"])
|
||
ws.cell(row=H_START+index, column=5, value=hrow["变动后级别工资"])
|
||
ws.cell(row=H_START+index, column=6, value=hrow["变动后工资合计"])
|
||
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
|
||
# ws.cell(row=H_START+index, column=8, value=index) # Debug
|
||
|
||
def fill_roster(): # 填充花名册
|
||
wb = load_workbook("模板/汇总名册.xlsx")
|
||
ws = wb.active
|
||
for index, row in BaseData.iterrows(): # 汇总
|
||
try:
|
||
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
||
for col in range(1,16):
|
||
ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style
|
||
ws.cell(row=6+index, column=1, value=index+1)
|
||
ws.cell(row=6+index, column=2, value=row["姓名"])
|
||
ws.cell(row=6+index, column=3, value=row["性别"])
|
||
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月"))
|
||
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间"))
|
||
ws.cell(row=6+index, column=6, value=fallback([row["现学历"],row["学历"]]))
|
||
ws.cell(row=6+index, column=7, value=row['工龄']+row["学龄"])
|
||
ws.cell(row=6+index, column=8, value=row['工龄'])
|
||
ws.cell(row=6+index, column=9, value=row["学龄"])
|
||
ws.cell(row=6+index, column=10, value=row["工龄调减"])
|
||
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
|
||
ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"], "Latest_Prom"))
|
||
ws.cell(row=6+index, column=13, value=row["职务2"])
|
||
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
|
||
except Exception as e:
|
||
logging.error(f"{row['身份证号码']}:{e}")
|
||
wb.save("汇总名册.xlsx") # 保存汇总
|
||
|
||
def main():
|
||
|
||
load_people()
|
||
load_rule()
|
||
|
||
# 创建一个空的DataFrame来存储所有历史记录
|
||
all_history = pd.DataFrame(columns=[
|
||
"身份证号码", "姓名", "工龄","变动原因", "晋升备注",
|
||
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资",
|
||
"变动后时间", "变动后职务", "变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资",
|
||
"五年1级年份", "两年1档年份"])
|
||
|
||
for index, row in BaseData.iterrows():
|
||
try:
|
||
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
||
BaseData.at[index, "Latest_Role"] = row["初始职务"]
|
||
BaseData.at[index, "Latest_Prom"] = row["入职时间"]
|
||
wb = load_workbook("模板/个人台账.xlsx")
|
||
ws = wb.active
|
||
fill_basic_info(ws, row)# 填充基本信息
|
||
|
||
# 查找晋升信息
|
||
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
|
||
if not promote.empty:
|
||
promote = promote.sort_values(by=["工资执行时间", "任职时间"], ascending=[False, False]).reset_index(drop=True)
|
||
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
|
||
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
|
||
# 把原职务取出来
|
||
if promote.shape[0] > 1:
|
||
BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"]
|
||
BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"]
|
||
else:
|
||
BaseData.at[index, "职务2"] = row["初始职务"]
|
||
BaseData.at[index, "日期2"] = row["入职时间"]
|
||
promote = promote.sort_values(by=["工资执行时间", "任职时间"]).reset_index(drop=True)
|
||
fill_prompt_info(ws, promote)# 填充晋升信息
|
||
|
||
# 根据规则匹配职级薪资
|
||
History_pd = pd.DataFrame(columns=[
|
||
"身份证号码", "姓名", # 统一填入
|
||
"变动后时间", "变动后职务", "变动原因", "晋升备注", # 直接填入
|
||
"工龄", "五年1级年份", "两年1档年份", # 简单计算更新
|
||
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新
|
||
"变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新
|
||
|
||
# 添加入职记录
|
||
History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
|
||
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
|
||
for index, prow in promote.iterrows(): # 添加晋升记录
|
||
History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [
|
||
prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day),
|
||
prow["新职务"],"晋升",f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
|
||
try:
|
||
# 添加晋档记录
|
||
calctime=row["晋档起始"] + relativedelta(minute=1)
|
||
while True:
|
||
calctime += relativedelta(years=row["晋档间隔"])
|
||
if calctime > NOWTIME:
|
||
break
|
||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
|
||
calctime,"两年晋档"]
|
||
calctime=row["晋级起始"]
|
||
# 添加晋级记录
|
||
while True:
|
||
calctime += relativedelta(years=row["晋级间隔"])
|
||
if calctime > NOWTIME:
|
||
break
|
||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
|
||
calctime,"五年晋级"]
|
||
except:
|
||
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
|
||
# 工资调标
|
||
for rule in Rule_Level:
|
||
if row["入职时间"] < rule["start"]:
|
||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
|
||
History_pd["身份证号码"] = row["身份证号码"]
|
||
History_pd["姓名"] = row["姓名"]
|
||
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1)
|
||
History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1)
|
||
History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1)
|
||
History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True)
|
||
|
||
if History_pd.at[0,"变动后时间"] != row["入职时间"]:
|
||
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})")
|
||
|
||
# 复杂数据计算
|
||
for index, hrow in History_pd.iterrows():
|
||
# 继承上一条复杂计算数据
|
||
if index > 0:
|
||
History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"]
|
||
History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"]
|
||
History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"]
|
||
History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"]
|
||
History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"]
|
||
History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"]
|
||
History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"]
|
||
# 继承名称
|
||
if pd.isna(hrow["变动后职务"]):
|
||
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
|
||
# 名称变化
|
||
for rule in Rule_RoleName:
|
||
if rule["start"] <= hrow["变动后时间"] <= rule["end"]:
|
||
if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values:
|
||
History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0]
|
||
# 级别档次
|
||
if index > 0:
|
||
jb, dc = split_level(History_pd.at[index,"变动前级别档次"])
|
||
if hrow["变动原因"] == "两年晋档":
|
||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||
elif hrow["变动原因"] == "五年晋级":
|
||
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]):
|
||
|
||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||
else:
|
||
History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
|
||
elif hrow["变动原因"] == "工资调标":
|
||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
|
||
elif hrow["变动原因"] == "晋升":
|
||
role = History_pd.iloc[index]["变动后职务"]
|
||
if role in Promote_Level.keys():
|
||
new_jb = jb + Promote_Level[role][0]
|
||
new_dc = dc + Promote_Level[role][1]
|
||
if pd.isna(new_jb) or pd.isna(new_dc):
|
||
raise Exception(f"级别档次计算出现NaN值:[{new_jb}]-[{new_dc}]({role})")
|
||
else:
|
||
new_jb = int(new_jb)
|
||
new_dc = int(new_dc)
|
||
if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and
|
||
role in Promote_verify.iloc[:,1].values):
|
||
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]")
|
||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
|
||
elif new_jb < role_limit(role):
|
||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||
elif new_jb < 1 or new_dc < 1:
|
||
raise Exception(f"级别档次小于0:[{new_jb}]-[{new_dc}]")
|
||
else:
|
||
History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}"
|
||
else:
|
||
logging.warning(f"职位[{role}]不存在职级上限规则")
|
||
else:
|
||
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
|
||
# 计算工资
|
||
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"])
|
||
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"])
|
||
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"])
|
||
|
||
fill_history_info(ws, History_pd)# 填充历史记录
|
||
|
||
# 将当前人员的历史记录添加到总表中
|
||
all_history = pd.concat([all_history, History_pd], ignore_index=True)
|
||
|
||
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
|
||
except Exception as e:
|
||
logging.error(f"{row['身份证号码']}:{e}")
|
||
|
||
# 保存所有历史记录到Excel文件
|
||
all_history["变动后时间"] = all_history["变动后时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
|
||
all_history["变动前时间"] = all_history["变动前时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
|
||
all_history.to_excel("所有人员历史记录.xlsx", index=False)
|
||
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")
|
||
|
||
fill_roster()
|
||
|
||
if max_promote > 0:
|
||
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
|
||
if max_history > 0:
|
||
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
|
||
|
||
if __name__ == "__main__":
|
||
main() |