feat: initial commit for 100.dathere.com and first exercise

This commit is contained in:
rzmk 2024-05-29 09:03:38 -04:00
commit 86f90af434
35 changed files with 860 additions and 0 deletions

View file

@ -0,0 +1,3 @@
"""A bash kernel for Jupyter"""
__version__ = '0.4.1'

View file

@ -0,0 +1,3 @@
from ipykernel.kernelapp import IPKernelApp
from .kernel import BashKernel
IPKernelApp.launch_instance(kernel_class=BashKernel)

View file

@ -0,0 +1,48 @@
import base64
import imghdr
import os
#from IPython.
_TEXT_SAVED_IMAGE = "bash_kernel: saved image data to:"
image_setup_cmd = """
display () {
TMPFILE=$(mktemp ${TMPDIR-/tmp}/bash_kernel.XXXXXXXXXX)
cat > $TMPFILE
echo "%s $TMPFILE" >&2
}
""" % _TEXT_SAVED_IMAGE
def display_data_for_image(filename):
with open(filename, 'rb') as f:
image = f.read()
os.unlink(filename)
image_type = imghdr.what(None, image)
if image_type is None:
raise ValueError("Not a valid image: %s" % image)
image_data = base64.b64encode(image).decode('ascii')
content = {
'data': {
'image/' + image_type: image_data
},
'metadata': {}
}
return content
def extract_image_filenames(output):
output_lines = []
image_filenames = []
for line in output.split("\n"):
if line.startswith(_TEXT_SAVED_IMAGE):
filename = line.rstrip().split(": ")[-1]
image_filenames.append(filename)
else:
output_lines.append(line)
output = "\n".join(output_lines)
return image_filenames, output

View file

@ -0,0 +1,47 @@
import json
import os
import sys
import getopt
from jupyter_client.kernelspec import KernelSpecManager
from IPython.utils.tempdir import TemporaryDirectory
kernel_json = {"argv":[sys.executable,"-m","bash_kernel", "-f", "{connection_file}"],
"display_name":"Bash",
"language":"bash",
"codemirror_mode":"shell",
"env":{"PS1": "$"}
}
def install_my_kernel_spec(user=True, prefix=None):
with TemporaryDirectory() as td:
os.chmod(td, 0o755) # Starts off as 700, not user readable
with open(os.path.join(td, 'kernel.json'), 'w') as f:
json.dump(kernel_json, f, sort_keys=True)
# TODO: Copy resources once they're specified
print('Installing IPython kernel spec')
KernelSpecManager().install_kernel_spec(td, 'bash', user=user, replace=True, prefix=prefix)
def _is_root():
try:
return os.geteuid() == 0
except AttributeError:
return False # assume not an admin on non-Unix platforms
def main(argv=[]):
prefix = None
user = not _is_root()
opts, _ = getopt.getopt(argv[1:], '', ['user', 'prefix='])
for k, v in opts:
if k == '--user':
user = True
elif k == '--prefix':
prefix = v
user = False
install_my_kernel_spec(user=user, prefix=prefix)
if __name__ == '__main__':
main(argv=sys.argv)

View file

@ -0,0 +1,154 @@
from ipykernel.kernelbase import Kernel
from pexpect import replwrap, EOF
from subprocess import check_output
from os import unlink
import base64
import imghdr
import re
import signal
import urllib
__version__ = '0.2'
version_pat = re.compile(r'version (\d+(\.\d+)+)')
from .images import (
extract_image_filenames, display_data_for_image, image_setup_cmd
)
class BashKernel(Kernel):
implementation = 'bash_kernel'
implementation_version = __version__
@property
def language_version(self):
m = version_pat.search(self.banner)
return m.group(1)
_banner = None
@property
def banner(self):
if self._banner is None:
self._banner = check_output(['bash', '--version']).decode('utf-8')
return self._banner
language_info = {'name': 'bash',
'codemirror_mode': 'shell',
'mimetype': 'text/x-sh',
'file_extension': '.sh'}
def __init__(self, **kwargs):
Kernel.__init__(self, **kwargs)
self._start_bash()
def _start_bash(self):
# Signal handlers are inherited by forked processes, and we can't easily
# reset it from the subprocess. Since kernelapp ignores SIGINT except in
# message handlers, we need to temporarily reset the SIGINT handler here
# so that bash and its children are interruptible.
sig = signal.signal(signal.SIGINT, signal.SIG_DFL)
try:
self.bashwrapper = replwrap.bash()
finally:
signal.signal(signal.SIGINT, sig)
# Register Bash function to write image data to temporary file
self.bashwrapper.run_command(image_setup_cmd)
def do_execute(self, code, silent, store_history=True,
user_expressions=None, allow_stdin=False):
if not code.strip():
return {'status': 'ok', 'execution_count': self.execution_count,
'payload': [], 'user_expressions': {}}
interrupted = False
try:
output = self.bashwrapper.run_command(code.rstrip(), timeout=None)
except KeyboardInterrupt:
self.bashwrapper.child.sendintr()
interrupted = True
self.bashwrapper._expect_prompt()
output = self.bashwrapper.child.before
except EOF:
output = self.bashwrapper.child.before + 'Restarting Bash'
self._start_bash()
if not silent:
image_filenames, output = extract_image_filenames(output)
# Send standard output
stream_content = {'name': 'stdout', 'text': output}
self.send_response(self.iopub_socket, 'stream', stream_content)
# Send images, if any
for filename in image_filenames:
try:
data = display_data_for_image(filename)
except ValueError as e:
message = {'name': 'stdout', 'text': str(e)}
self.send_response(self.iopub_socket, 'stream', message)
else:
self.send_response(self.iopub_socket, 'display_data', data)
if interrupted:
return {'status': 'abort', 'execution_count': self.execution_count}
try:
exitcode = int(self.bashwrapper.run_command('echo $?').rstrip())
except Exception:
exitcode = 1
if exitcode and not (code.rstrip().endswith("-h") or code.rstrip().endswith("--help")):
error_content = {'execution_count': self.execution_count,
'ename': '', 'evalue': str(exitcode), 'traceback': []}
self.send_response(self.iopub_socket, 'error', error_content)
error_content['status'] = 'error'
return error_content
else:
return {'status': 'ok', 'execution_count': self.execution_count,
'payload': [], 'user_expressions': {}}
def do_complete(self, code, cursor_pos):
code = code[:cursor_pos]
default = {'matches': [], 'cursor_start': 0,
'cursor_end': cursor_pos, 'metadata': dict(),
'status': 'ok'}
if not code or code[-1] == ' ':
return default
tokens = code.replace(';', ' ').split()
if not tokens:
return default
matches = []
token = tokens[-1]
start = cursor_pos - len(token)
if token[0] == '$':
# complete variables
cmd = 'compgen -A arrayvar -A export -A variable %s' % token[1:] # strip leading $
output = self.bashwrapper.run_command(cmd).rstrip()
completions = set(output.split())
# append matches including leading $
matches.extend(['$'+c for c in completions])
else:
# complete functions and builtins
cmd = 'compgen -cdfa %s' % token
output = self.bashwrapper.run_command(cmd).rstrip()
matches.extend(output.split())
if not matches:
return default
matches = [m for m in matches if m.startswith(token)]
return {'matches': sorted(matches), 'cursor_start': start,
'cursor_end': cursor_pos, 'metadata': dict(),
'status': 'ok'}