自从认识了Last.fm ,这网易云用得就是不得劲,本地音乐况且都支持它,所以就开始找能在国内在线音乐平台听歌时候记录Last.fm的
很遗憾,没有
于是就寻思着自己整个程序吧,so开始翻阅Last.fm的文档…
项目地址: https://github.com/lalalll-lalalll/SMTC.fm/
基本思路
程序大体分为三个部分:
1.与Last.fm平台连接并利用开发者Key为用户注册到自己的私人密钥(UserAuthentication)
2.使用系统API获取SMTC中的音乐数据(New_SMTC)
3.将数据存储到临时数据库并上传到Last.fm听歌记录(New_Scrobbling)
具体程序
首先取得API_KEY和SHARED_SECRET的值,以下所有程序直接使用
UserAuthentication
API_sig 获取API方法签名
每次与Last.fm的通讯都需要对所进行的操作进行封装并求出哈希值用以保证安全
1 2 3 4 def generate_api_sig (params ): param_string = '' .join([f"{key} {params[key]} " for key in sorted (params.keys())]) param_string += SHARED_SECRET return hashlib.md5(param_string.encode('utf-8' )).hexdigest()
Authentication Tokens 获取临时令牌
1 2 3 4 5 6 7 8 9 10 11 12 def get_token (): params = { 'method' : 'auth.getToken' , 'api_key' : API_KEY, 'format' : 'json' } params['api_sig' ] = generate_api_sig(params) url = f'http://ws.audioscrobbler.com/2.0/?{urlencode(params)} ' response = requests.get(url) data = json.loads(response.content) return data['token' ]
首先调用 generate_api_sig
取得本次方法的apisig,接着发送参数,取得返回值定义为 token
Session Key 获取用户密钥
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def get_session (token ): auth_params = { 'api_key' : API_KEY, 'token' : token, 'method' : 'auth.getSession' } auth_params['api_sig' ] = generate_api_sig(auth_params) auth_url = f'http://www.last.fm/api/auth/?{urlencode(auth_params)} ' webbrowser.open (auth_url) print ("Once you have authorized, press Enter to continue..." ) input () session_params = { 'api_key' : API_KEY, 'method' : 'auth.getSession' , 'token' : token, } session_params['api_sig' ] = generate_api_sig(session_params) session_url = f'http://ws.audioscrobbler.com/2.0/?{urlencode(session_params)} ' response = requests.get(session_url) if response.status_code == 200 : root = ET.fromstring(response.content) session_key = root.find('.//key' ).text print (f"Session key: {session_key} " ) return session_key else : print (f"Failed to get session. Status code: {response.status_code} " ) return None
首先调用 generate_api_sig
取得本次方法的apisig,合成参数进行传入,使用 webbrowser.open
打开浏览器并让用户手动授权,取得返回值并定义为 session key
简单的交互操作
为了方便使用而添加的部分操作引导,调用 pyperclip
库 用以剪切板操作
到此为止,我们就获得了用户的私钥 Session Key
New_SMTC
最麻烦的一个部分
初始化操作(定义变量)
1 2 3 4 5 6 7 8 script_dir = os.path.dirname(os.path.abspath(__file__)) LAST_ID_FILE = os.path.join(script_dir, 'last_id.json' ) db_path = os.path.join(script_dir, 'media_history.db' )
初始化操作(创建临时数据库)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS media_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, track TEXT NOT NULL, artist TEXT NOT NULL, album TEXT, timestamp INTEGER NOT NULL ) ''' )conn.commit() conn.close() print (f"数据库文件已创建或连接成功,位于: {db_path} " )
创建临时json文件用来确保程序读取的最新数据,创建临时数据库来进行音乐信息的传递
数据库写入操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def insert_media_info (track, artist, album, timestamp ): """将媒体信息插入到 SQLite 数据库""" try : conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO media_history (track, artist, album, timestamp) VALUES (?, ?, ?, ?) ''' , (track, artist, album, timestamp)) conn.commit() print ("媒体信息已保存到数据库。" ) except Exception as e: print (f"保存媒体信息到数据库时发生错误: {e} " ) finally : conn.close()
四项都是日后要进行上传的,没什么好说的
SMTC主程序
1 2 3 4 5 6 class MediaWatcher : def __init__ (self ): self.session = None self.loop = None self.last_media_properties = None self.last_playback_status = None
媒体属性变化处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 async def on_media_properties_changed (self, sender, args ): """处理媒体属性变化的事件回调函数""" try : if not hasattr (sender, "try_get_media_properties_async" ): print ("----------" ) print ("无效的 sender 对象,跳过处理。" ) print ("----------" ) return media_props = await sender.try_get_media_properties_async() if media_props and (media_props.title, media_props.artist, media_props.album_title) != self.last_media_properties: self.last_media_properties = (media_props.title, media_props.artist, media_props.album_title) print ("----------" ) print ("媒体信息更新:" ) print (f"标题: {media_props.title} " ) print (f"艺术家: {media_props.artist} " ) print (f"专辑: {media_props.album_title} " ) print ("----------" ) insert_media_info(media_props.title, media_props.artist, media_props.album_title, int (time.time())) scrobbler = LastFmScrobbler(API_KEY, API_SECRET, SESSION_KEY) db_handler = MediaDatabaseHandler(db_path) queue_processor = QueueProcessor(db_handler, scrobbler, LAST_ID_FILE) queue_processor.process_queue() except Exception as e: print (f"获取媒体属性时发生错误: {e} " )
检查 sender
是否有效,检查媒体属性是否变化,若变化则更新 self.last_media_properties
用以进行下一次比较,打印歌曲信息并写入数据库,调用 New_Scrobbling
执行上传操作
监听媒体播放状态变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 async def watch_for_changes (self ): """监听系统级别的媒体播放状态变化""" try : self.loop = asyncio.get_running_loop() session_manager = await GlobalSystemMediaTransportControlsSessionManager.request_async() if session_manager: async def update_current_session (): current_session = session_manager.get_current_session() if current_session != self.session: self.session = current_session if self.session: self.session.add_media_properties_changed( lambda sender, args: self._schedule_coroutine(self.on_media_properties_changed(sender, args)) ) else : print ("当前没有活动的媒体会话。" ) await update_current_session() print ("开始监听媒体控制属性变化..." ) await asyncio.Event().wait() except Exception as e: print (f"监听过程中发生错误: {e} " )
关键要请求全局媒体传输控制会话管理器 GlobalSystemMediaTransportControlsSessionManager.request_async
调度异步任务
1 2 3 4 5 def _schedule_coroutine (self, coroutine ): if self.loop and not self.loop.is_closed(): asyncio.run_coroutine_threadsafe(coroutine, self.loop) else : print ("事件循环不可用,无法调度任务" )
其他
剩下的就是创建实例,启动异步任务,在退出时清除临时json文件和临时数据库
New_Scrobbling
New_Scrobbling
直接引用于 New_SMTC
读取用户私钥 session_key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def get_exe_directory (): """获取 .exe 文件所在的目录""" if getattr (sys, 'frozen' , False ): return os.path.dirname(sys.executable) else : return os.path.dirname(os.path.abspath(__file__)) def get_config_path (config_filename ): """获取配置文件的完整路径""" exe_dir = get_exe_directory() config_path = os.path.join(exe_dir, config_filename) return config_path config_file = "config.json" json_file_path = get_config_path(config_file) if os.path.exists(json_file_path): print (f"Config file found at: {json_file_path} " ) else : print ("Config file not found!" ) try : with open (json_file_path, 'r' , encoding='utf-8' ) as file: data = json.load(file) session_key = data.get('SESSION_KEY' ) if session_key: print (f"SESSION_KEY: {session_key} " ) else : print ("SESSION_KEY 未找到" ) except FileNotFoundError: print (f"文件 {json_file_path} 未找到" ) except json.JSONDecodeError as e: print (f"JSON 解析错误: {e} " ) SESSION_KEY = session_key
读取程序运行同目录下 config.json
的值,定义为 session_key
歌曲上传
初始化类LastFmScrobbler
1 2 3 4 5 class LastFmScrobbler : def __init__ (self, api_key, api_secret, session_key ): self.api_key = api_key self.api_secret = api_secret self.session_key = session_key
API_sig 获取API方法签名
老套路,当时有个先后写,完全没想到引用模块,于是…造了两套轮子…嘛,也算是"历史原因"了 ╮(╯▽╰)╭
1 2 3 4 5 def generate_api_sig (self, params ): """生成 API 签名""" param_string = '' .join([f"{key} {params[key]} " for key in sorted (params.keys())]) param_string += self.api_secret return hashlib.md5(param_string.encode('utf-8' )).hexdigest()
track.scrobble
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def scrobble_track (self, track_info ): """上传单首歌曲到 Last.fm""" params = { 'method' : 'track.scrobble' , 'api_key' : self.api_key, 'sk' : self.session_key, 'track' : track_info['track' ], 'artist' : track_info['artist' ], 'album' : track_info.get('album' , '' ), 'timestamp' : track_info['timestamp' ] } params['api_sig' ] = self.generate_api_sig(params) try : response = requests.post(API_URL, data=params) if response.status_code == 200 : print (f"歌曲 '{track_info['track' ]} ' 成功上传到 Last.fm." ) else : print (f"上传歌曲 '{track_info['track' ]} ' 失败. 状态码: {response.status_code} " ) print (response.text) except requests.RequestException as e: print (f"请求失败: {e} " )
数据库获取
1 2 3 class MediaDatabaseHandler : def __init__ (self, db_path ): self.db_path = db_path
获取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def get_new_tracks (self, last_id=0 ): """获取数据库中最新的歌曲记录""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "SELECT id, track, artist, album, timestamp FROM media_history WHERE id > ? ORDER BY id ASC" , (last_id,) ) rows = cursor.fetchall() return [ {'id' : row[0 ], 'track' : row[1 ], 'artist' : row[2 ], 'album' : row[3 ], 'timestamp' : row[4 ]} for row in rows ]
Scrobbling主程序
初始化QueueProcessor类
1 2 3 4 5 6 class QueueProcessor : def __init__ (self, db_handler, scrobbler, last_id_file ): self.db_handler = db_handler self.scrobbler = scrobbler self.last_id_file = last_id_file self.last_id = self.load_last_id()
临时记忆历史
1 2 3 4 5 6 7 8 9 10 11 def load_last_id (self ): """从文件中加载 last_id""" if os.path.exists(self.last_id_file): with open (self.last_id_file, 'r' ) as f: return json.load(f).get('last_id' , 0 ) return 0 def save_last_id (self, last_id ): """保存 last_id 到文件""" with open (self.last_id_file, 'w' ) as f: json.dump({'last_id' : last_id}, f)
track.scrobble上传
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def process_queue (self ): """检查新记录并上传到 Last.fm""" new_tracks = self.db_handler.get_new_tracks(last_id=self.last_id) if not new_tracks: print ("没有发现新记录..." ) return print (f"发现 {len (new_tracks)} 条新记录..." ) for track in new_tracks: print (f"处理新记录: {track} " ) self.scrobbler.scrobble_track(track) self.last_id = max (self.last_id, track['id' ]) self.save_last_id(self.last_id)
使用函数
1 2 3 4 5 6 7 8 def main (db_path ): """主函数:初始化组件并启动队列处理器""" scrobbler = LastFmScrobbler(API_KEY, API_SECRET, SESSION_KEY) db_handler = MediaDatabaseHandler(db_path) queue_processor = QueueProcessor(db_handler, scrobbler, LAST_ID_FILE) queue_processor.process_queue()
就是在 New_SMTC
使用的那段
使用须知!!!
条件限制,本程序自动将所有听过的歌都作为音乐记录发送到Last.fm ,
而不遵守
1.总时长必须长于30秒
2.完整播放50%或完整播放4分钟以上
使用即代表你已知晓!
本程序引用的外部库有: pyperclip
winrt
sqlite3
requests
json