Code|SMTC.fm:获取SMTC音频数据并上传到Last.fm

结月lalalll

自从认识了Last.fm,这网易云用得就是不得劲,本地音乐况且都支持它,所以就开始找能在国内在线音乐平台听歌时候记录Last.fm的
很遗憾,没有
于是就寻思着自己整个程序吧,so开始翻阅Last.fm的文档…

项目地址: https://github.com/lalalll-lalalll/SMTC.fm/

基本思路

程序大体分为三个部分:
1.与Last.fm平台连接并利用开发者Key为用户注册到自己的私人密钥(UserAuthentication)
2.使用系统API获取SMTC中的音乐数据(New_SMTC)
3.将数据存储到临时数据库并上传到Last.fm听歌记录(New_Scrobbling)

具体程序

首先取得API_KEY和SHARED_SECRET的值,以下所有程序直接使用

UserAuthentication

API_sig 获取API方法签名

每次与Last.fm的通讯都需要对所进行的操作进行封装并求出哈希值用以保证安全

1
2
3
4
def generate_api_sig(params):
param_string = ''.join([f"{key}{params[key]}" for key in sorted(params.keys())])
param_string += SHARED_SECRET
return hashlib.md5(param_string.encode('utf-8')).hexdigest()

Authentication Tokens 获取临时令牌

1
2
3
4
5
6
7
8
9
10
11
12
def get_token():
params = {
'method': 'auth.getToken',
'api_key': API_KEY,
'format': 'json'
}
params['api_sig'] = generate_api_sig(params)

url = f'http://ws.audioscrobbler.com/2.0/?{urlencode(params)}'
response = requests.get(url)
data = json.loads(response.content)
return data['token']

首先调用 generate_api_sig 取得本次方法的apisig,接着发送参数,取得返回值定义为 token

Session Key 获取用户密钥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def get_session(token):
auth_params = {
'api_key': API_KEY,
'token': token,
'method': 'auth.getSession'
}
auth_params['api_sig'] = generate_api_sig(auth_params)

auth_url = f'http://www.last.fm/api/auth/?{urlencode(auth_params)}'
webbrowser.open(auth_url)

print("Once you have authorized, press Enter to continue...")
input()

session_params = {
'api_key': API_KEY,
'method': 'auth.getSession',
'token': token,
}
session_params['api_sig'] = generate_api_sig(session_params)

session_url = f'http://ws.audioscrobbler.com/2.0/?{urlencode(session_params)}'

response = requests.get(session_url)
if response.status_code == 200:
root = ET.fromstring(response.content)
session_key = root.find('.//key').text
print(f"Session key: {session_key}")
return session_key
else:
print(f"Failed to get session. Status code: {response.status_code}")
return None

首先调用 generate_api_sig 取得本次方法的apisig,合成参数进行传入,使用 webbrowser.open 打开浏览器并让用户手动授权,取得返回值并定义为 session key

简单的交互操作

为了方便使用而添加的部分操作引导,调用 pyperclip库 用以剪切板操作


到此为止,我们就获得了用户的私钥 Session Key

New_SMTC

最麻烦的一个部分

初始化操作(定义变量)

1
2
3
4
5
6
7
8
# 获取当前脚本所在的目录
script_dir = os.path.dirname(os.path.abspath(__file__))

# 持久化ID json文件
LAST_ID_FILE = os.path.join(script_dir, 'last_id.json')

# 定义数据库文件路径(与 .py 文件同目录)
db_path = os.path.join(script_dir, 'media_history.db')

初始化操作(创建临时数据库)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 连接到 SQLite 数据库(如果不存在则会自动创建)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# 创建 media_history 表
cursor.execute('''
CREATE TABLE IF NOT EXISTS media_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
track TEXT NOT NULL,
artist TEXT NOT NULL,
album TEXT,
timestamp INTEGER NOT NULL
)
''')

# 提交更改并关闭连接
conn.commit()
conn.close()

print(f"数据库文件已创建或连接成功,位于: {db_path}")

创建临时json文件用来确保程序读取的最新数据,创建临时数据库来进行音乐信息的传递

数据库写入操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def insert_media_info(track, artist, album, timestamp):
"""将媒体信息插入到 SQLite 数据库"""
try:
# 连接到数据库
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# 插入媒体信息
cursor.execute('''
INSERT INTO media_history (track, artist, album, timestamp) VALUES (?, ?, ?, ?)
''', (track, artist, album, timestamp))

# 提交更改
conn.commit()
print("媒体信息已保存到数据库。")
except Exception as e:
print(f"保存媒体信息到数据库时发生错误: {e}")
finally:
# 关闭连接
conn.close()

四项都是日后要进行上传的,没什么好说的

