Implement file manager

This commit is contained in:
Stanislav Mykhailenko 2023-02-01 19:38:20 +02:00
parent 758302bb2a
commit 1c510c1de8
GPG key ID: 1E95E66A9C9D6A36
7 changed files with 229 additions and 15 deletions

View file

@ -14,7 +14,9 @@ This program is a PC kill switch Telegram bot. It is a project for New Generatio
- works on Windows only
## Usage
Before starting the bot, copy config-example.py file to config.py and put the password and the token. Administrator privileges are required so that the bot is able to disable input and format partitions. Once the bot is started, send /start to its account on Telegram and enter the password to see what can be done.
In order to run the bot, you need a Telegram bot token and Google Drive API credentials. A directory on Google Drive where the bot put its files should also be created.
Before starting the bot, copy config-example.py file to config.py and put the Telegram bot token, the bot protection password and the Google Drive folder ID. Put your Google API credentials into credentials.json file. Administrator privileges are required so that the bot is able to disable input and format partitions. Once the bot is started, send /start to its account on Telegram and enter the password to see what can be done.
## Licensing
All code in this repository is Unlicensed, see UNLICENSE.

View file

@ -9,3 +9,6 @@ token = "put your token here"
# Bot password
master_password = "put your password here"
# Google Drive folder to be used by the program
root_folder = "put folder ID here"

83
googledrive.py Normal file
View file

@ -0,0 +1,83 @@
# Project: Telegram PC kill switch bot
# Author: Stanislav Mykhailenko
# License: Unlicense
# This file contains Google Drive interactions
from __future__ import print_function
import os
from mimetypes import MimeTypes
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
creds = None
def authenticate():
global creds
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists(os.path.join(os.path.dirname(__file__), 'token.json')):
creds = Credentials.from_authorized_user_file(os.path.join(os.path.dirname(__file__), 'token.json'), ['https://www.googleapis.com/auth/drive'])
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
os.path.join(os.path.dirname(__file__), 'credentials.json'), ['https://www.googleapis.com/auth/drive'])
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(os.path.join(os.path.dirname(__file__), 'token.json'), 'w') as token:
token.write(creds.to_json())
def createFolder(name, parent):
try:
service = build('drive', 'v3', credentials=creds)
file_metadata = {
'name': [name],
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent]
}
file = service.files().create(body=file_metadata, fields='id'
).execute()
return file.get('id')
except HttpError as error:
print(F'An error occurred: {error}')
return None
def uploadFile(path, folder):
try:
service = build('drive', 'v3', credentials=creds)
name = os.path.basename(os.path.normpath(path))
file_metadata = {
'name': [name],
'parents': [folder]
}
mimetype = MimeTypes().guess_type(path)[0]
media = MediaFileUpload(path,
mimetype=mimetype)
file = service.files().create(body=file_metadata, media_body=media,
fields='id').execute()
return file.get('id')
except HttpError as error:
print(F'An error occurred: {error}')
file = None

View file

@ -3,6 +3,8 @@
# License: Unlicense
import ctypes, sys
from googledrive import authenticate
from tg import startBot
def main() -> None:
@ -12,4 +14,5 @@ def main() -> None:
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1)
if __name__ == '__main__':
authenticate()
main()

View file

@ -4,7 +4,9 @@
# This file contains the payloads
import ctypes, os, pyttsx3
from googledrive import *
from config import *
import ctypes, os, pyttsx3, shutil
def enableInput():
ctypes.windll.user32.BlockInput(False)
@ -23,3 +25,33 @@ def playMessage(message):
engine = pyttsx3.init()
engine.say(message)
engine.runAndWait()
def listData(path):
try:
return os.listdir(path)
except NotADirectoryError:
return "Is a file."
except FileNotFoundError:
return "File or directory not found."
except:
return "An error occurred when trying to access the data."
def uploadFolder(path, parentDirectory):
directory = createFolder(os.path.basename(os.path.normpath(path)), parentDirectory)
entries = os.listdir(path)
for entry in entries:
if os.path.isdir(os.path.join(path, entry)):
uploadFolder(os.path.join(path, entry), directory)
else:
uploadFile(os.path.join(path, entry), directory)
def uploadData(path):
if not os.path.exists(path):
return False
if os.path.isdir(path):
uploadFolder(path, root_folder)
else:
uploadFile(path, root_folder)
def deleteData(path):
shutil.rmtree(path, ignore_errors=True)

View file

@ -1,10 +1,34 @@
anyio==3.6.2
cachetools==5.3.0
certifi==2022.12.7
charset-normalizer==3.0.1
comtypes==1.1.14
google-api-core==2.11.0
google-api-python-client==2.75.0
google-auth==2.16.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.8.0
google-drive-api==0.0.4
googleapis-common-protos==1.58.0
h11==0.14.0
httpcore==0.16.3
httplib2==0.21.0
httpx==0.23.3
idna==3.4
oauthlib==3.2.2
protobuf==4.21.12
pyasn1==0.4.8
pyasn1-modules==0.2.8
pyparsing==3.0.9
pypiwin32==223
python-telegram-bot==20.0
pyttsx3==2.90
pywin32==305
requests==2.28.2
requests-oauthlib==1.3.1
rfc3986==1.5.0
rsa==4.9
six==1.16.0
sniffio==1.3.0
uritemplate==4.1.1
urllib3==1.26.14

93
tg.py
View file

