refactor: 重构数据读取逻辑以提升代码结构和可读性
- 将员工数据、晋升记录、职位规则和职级规则的读取逻辑封装为独立函数,增强代码的模块化 - 更新全局变量的定义,确保数据处理的一致性和清晰性 - 移除冗余代码,提升整体代码的可维护性和可读性
This commit is contained in:
parent
bef29011c3
commit
f77bf1d096
157
main.py
157
main.py
@ -16,11 +16,26 @@ P_LIMIT = 6 # 最大晋升次数
|
|||||||
P_START = 10 # 晋升记录开始行
|
P_START = 10 # 晋升记录开始行
|
||||||
H_START = 15 + P_LIMIT # 历史记录开始行
|
H_START = 15 + P_LIMIT # 历史记录开始行
|
||||||
|
|
||||||
nowtime = datetime.now()
|
NOWTIME = datetime.now()
|
||||||
|
|
||||||
|
# 全局变量
|
||||||
|
|
||||||
|
## 通过函数读取
|
||||||
|
BaseData = pd.DataFrame()
|
||||||
|
Promote = pd.DataFrame()
|
||||||
|
Rule_Role = []
|
||||||
|
Rule_Level = []
|
||||||
|
Rule_RoleName = []
|
||||||
|
Level_Limit = pd.DataFrame()
|
||||||
|
Promote_Level = pd.DataFrame()
|
||||||
|
Promote_verify = pd.DataFrame()
|
||||||
|
|
||||||
|
## 统计量
|
||||||
|
max_promote = 0
|
||||||
|
max_history = 0
|
||||||
|
|
||||||
# 工具函数
|
# 工具函数
|
||||||
|
|
||||||
# 自定义日期解析函数
|
|
||||||
def custom_date_parser(x):
|
def custom_date_parser(x):
|
||||||
try:
|
try:
|
||||||
return datetime.strptime(x, '%Y-%m-%d')
|
return datetime.strptime(x, '%Y-%m-%d')
|
||||||
@ -53,34 +68,32 @@ def split_level(level:str):
|
|||||||
except:
|
except:
|
||||||
raise Exception(f"职级[{level}]格式错误")
|
raise Exception(f"职级[{level}]格式错误")
|
||||||
|
|
||||||
# 读取员工数据
|
# 读取信息
|
||||||
|
|
||||||
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
|
def read_base_data(): # 读取员工数据
|
||||||
|
global BaseData
|
||||||
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
|
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
|
||||||
|
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
|
||||||
BaseData[col] = BaseData[col].apply(custom_date_parser)
|
BaseData[col] = BaseData[col].apply(custom_date_parser)
|
||||||
for col in ["晋档起始", "晋级起始"]:
|
for col in ["晋档起始", "晋级起始"]:
|
||||||
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
|
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
|
||||||
|
BaseData["Latest_Role"] = None
|
||||||
|
BaseData["Latest_Prom"] = None
|
||||||
|
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
|
||||||
|
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
|
||||||
|
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
|
||||||
|
BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
|
||||||
|
|
||||||
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
|
def read_promote(): # 读取晋升记录
|
||||||
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
|
global Promote
|
||||||
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
|
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动")
|
||||||
BaseData["工龄"] = BaseData.apply(lambda row: nowtime.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
|
for col in ["任职时间","工资执行时间"]:
|
||||||
|
|
||||||
# 读取晋升记录
|
|
||||||
|
|
||||||
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动") #
|
|
||||||
|
|
||||||
for col in ["任职时间","工资执行时间"]:
|
|
||||||
Promote[col] = Promote[col].apply(custom_date_parser)
|
Promote[col] = Promote[col].apply(custom_date_parser)
|
||||||
|
|
||||||
logging.info("人员信息加载完成")
|
def read_rule_role(): # 读取职位规则
|
||||||
|
global Rule_Role
|
||||||
# 读取规则
|
col = 4
|
||||||
|
while True:
|
||||||
Rule_Role = []
|
|
||||||
col = 4
|
|
||||||
while True: # 职位规则
|
|
||||||
try:
|
try:
|
||||||
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||||||
Rule_Role.append({
|
Rule_Role.append({
|
||||||
@ -91,9 +104,12 @@ while True: # 职位规则
|
|||||||
col += 2
|
col += 2
|
||||||
except:
|
except:
|
||||||
break
|
break
|
||||||
Rule_Level = []
|
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
|
||||||
col = 1
|
|
||||||
while True: # 职级规则
|
def read_rule_level(): # 读取职级规则
|
||||||
|
global Rule_Level
|
||||||
|
col = 1
|
||||||
|
while True: # 职级规则
|
||||||
try:
|
try:
|
||||||
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||||||
Rule_Level.append({
|
Rule_Level.append({
|
||||||
@ -104,9 +120,12 @@ while True: # 职级规则
|
|||||||
col += 2
|
col += 2
|
||||||
except:
|
except:
|
||||||
break
|
break
|
||||||
Rule_RoleName = []
|
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
|
||||||
col = 1
|
|
||||||
while True: # 名称变化
|
def read_rule_role_name(): # 读取名称变化规则
|
||||||
|
global Rule_RoleName
|
||||||
|
col = 1
|
||||||
|
while True: # 名称变化
|
||||||
try:
|
try:
|
||||||
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||||||
Rule_RoleName.append({
|
Rule_RoleName.append({
|
||||||
@ -117,25 +136,37 @@ while True: # 名称变化
|
|||||||
col += 2
|
col += 2
|
||||||
except:
|
except:
|
||||||
break
|
break
|
||||||
|
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
|
||||||
|
|
||||||
# 读取职位对应的级别限制
|
def read_level_limit(): # 读取职位对应的级别限制
|
||||||
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
|
global Level_Limit, Promote_Level
|
||||||
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
|
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
|
||||||
Level_Limit = {}
|
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
|
||||||
Promote_Level = {}
|
Level_Limit = {}
|
||||||
for rule in Rule_Role:
|
Promote_Level = {}
|
||||||
|
for rule in Rule_Role:
|
||||||
for index, row in rule["rule"].iterrows():
|
for index, row in rule["rule"].iterrows():
|
||||||
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
|
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
|
||||||
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
|
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
|
||||||
|
|
||||||
# 晋升校验
|
def read_promote_verify(): # 读取晋升校验
|
||||||
Promote_verify = Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
|
global Promote_verify
|
||||||
|
Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
|
||||||
|
|
||||||
logging.info("规则加载完成")
|
def load_people():
|
||||||
|
read_base_data()
|
||||||
|
read_promote()
|
||||||
|
logging.info("人员信息加载完成")
|
||||||
|
|
||||||
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
|
def load_rule():
|
||||||
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
|
read_rule_role()
|
||||||
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
|
read_rule_level()
|
||||||
|
read_rule_role_name()
|
||||||
|
read_level_limit()
|
||||||
|
read_promote_verify()
|
||||||
|
logging.info("规则加载完成")
|
||||||
|
|
||||||
|
# 获取配置类函数
|
||||||
|
|
||||||
def role_salary(role:str, time):
|
def role_salary(role:str, time):
|
||||||
for rule in Rule_Role:
|
for rule in Rule_Role:
|
||||||
@ -167,8 +198,7 @@ def role_limit(role:str):
|
|||||||
logging.warning(f"职位[{role}]不存在职级上限规则")
|
logging.warning(f"职位[{role}]不存在职级上限规则")
|
||||||
return -1
|
return -1
|
||||||
|
|
||||||
max_promote = 0
|
# 填充类辅助函数
|
||||||
max_history = 0
|
|
||||||
|
|
||||||
def fill_basic_info(ws, row):# 填充基本信息
|
def fill_basic_info(ws, row):# 填充基本信息
|
||||||
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
|
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
|
||||||
@ -218,13 +248,15 @@ def fill_history_info(ws, History_pd):# 填充历史记录
|
|||||||
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
|
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
|
||||||
# ws.cell(row=H_START+index, column=8, value=index) # Debug
|
# ws.cell(row=H_START+index, column=8, value=index) # Debug
|
||||||
|
|
||||||
BaseData["Latest_Role"] = None
|
def main():
|
||||||
BaseData["Latest_Prom"] = None
|
|
||||||
|
|
||||||
# 创建一个空的DataFrame来存储所有历史记录
|
load_people()
|
||||||
all_history = pd.DataFrame(columns=["身份证号码", "姓名", "时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"])
|
load_rule()
|
||||||
|
|
||||||
for index, row in BaseData.iterrows():
|
# 创建一个空的DataFrame来存储所有历史记录
|
||||||
|
all_history = pd.DataFrame(columns=["身份证号码", "姓名", "时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"])
|
||||||
|
|
||||||
|
for index, row in BaseData.iterrows():
|
||||||
try:
|
try:
|
||||||
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
||||||
BaseData.at[index, "Latest_Role"] = row["初始职务"]
|
BaseData.at[index, "Latest_Role"] = row["初始职务"]
|
||||||
@ -260,13 +292,13 @@ for index, row in BaseData.iterrows():
|
|||||||
calctime=row["晋档起始"] + relativedelta(minute=1)
|
calctime=row["晋档起始"] + relativedelta(minute=1)
|
||||||
while True: # 添加晋档记录
|
while True: # 添加晋档记录
|
||||||
calctime += relativedelta(years=row["晋档间隔"])
|
calctime += relativedelta(years=row["晋档间隔"])
|
||||||
if calctime > nowtime:
|
if calctime > NOWTIME:
|
||||||
break
|
break
|
||||||
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档", ""]
|
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档", ""]
|
||||||
calctime=row["晋级起始"]
|
calctime=row["晋级起始"]
|
||||||
while True: # 添加晋级记录
|
while True: # 添加晋级记录
|
||||||
calctime += relativedelta(years=row["晋级间隔"])
|
calctime += relativedelta(years=row["晋级间隔"])
|
||||||
if calctime > nowtime:
|
if calctime > NOWTIME:
|
||||||
break
|
break
|
||||||
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级", ""]
|
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级", ""]
|
||||||
except:
|
except:
|
||||||
@ -336,14 +368,14 @@ for index, row in BaseData.iterrows():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"{row['身份证号码']}:{e}")
|
logging.error(f"{row['身份证号码']}:{e}")
|
||||||
|
|
||||||
# 保存所有历史记录到Excel文件
|
# 保存所有历史记录到Excel文件
|
||||||
all_history["时间"] = all_history["时间"].apply(lambda x: format_time(x, "历史记录时间"))
|
all_history["时间"] = all_history["时间"].apply(lambda x: format_time(x, "历史记录时间"))
|
||||||
all_history.to_excel("所有人员历史记录.xlsx", index=False)
|
all_history.to_excel("所有人员历史记录.xlsx", index=False)
|
||||||
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")
|
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")
|
||||||
|
|
||||||
wb = load_workbook("模板/汇总名册.xlsx")
|
wb = load_workbook("模板/汇总名册.xlsx")
|
||||||
ws = wb.active
|
ws = wb.active
|
||||||
for index, row in BaseData.iterrows(): # 汇总
|
for index, row in BaseData.iterrows(): # 汇总
|
||||||
try:
|
try:
|
||||||
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
||||||
for col in range(1,16):
|
for col in range(1,16):
|
||||||
@ -364,9 +396,12 @@ for index, row in BaseData.iterrows(): # 汇总
|
|||||||
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
|
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"{row['身份证号码']}:{e}")
|
logging.error(f"{row['身份证号码']}:{e}")
|
||||||
wb.save("汇总名册.xlsx") # 保存汇总
|
wb.save("汇总名册.xlsx") # 保存汇总
|
||||||
|
|
||||||
if max_promote > 0:
|
if max_promote > 0:
|
||||||
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
|
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
|
||||||
if max_history > 0:
|
if max_history > 0:
|
||||||
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
|
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
x
Reference in New Issue
Block a user