SMTC主程序

初始化类MediaWatcher

1
2
3
4
5
6
class MediaWatcher:
def __init__(self):
self.session = None
self.loop = None # 保存主事件循环的引用
self.last_media_properties = None # 保存上一次的媒体属性,用于去重
self.last_playback_status = None # 保存上一次的播放状态,用于去重

媒体属性变化处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def on_media_properties_changed(self, sender, args):
"""处理媒体属性变化的事件回调函数"""
try:
# 检查 sender 是否为有效的会话对象
if not hasattr(sender, "try_get_media_properties_async"):
print("----------")
print("无效的 sender 对象,跳过处理。")
print("----------")
return

# 获取当前播放会话的媒体属性
media_props = await sender.try_get_media_properties_async()

# 检查媒体属性是否有效且发生变化
if media_props and (media_props.title, media_props.artist, media_props.album_title) != self.last_media_properties:
self.last_media_properties = (media_props.title, media_props.artist, media_props.album_title)
print("----------")
print("媒体信息更新:")
print(f"标题: {media_props.title}") # 打印歌曲标题
print(f"艺术家: {media_props.artist}") # 打印艺术家名称
print(f"专辑: {media_props.album_title}") # 打印专辑名称
print("----------")

# 将媒体信息写入数据库
insert_media_info(media_props.title, media_props.artist, media_props.album_title, int(time.time()))

scrobbler = LastFmScrobbler(API_KEY, API_SECRET, SESSION_KEY)
db_handler = MediaDatabaseHandler(db_path)
queue_processor = QueueProcessor(db_handler, scrobbler, LAST_ID_FILE)
queue_processor.process_queue()

except Exception as e:
print(f"获取媒体属性时发生错误: {e}")

检查 sender 是否有效,检查媒体属性是否变化,若变化则更新 self.last_media_properties 用以进行下一次比较,打印歌曲信息并写入数据库,调用 New_Scrobbling 执行上传操作

监听媒体播放状态变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def watch_for_changes(self):
"""监听系统级别的媒体播放状态变化"""
try:
# 获取当前的事件循环,并保存引用
self.loop = asyncio.get_running_loop()

# 请求全局媒体传输控制会话管理器
session_manager = await GlobalSystemMediaTransportControlsSessionManager.request_async()

if session_manager:
# 定义一个内部方法来更新当前会话
async def update_current_session():
current_session = session_manager.get_current_session()
if current_session != self.session:
self.session = current_session
if self.session:
# 注册媒体属性变化事件
self.session.add_media_properties_changed(
lambda sender, args: self._schedule_coroutine(self.on_media_properties_changed(sender, args))
)
else:
print("当前没有活动的媒体会话。")

# 初始更新当前会话
await update_current_session()

print("开始监听媒体控制属性变化...")

# 阻止程序退出,保持监听状态
await asyncio.Event().wait()

except Exception as e:
print(f"监听过程中发生错误: {e}")

关键要请求全局媒体传输控制会话管理器 GlobalSystemMediaTransportControlsSessionManager.request_async

调度异步任务

1
2
3
4
5
def _schedule_coroutine(self, coroutine):
if self.loop and not self.loop.is_closed():
asyncio.run_coroutine_threadsafe(coroutine, self.loop)
else:
print("事件循环不可用,无法调度任务")

其他

剩下的就是创建实例,启动异步任务,在退出时清除临时json文件和临时数据库

New_Scrobbling

New_Scrobbling 直接引用于 New_SMTC

读取用户私钥 session_key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def get_exe_directory():
"""获取 .exe 文件所在的目录"""
if getattr(sys, 'frozen', False): # 判断是否为打包后的 exe 文件
# 如果是打包后的 exe 文件,获取 exe 所在目录
return os.path.dirname(sys.executable)
else:
# 如果是普通 Python 脚本,获取脚本所在目录
return os.path.dirname(os.path.abspath(__file__))

def get_config_path(config_filename):
"""获取配置文件的完整路径"""
exe_dir = get_exe_directory()
config_path = os.path.join(exe_dir, config_filename)
return config_path

# 使用函数获取配置文件路径
config_file = "config.json" # 配置文件名
json_file_path = get_config_path(config_file)

# 检查配置文件是否存在
if os.path.exists(json_file_path):
print(f"Config file found at: {json_file_path}")
# 在这里读取配置文件内容
else:
print("Config file not found!")