@ -8,7 +8,7 @@ from config import *
from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, Update
from telegram.ext import Application, CommandHandler, ContextTypes, ConversationHandler, filters, MessageHandler
from payload import *
import logging
import logging, googledrive
from threading import Thread
logging.basicConfig(
@ -17,15 +17,15 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
PASSWORD, ACTION, FORMAT, MESSAGE = range(4)
PASSWORD, ACTION, FORMAT, MESSAGE, FILE_MANAGER, LIST, DELETE, UPLOAD = range(8)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("Enter password.")
return PASSWORD
async def show_keyboard(update: Update):
reply_keyboard = [["Enable input", "Disable input", "Lock screen", "Format volumes", "Play message"]]
async def action_keyboard(update: Update):
reply_keyboard = [["Enable input", "Disable input", "Lock screen", "Format volumes", "Play message", "File manager"]]
await update.message.reply_text(
"Choose your action:",
@ -34,12 +34,23 @@ async def show_keyboard(update: Update):
),
)
async def file_keyboard(update: Update):
reply_keyboard = [["List files", "Upload files", "Delete files", "Go back"]]
await update.message.reply_text(
"Choose your action:",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True, input_field_placeholder="Choose your action"
),
)
async def password(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
if update.message.text == master_password:
logger.info("User %s logged in.", user.first_name)
await show_keyboard(update)
await action_keyboard(update)
return ACTION
@ -56,28 +67,51 @@ async def action(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
logger.info("Got a request from %s to enable input.", user.first_name)
enableInput()
await update.message.reply_text("Input enabled.", reply_markup=ReplyKeyboardRemove())
await show_keyboard(update)
await action_keyboard(update)
return ACTION
case "Disable input":
logger.info("Got a request from %s to disable input.", user.first_name)
disableInput()
await update.message.reply_text("Input disabled.", reply_markup=ReplyKeyboardRemove())
await show_keyboard(update)
await action_keyboard(update)
return ACTION
case "Lock screen":
logger.info("Got a request from %s to lock screen.", user.first_name)
lockScreen()
await update.message.reply_text("Screen locked.", reply_markup=ReplyKeyboardRemove())
await show_keyboard(update)
await action_keyboard(update)
return ACTION
case "Format volumes":
logger.info("Got a request from %s to format volumes.", user.first_name)
await update.message.reply_text("Please type a space-separated list of the volumes you want to format.", reply_markup=ReplyKeyboardRemove())
await update.message.reply_text("Please enter a space-separated list of the volumes you want to format.", reply_markup=ReplyKeyboardRemove())
return FORMAT
case "Play message":
logger.info("Got a request from %s to play a message.", user.first_name)
await update.message.reply_text("Please type the message you want to play.", reply_markup=ReplyKeyboardRemove())
await update.message.reply_text("Please enter the message you want to play.", reply_markup=ReplyKeyboardRemove())
return MESSAGE
case "File manager":
await file_keyboard(update)
return FILE_MANAGER
async def file_manager(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
match update.message.text:
case "List files":
logger.info("Got a request from %s to list files.", user.first_name)
await update.message.reply_text("Please enter the path.", reply_markup=ReplyKeyboardRemove())
return LIST
case "Upload files":
logger.info("Got a request from %s to upload files.", user.first_name)
await update.message.reply_text("Please enter the path.", reply_markup=ReplyKeyboardRemove())
return UPLOAD
case "Delete files":
logger.info("Got a request from %s to delete files.", user.first_name)
await update.message.reply_text("Please enter the path.", reply_markup=ReplyKeyboardRemove())
return DELETE
case "Go back":
await action_keyboard(update)
return ACTION
async def format(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
@ -86,7 +120,7 @@ async def format(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
Thread(target=formatVolumes,args=(update.message.text.split(),)).start()
await update.message.reply_text("Command to format the volumes sent.")
await show_keyboard(update)
await action_keyboard(update)
return ACTION
async def message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
@ -95,9 +129,38 @@ async def message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
Thread(target=playMessage,args=(update.message.text,)).start()
await update.message.reply_text("Command to play the message sent.")
await show_keyboard(update)
await action_keyboard(update)
return ACTION
async def list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
logger.info("Got a request from %s to list files at %s.", user.first_name, update.message.text)
await update.message.reply_text(listData(os.path.join(update.message.text)))
await file_keyboard(update)
return FILE_MANAGER
async def upload(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
logger.info("Got a request from %s to upload %s.", user.first_name, update.message.text)
Thread(target=uploadData,args=(os.path.join(update.message.text),)).start()
await update.message.reply_text("Command to upload the data sent.")
await file_keyboard(update)
return FILE_MANAGER
async def delete(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
logger.info("Got a request from %s to delete %s.", user.first_name, update.message.text)
Thread(target=deleteData,args=(os.path.join(update.message.text),)).start()
await update.message.reply_text("Command to delete the data sent.")
await file_keyboard(update)
return FILE_MANAGER
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
logger.info("User %s cancelled the conversation.", user.first_name)
@ -112,9 +175,13 @@ def startBot():
entry_points=[CommandHandler("start", start)],
states={
PASSWORD: [MessageHandler(filters.TEXT & ~filters.COMMAND, password)],
ACTION: [MessageHandler(filters.Regex("^(Enable input|Disable input|Lock screen|Format volumes|Play message)$"), action)],
ACTION: [MessageHandler(filters.Regex("^(Enable input|Disable input|Lock screen|Format volumes|Play message|File manager)$"), action)],
FORMAT: [MessageHandler(filters.TEXT & ~filters.COMMAND, format)],
MESSAGE: [MessageHandler(filters.TEXT & ~filters.COMMAND, message)],
FILE_MANAGER: [MessageHandler(filters.Regex("^(List files|Upload files|Delete files|Go back)$"), file_manager)],
LIST: [MessageHandler(filters.TEXT & ~filters.COMMAND, list)],
DELETE: [MessageHandler(filters.TEXT & ~filters.COMMAND, delete)],
UPLOAD: [MessageHandler(filters.TEXT & ~filters.COMMAND, upload)],
},
fallbacks=[CommandHandler("cancel", cancel)],
)