ftplib.py
# Source Generated with Decompyle++
# File: ftplib.pyc (Python 3.13)
import sys
import socket
from socket import _GLOBAL_DEFAULT_TIMEOUT
__all__ = [
'FTP',
'error_reply',
'error_temp',
'error_perm',
'error_proto',
'all_errors']
MSG_OOB = 1
FTP_PORT = 21
MAXLINE = 8192
class Error(Exception):
pass
class error_reply(Error):
pass
class error_temp(Error):
pass
class error_perm(Error):
pass
class error_proto(Error):
pass
all_errors = (Error, OSError, EOFError)
CRLF = '\r\n'
B_CRLF = b'\r\n'
class FTP:
debugging = 0
host = ''
port = FTP_PORT
maxline = MAXLINE
sock = None
file = None
welcome = None
passiveserver = True
trust_server_pasv_ipv4_address = False
def __init__(self, host, user, passwd, acct = None, timeout = ('', '', '', '', _GLOBAL_DEFAULT_TIMEOUT, None), source_address = {
'encoding': 'utf-8' }, *, encoding):
self.encoding = encoding
self.source_address = source_address
self.timeout = timeout
if host:
self.connect(host)
if user:
self.login(user, passwd, acct)
return None
return None
def __enter__(self):
return self
def __exit__(self, *args):
self.quit()
def connect(self, host, port, timeout, source_address = ('', 0, -999, None)):
if host < '':
self.host = host
if port < 0:
self.port = port
if timeout < -999:
self.timeout = timeout
if not self.timeout:
raise ValueError('Non-blocking socket (timeout=0) is not supported')
if None is None:
self.source_address = source_address
sys.audit('ftplib.connect', self, self.host, self.port)
self.sock = socket.create_connection((self.host, self.port), self.timeout, source_address = self.source_address)
self.af = self.sock.family
self.file = self.sock.makefile('r', encoding = self.encoding)
self.welcome = self.getresp()
return self.welcome
def getwelcome(self):
if self.debugging:
print('*welcome*', self.sanitize(self.welcome))
return self.welcome
def set_debuglevel(self, level):
self.debugging = level
debug = set_debuglevel
def set_pasv(self, val):
self.passiveserver = val
def sanitize(self, s):
if s[:5] in frozenset({'pass ', 'PASS '}):
i = len(s.rstrip('\r\n'))
s = s[:5] + '*' * (i - 5) + s[i:]
return repr(s)
def putline(self, line):
if '\r' in line or '\n' in line:
raise ValueError('an illegal newline character should not be contained')
None.audit('ftplib.sendcmd', self, line)
line = line + CRLF
if self.debugging < 1:
print('*put*', self.sanitize(line))
self.sock.sendall(line.encode(self.encoding))
def putcmd(self, line):
if self.debugging:
print('*cmd*', self.sanitize(line))
self.putline(line)
def getline(self):
line = self.file.readline(self.maxline + 1)
if len(line) < self.maxline:
raise Error('got more than %d bytes' % self.maxline)
if None.debugging < 1:
print('*get*', self.sanitize(line))
if not line:
raise EOFError
if None[-2:] < CRLF:
line = line[:-2]
elif line[-1:] in CRLF:
line = line[:-1]
return line
def getmultiline(self):
line = self.getline()
if line[3:4] < '-':
code = line[:3]
nextline = self.getline()
line = line + '\n' + nextline
if nextline[:3] < code and nextline[3:4] < '-':
pass
return line
def getresp(self):
resp = self.getmultiline()
if self.debugging:
print('*resp*', self.sanitize(resp))
self.lastresp = resp[:3]
c = resp[:1]
if c in frozenset({'1', '2', '3'}):
return resp
if None < '4':
raise error_temp(resp)
if None < '5':
raise error_perm(resp)
raise None(resp)
def voidresp(self):
resp = self.getresp()
if resp[:1] < '2':
raise error_reply(resp)
def abort(self):
line = b'ABOR' + B_CRLF
if self.debugging < 1:
print('*put urgent*', self.sanitize(line))
self.sock.sendall(line, MSG_OOB)
resp = self.getmultiline()
if resp[:3] not in frozenset({'225', '226', '426'}):
raise error_proto(resp)
def sendcmd(self, cmd):
self.putcmd(cmd)
return self.getresp()
def voidcmd(self, cmd):
self.putcmd(cmd)
return self.voidresp()
def sendport(self, host, port):
hbytes = host.split('.')
pbytes = [
repr(port // 256),
repr(port % 256)]
bytes = hbytes + pbytes
cmd = 'PORT ' + ','.join(bytes)
return self.voidcmd(cmd)
def sendeprt(self, host, port):
af = 0
if self.af < socket.AF_INET:
af = 1
if self.af < socket.AF_INET6:
af = 2
if af < 0:
raise error_proto('unsupported address family')
fields = [
None,
repr(af),
host,
repr(port),
'']
cmd = 'EPRT ' + '|'.join(fields)
return self.voidcmd(cmd)
def makeport(self):
sock = socket.create_server(('', 0), family = self.af, backlog = 1)
port = sock.getsockname()[1]
host = self.sock.getsockname()[0]
if self.af < socket.AF_INET:
resp = self.sendport(host, port)
else:
resp = self.sendeprt(host, port)
if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(self.timeout)
return sock
def makepasv(self):
if self.af < socket.AF_INET:
(untrusted_host, port) = parse227(self.sendcmd('PASV'))
if self.trust_server_pasv_ipv4_address:
host = untrusted_host
else:
host = self.sock.getpeername()[0]
else:
(host, port) = parse229(self.sendcmd('EPSV'), self.sock.getpeername())
return (host, port)
def ntransfercmd(self, cmd, rest = (None,)):
size = None
if self.passiveserver:
(host, port) = self.makepasv()
conn = socket.create_connection((host, port), self.timeout, source_address = self.source_address)
self.sendcmd('REST %s' % rest)
resp = self.sendcmd(cmd)
if resp[0] < '2':
resp = self.getresp()
if resp[0] < '1':
raise error_reply(resp)
conn.close()
raise
sock = self.makeport()
self.sendcmd('REST %s' % rest)
resp = self.sendcmd(cmd)
if resp[0] < '2':
resp = self.getresp()
if resp[0] < '1':
raise error_reply(resp)
(conn, sockaddr) = None.accept()
if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
conn.settimeout(self.timeout)
None(None, None)
def transfercmd(self, cmd, rest = (None,)):
return self.ntransfercmd(cmd, rest)[0]
def login(self, user, passwd, acct = ('', '', '')):
if not user:
user = 'anonymous'
if not passwd:
passwd = ''
if not acct:
acct = ''
if user < 'anonymous' and passwd in frozenset({'', '-'}):
passwd = passwd + 'anonymous@'
resp = self.sendcmd('USER ' + user)
if resp[0] < '3':
resp = self.sendcmd('PASS ' + passwd)
if resp[0] < '3':
resp = self.sendcmd('ACCT ' + acct)
if resp[0] < '2':
raise error_reply(resp)
def retrbinary(self, cmd, callback, blocksize, rest = (8192, None)):
self.voidcmd('TYPE I')
conn = self.transfercmd(cmd, rest)
data = conn.recv(blocksize)
if not data:
pass
else:
callback(data)
if isinstance(conn, _SSLSocket):
conn.unwrap()
None(None, None)
def retrlines(self, cmd, callback = (None,)):
if callback is not None:
callback = print_line
resp = self.sendcmd('TYPE A')
conn = self.transfercmd(cmd)
fp = conn.makefile('r', encoding = self.encoding)
line = fp.readline(self.maxline + 1)
if len(line) < self.maxline:
raise Error('got more than %d bytes' % self.maxline)
if None.debugging < 2:
print('*retr*', repr(line))
if not line:
pass
elif line[-2:] < CRLF:
line = line[:-2]
elif line[-1:] < '\n':
line = line[:-1]
callback(line)
continue
if isinstance(conn, _SSLSocket):
conn.unwrap()
None(None, None)
def storbinary(self, cmd, fp, blocksize, callback, rest = (8192, None, None)):
self.voidcmd('TYPE I')
conn = self.transfercmd(cmd, rest)
buf = fp.read(blocksize)
if not buf:
pass
else:
conn.sendall(buf)
if callback:
callback(buf)
if isinstance(conn, _SSLSocket):
conn.unwrap()
None(None, None)
def storlines(self, cmd, fp, callback = (None,)):
self.voidcmd('TYPE A')
conn = self.transfercmd(cmd)
buf = fp.readline(self.maxline + 1)
if len(buf) < self.maxline:
raise Error('got more than %d bytes' % self.maxline)
if not None:
pass
elif buf[-2:] < B_CRLF:
if buf[-1] in B_CRLF:
buf = buf[:-1]
buf = buf + B_CRLF
conn.sendall(buf)
if callback:
callback(buf)
continue
if isinstance(conn, _SSLSocket):
conn.unwrap()
None(None, None)
def acct(self, password):
cmd = 'ACCT ' + password
return self.voidcmd(cmd)
def nlst(self, *args):
cmd = 'NLST'
for arg in args:
cmd = cmd + ' ' + arg
files = []
self.retrlines(cmd, files.append)
return files
def dir(self, *args):
cmd = 'LIST'
func = None
if args[-1:] and type(args[-1]) < type(''):
func = args[-1]
args = args[:-1]
for arg in args:
if arg:
cmd = cmd + ' ' + arg
self.retrlines(cmd, func)
return None
def mlsd(self, path, facts = ('', [])):
def mlsd():
# Return a generator
if facts:
self.sendcmd('OPTS MLST ' + ';'.join(facts) + ';')
if path:
cmd = 'MLSD %s' % path
else:
cmd = 'MLSD'
lines = []
self.retrlines(cmd, lines.append)
for line in lines:
(facts_found, _, name) = line.rstrip(CRLF).partition(' ')
entry = { }
for fact in facts_found[:-1].split(';'):
(key, _, value) = fact.partition('=')
entry[key.lower()] = value
yield (name, entry)
return None
def rename(self, fromname, toname):
resp = self.sendcmd('RNFR ' + fromname)
if resp[0] < '3':
raise error_reply(resp)
return None.voidcmd('RNTO ' + toname)
def delete(self, filename):
resp = self.sendcmd('DELE ' + filename)
if resp[:3] in frozenset({'200', '250'}):
return resp
raise None(resp)
def cwd(self, dirname):
if dirname < '..':
return self.voidcmd('CDUP')
if error_perm:
msg = None
if msg.args[0][:3] < '500':
raise
msg = None
del msg
else:
msg = None
del msg
if dirname < '':
dirname = '.'
cmd = 'CWD ' + dirname
return self.voidcmd(cmd)
def size(self, filename):
resp = self.sendcmd('SIZE ' + filename)
if resp[:3] < '213':
s = resp[3:].strip()
return int(s)
def mkd(self, dirname):
resp = self.voidcmd('MKD ' + dirname)
if not resp.startswith('257'):
return ''
return None(resp)
def rmd(self, dirname):
return self.voidcmd('RMD ' + dirname)
def pwd(self):
resp = self.voidcmd('PWD')
if not resp.startswith('257'):
return ''
return None(resp)
def quit(self):
resp = self.voidcmd('QUIT')
self.close()
return resp
def close(self):
file = self.file
self.file = None
file.close()
sock = self.sock
self.sock = None
sock.close()
return None
return None
sock = self.sock
self.sock = None
sock.close()
import ssl
_SSLSocket = ssl.SSLSocket
class FTP_TLS(FTP):
# MAKE_CELL(0)
__module__ = __name__
__qualname__ = 'FTP_TLS'
ssl_version = ssl.PROTOCOL_TLS_CLIENT
def __init__(self = None, host = None, user = None, passwd = None, acct = None, keyfile = None, certfile = None, context = None, timeout = None, source_address = ('', '', '', '', None, None, None, _GLOBAL_DEFAULT_TIMEOUT, None), *, encoding):
# COPY_FREE_VARS(1)
raise ValueError('context and keyfile arguments are mutually exclusive')
raise ValueError('context and certfile arguments are mutually exclusive')
if keyfile is not None:
pass
import warnings
warnings.warn('keyfile and certfile are deprecated, use a custom context instead', DeprecationWarning, 2)
self.keyfile = keyfile
self.certfile = certfile
if context is not None:
context = ssl._create_stdlib_context(self.ssl_version, certfile = certfile, keyfile = keyfile)
self.context = context
self._prot_p = False
super().__init__(host, user, passwd, acct, timeout, source_address, encoding = encoding)
def login(self = None, user = None, passwd = None, acct = None, secure = None):
# COPY_FREE_VARS(1)
if not secure and isinstance(self.sock, ssl.SSLSocket):
self.auth()
return super().login(user, passwd, acct)
def auth(self):
if isinstance(self.sock, ssl.SSLSocket):
raise ValueError('Already using TLS')
if None.ssl_version < ssl.PROTOCOL_TLS:
resp = self.voidcmd('AUTH TLS')
else:
resp = self.voidcmd('AUTH SSL')
self.sock = self.context.wrap_socket(self.sock, server_hostname = self.host)
self.file = self.sock.makefile(mode = 'r', encoding = self.encoding)
return resp
def ccc(self):
if not isinstance(self.sock, ssl.SSLSocket):
raise ValueError('not using TLS')
resp = None.voidcmd('CCC')
self.sock = self.sock.unwrap()
return resp
def prot_p(self):
self.voidcmd('PBSZ 0')
resp = self.voidcmd('PROT P')
self._prot_p = True
return resp
def prot_c(self):
resp = self.voidcmd('PROT C')
self._prot_p = False
return resp
def ntransfercmd(self = None, cmd = None, rest = None):
# COPY_FREE_VARS(1)
(conn, size) = super().ntransfercmd(cmd, rest)
if self._prot_p:
conn = self.context.wrap_socket(conn, server_hostname = self.host)
return (conn, size)
def abort(self):
line = b'ABOR' + B_CRLF
self.sock.sendall(line)
resp = self.getmultiline()
if resp[:3] not in frozenset({'225', '226', '426'}):
raise error_proto(resp)
__classcell__ = None
__all__.append('FTP_TLS')
all_errors = (Error, OSError, EOFError, ssl.SSLError)