# 读取 JSON 文件并解析
try:
with open(json_file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
session_key = data.get('SESSION_KEY') # 获取 SESSION_KEY 的值
if session_key:
print(f"SESSION_KEY: {session_key}")
else:
print("SESSION_KEY 未找到")
except FileNotFoundError:
print(f"文件 {json_file_path} 未找到")
except json.JSONDecodeError as e:
print(f"JSON 解析错误: {e}")

SESSION_KEY = session_key

读取程序运行同目录下 config.json 的值,定义为 session_key

歌曲上传

初始化类LastFmScrobbler

1
2
3
4
5
class LastFmScrobbler:
def __init__(self, api_key, api_secret, session_key):
self.api_key = api_key
self.api_secret = api_secret
self.session_key = session_key

API_sig 获取API方法签名

老套路,当时有个先后写,完全没想到引用模块,于是…造了两套轮子…嘛,也算是"历史原因"了 ╮(╯▽╰)╭

1
2
3
4
5
def generate_api_sig(self, params):
"""生成 API 签名"""
param_string = ''.join([f"{key}{params[key]}" for key in sorted(params.keys())])
param_string += self.api_secret
return hashlib.md5(param_string.encode('utf-8')).hexdigest()

track.scrobble

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def scrobble_track(self, track_info):
"""上传单首歌曲到 Last.fm"""
params = {
'method': 'track.scrobble',
'api_key': self.api_key,
'sk': self.session_key,
'track': track_info['track'],
'artist': track_info['artist'],
'album': track_info.get('album', ''),
'timestamp': track_info['timestamp']
}

# 添加 API 签名
params['api_sig'] = self.generate_api_sig(params)

try:
response = requests.post(API_URL, data=params)
if response.status_code == 200:
print(f"歌曲 '{track_info['track']}' 成功上传到 Last.fm.")
else:
print(f"上传歌曲 '{track_info['track']}' 失败. 状态码: {response.status_code}")
print(response.text)
except requests.RequestException as e:
print(f"请求失败: {e}")

数据库获取

初始化MediaDatabaseHandler类

1
2
3
class MediaDatabaseHandler:
def __init__(self, db_path):
self.db_path = db_path

获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_new_tracks(self, last_id=0):
"""获取数据库中最新的歌曲记录"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT id, track, artist, album, timestamp FROM media_history WHERE id > ? ORDER BY id ASC",
(last_id,)
)
rows = cursor.fetchall()

return [
{'id': row[0], 'track': row[1], 'artist': row[2], 'album': row[3], 'timestamp': row[4]}
for row in rows
]

Scrobbling主程序

初始化QueueProcessor类

1
2
3
4
5
6
class QueueProcessor:
def __init__(self, db_handler, scrobbler, last_id_file):
self.db_handler = db_handler
self.scrobbler = scrobbler
self.last_id_file = last_id_file
self.last_id = self.load_last_id() # 加载上次保存的 last_id

临时记忆历史

1
2
3
4
5
6
7
8
9
10
11
def load_last_id(self):
"""从文件中加载 last_id"""
if os.path.exists(self.last_id_file):
with open(self.last_id_file, 'r') as f:
return json.load(f).get('last_id', 0)
return 0

def save_last_id(self, last_id):
"""保存 last_id 到文件"""
with open(self.last_id_file, 'w') as f:
json.dump({'last_id': last_id}, f)

track.scrobble上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def process_queue(self):
"""检查新记录并上传到 Last.fm"""
new_tracks = self.db_handler.get_new_tracks(last_id=self.last_id)
if not new_tracks:
print("没有发现新记录...")
return

print(f"发现 {len(new_tracks)} 条新记录...")
for track in new_tracks:
print(f"处理新记录: {track}")
self.scrobbler.scrobble_track(track)
self.last_id = max(self.last_id, track['id']) # 更新最后处理的 ID

# 保存最新的 last_id
self.save_last_id(self.last_id)

使用函数

1
2
3
4
5
6
7
8
def main(db_path):
"""主函数:初始化组件并启动队列处理器"""
scrobbler = LastFmScrobbler(API_KEY, API_SECRET, SESSION_KEY)
db_handler = MediaDatabaseHandler(db_path)
queue_processor = QueueProcessor(db_handler, scrobbler, LAST_ID_FILE)

# 处理一次队列
queue_processor.process_queue()

就是在 New_SMTC 使用的那段

使用须知!!!

条件限制,本程序自动将所有听过的歌都作为音乐记录发送到Last.fm,
而不遵守
1.总时长必须长于30秒
2.完整播放50%或完整播放4分钟以上
使用即代表你已知晓!

本程序引用的外部库有: pyperclip winrt sqlite3 requests json

  • 本文标题:Code|SMTC.fm:获取SMTC音频数据并上传到Last.fm
  • 本文作者:结月lalalll
  • 创建时间:2025-02-26 19:58:48
  • 本文链接:https://lalalll-lalalll.github.io/code/SMTC.fm/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!