import logging
import os
import io
import functools
import requests
from enum import Enum
from hashlib import sha256 as _sha256, sha1 as _sha1, md5 as _md5
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from future.utils import raise_from, string_types
# Windows might raise an OSError instead of an ImportError, like this:
# OSError: [WinError 193] %1 is not a valid Win32 application
try:
import yara
except (ImportError, OSError):
from polyswarm_api import exceptions, core, settings
# Module-level logger named after this module.
logger = logging.getLogger(name=__name__)
#####################################################################
# Resources returned by the API
#####################################################################
class Engine(core.BaseJsonResource):
    """An engine record parsed from the ``/microengines`` endpoint.

    Two engines compare equal, and hash identically, when their ``id``
    values match.
    """
    RESOURCE_ENDPOINT = '/microengines'

    def __init__(self, content, api=None):
        """Populate the engine from its JSON representation.

        :param content: Mapping with the engine fields; ``id`` and ``name``
            are required, everything else is optional.
        :param api: Forwarded unchanged to the base resource.
        :raises KeyError: if ``id`` or ``name`` is missing from ``content``.
        """
        super(Engine, self).__init__(content=content, api=api)
        self.id = str(content['id'])
        self.name = content['name']
        try:
            self.address = content['address'].lower()
        except (KeyError, AttributeError):
            # The address is either absent (KeyError) or not a string,
            # e.g. null (AttributeError); both mean "no address".
            self.address = None
        account_number = content.get('accountNumber')
        self.account_number = str(account_number) if account_number else None
        self.engine_type = content.get('engineType', 'microengine')
        self.is_microengine = self.engine_type == 'microengine'
        self.is_arbiter = self.engine_type == 'arbiter'
        self.status = content.get('status', 'disabled')
        self.verified = self.status == 'verified'
        # These fields can be `null`; don't replace w/ default value in `get()`
        self.artifact_types = set(content.get('artifactTypes') or [])
        self.tags = set(content.get('tags') or [])
        self.communities = set(content.get('communities') or [])
        self.mimetypes = set(content.get('mimeTypes') or [])
        self.created_at = core.parse_isoformat(content.get('createdAt'))
        self.modified_at = core.parse_isoformat(content.get('modifiedAt'))
        self.archived_at = core.parse_isoformat(content.get('archivedAt'))

    # NOTE: __hash__ must be a plain instance method. Decorating it with
    # @classmethod makes hash(engine) fail, because the interpreter calls
    # type(engine).__hash__(engine).
    def __hash__(self):
        """Hash on the engine id, consistent with ``__eq__``."""
        return hash(self.id)

    def __eq__(self, other):
        """Engines are equal when ``other`` is an Engine with the same id."""
        return self.id == other.id if isinstance(other, Engine) else False

    def __repr__(self):
        """Return a short description with name, address, status and type."""
        return '{}(name={}, address={}, status={}, engine_type={})'.format(
            self.__class__.__name__,
            self.name,
            self.address,
            self.status,
            self.engine_type,
        )
class IOC(core.BaseJsonResource):
    """Resource for the ``/ioc`` endpoints.

    Every classmethod builds a ``core.PolyswarmRequest`` that is parsed with
    this class and returns the result of calling ``execute()`` on it.
    """
    RESOURCE_ENDPOINT = '/ioc'

    @classmethod
    def iocs_by_hash(cls, api, hash_value, hash_type, hide_known_good=False):
        """GET ``<api.uri>/ioc/<hash_type>/<hash_value>``.

        :param api: API object providing ``uri`` and ``community``.
        :param hash_value: Hash placed in the URL path.
        :param hash_type: Hash type placed in the URL path.
        :param hide_known_good: Sent as the ``hide_known_good`` query parameter.
        """
        url = '{}/ioc/{}/{}'.format(api.uri, hash_type, hash_value)
        query = {
            'hide_known_good': hide_known_good,
            'community': api.community,
        }
        request = {'method': 'GET', 'url': url, 'params': query}
        return core.PolyswarmRequest(api, request, result_parser=cls).execute()

    @classmethod
    def ioc_search(cls, api, ip=None, domain=None, ttp=None, imphash=None):
        """GET ``<api.uri>/ioc/search`` with the filters that are not ``None``.

        The ``community`` query parameter is always sent.
        """
        query = {'community': api.community}
        filters = (('ip', ip), ('domain', domain), ('ttp', ttp), ('imphash', imphash))
        # Only forward the filters the caller actually provided.
        query.update((key, value) for key, value in filters if value is not None)
        request = {
            'method': 'GET',
            'url': '{}/ioc/search'.format(api.uri),
            'params': query,
        }
        return core.PolyswarmRequest(api, request, result_parser=cls).execute()

    @classmethod
    def check_known_hosts(cls, api, ips, domains):
        """GET ``<api.uri>/ioc/known`` with ``ip`` and ``domain`` query parameters."""
        request = {
            'method': 'GET',
            'url': '{}/ioc/known'.format(api.uri),
            'params': {'ip': ips, 'domain': domains},
        }
        return core.PolyswarmRequest(api, request, result_parser=cls).execute()

    @classmethod
    def create_known_good(cls, api, type, host, source):
        """POST a new entry to ``<api.uri>/ioc/known``; ``good`` is always ``True``."""
        payload = {
            'type': type,
            'host': host,
            'source': source,
            'good': True,
        }
        request = {
            'method': 'POST',
            'url': '{}/ioc/known'.format(api.uri),
            'json': payload,
        }
        return core.PolyswarmRequest(api, request, result_parser=cls).execute()

    @classmethod
    def update_known_good(cls, api, id, type, host, source, good):
        """PUT the given fields, including ``id``, to ``<api.uri>/ioc/known``."""
        payload = {
            'id': id,
            'type': type,
            'host': host,
            'source': source,
            'good': good,
        }
        request = {
            'method': 'PUT',
            'url': '{}/ioc/known'.format(api.uri),
            'json': payload,
        }
        return core.PolyswarmRequest(api, request, result_parser=cls).execute()

    @classmethod
    def delete_known_good(cls, api, id):
        """DELETE ``<api.uri>/ioc/known`` with ``id`` as a query parameter."""
        request = {
            'method': 'DELETE',
            'url': '{}/ioc/known'.format(api.uri),
            'params': {'id': id},
        }
        return core.PolyswarmRequest(api, request, result_parser=cls).execute()
[docs]class ArtifactInstance(core.BaseJsonResource, core.Hashable):
[docs] RESOURCE_ENDPOINT = '/instance'
def __init__(self, content, api=None):
    """Populate the instance from its JSON representation.

    :param content: Mapping with the artifact and instance fields. The
        hashes, ``mimetype``, ``size``, ``extended_type``, ``first_seen``
        and ``upload_url`` keys are required; the rest are optional.
    :param api: Forwarded to the base classes and to nested resources.
    """
    digest = content['sha256']
    super(ArtifactInstance, self).__init__(content=content, api=api,
                                           hash_value=digest, hash_type='sha256')
    parse_date = core.parse_isoformat

    # Artifact fields
    self.sha256 = content['sha256']
    self.artifact_id = content.get('artifact_id')
    self.md5 = content['md5']
    self.sha1 = content['sha1']
    self.mimetype = content['mimetype']
    self.size = content['size']
    self.extended_type = content['extended_type']
    self.first_seen = parse_date(content['first_seen'])
    self.upload_url = content['upload_url']

    # Deprecated
    self.last_seen = parse_date(content.get('last_seen'))
    self.last_scanned = parse_date(content.get('last_scanned'))

    # Metadata arrives as a list of {tool, tool_metadata} entries (or null)
    # and is re-keyed by tool name.
    by_tool = {}
    for entry in content.get('metadata') or []:
        by_tool[entry['tool']] = entry['tool_metadata']
    self.metadata = Metadata(by_tool, api)

    # ArtifactInstance fields
    self.id = content.get('id')
    self.assertions = [
        Assertion(raw, api=api, scanfile=self)
        for raw in content.get('assertions', [])
    ]
    self.country = content.get('country')
    self.community = content.get('community')
    self.created = parse_date(content.get('created'))
    self.failed = content.get('failed')
    self.filename = content.get('filename')
    self.result = content.get('result')
    self.type = content.get('type')
    self.votes = [
        Vote(raw, api=api, scanfile=self)
        for raw in content.get('votes', [])
    ]
    self.window_closed = content.get('window_closed')

    score = content.get('polyscore')
    self.polyscore = None if score is None else float(content['polyscore'])

    self.permalink = '/'.join([settings.DEFAULT_PERMALINK_BASE, str(self.hash), str(self.id)])

    # Lazily filled by the *_assertions properties.
    self._malicious_assertions = None
    self._benign_assertions = None
    self._valid_assertions = None
def upload_file(self, artifact, attempts=3, **kwargs):
    """PUT the artifact content to ``self.upload_url``, retrying on failure.

    :param artifact: Seekable file-like object holding the content to upload.
    :param attempts: Maximum number of upload attempts. A value of zero or
        less performs no request at all and returns ``None``.
    :param kwargs: Extra keyword arguments forwarded to ``requests.put``.
    :return: The successful ``requests`` response.
    :raises InvalidValueException: if ``upload_url`` or ``artifact`` is not set.
    :raises requests.exceptions.RequestException: the error of the last
        attempt, once every attempt has failed.
    """
    if not self.upload_url:
        raise exceptions.InvalidValueException('upload_url must be set to upload a file')
    if not artifact:
        raise exceptions.InvalidValueException('A LocalArtifact must be provided in order to upload')

    while attempts > 0:
        attempts -= 1
        # Measure and rewind on every attempt: a previous attempt may have
        # consumed part of the stream.
        artifact.seek(0, io.SEEK_END)
        length = artifact.tell()
        artifact.seek(0)
        # https://github.com/psf/requests/issues/4215#issuecomment-319521235
        # We have to manually handle the case when the file is empty
        # in a way that requests won't set Transfer-Encoding: chunked.
        # A separate variable keeps `artifact` seekable for the next attempt.
        payload = artifact if length else ''
        try:
            r = requests.put(self.upload_url, data=payload, **kwargs)
            r.raise_for_status()
        except requests.exceptions.RequestException:
            if attempts <= 0:
                # Out of attempts: surface the last failure to the caller.
                raise
            # The URL is deliberately not logged, it may embed credentials.
            logger.warning('Upload attempt failed, %s attempt(s) left.', attempts, exc_info=True)
        else:
            return r
    # Only reached when the caller asked for zero (or fewer) attempts.
    return None
@classmethod
def exists_hash(cls, api, hash_value, hash_type):
    """HEAD ``<api.uri>/search/hash/<hash_type>`` for the given hash.

    No result parser is attached to this request.
    """
    url = '{}/search/hash/{}'.format(api.uri, hash_type)
    query = {'hash': hash_value, 'community': api.community}
    request = {'method': 'HEAD', 'url': url, 'params': query}
    return core.PolyswarmRequest(api, request).execute()
@classmethod
def search_hash(cls, api, hash_value, hash_type):
    """GET ``<api.uri>/search/hash/<hash_type>`` and parse the result with this class."""
    url = '{}/search/hash/{}'.format(api.uri, hash_type)
    query = {'hash': hash_value, 'community': api.community}
    request = {'method': 'GET', 'url': url, 'params': query}
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
def search_url(cls, api, url):
    """GET ``<api.uri>/search/url`` with ``url`` as a query parameter."""
    endpoint = '{}/search/url'.format(api.uri)
    query = {'url': url, 'community': api.community}
    request = {'method': 'GET', 'url': endpoint, 'params': query}
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
def list_scans(cls, api, hash_value):
    """GET ``<api.uri>/search/instances`` for the given hash."""
    endpoint = '{}/search/instances'.format(api.uri)
    query = {'hash': hash_value, 'community': api.community}
    request = {'method': 'GET', 'url': endpoint, 'params': query}
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
def submit(cls, api, artifact, artifact_name, artifact_type, scan_config=None):
    """POST an artifact to ``<api.uri>/consumer/submission/<api.community>``.

    :param artifact: Content sent as the ``file`` multipart field.
    :param artifact_name: File name attached to the ``file`` field.
    :param artifact_type: Sent as the ``artifact-type`` form field.
    :param scan_config: Sent as ``scan-config`` only when truthy.
    """
    form = {'artifact-type': artifact_type}
    if scan_config:
        form['scan-config'] = scan_config
    request = {
        'method': 'POST',
        'url': '{}/consumer/submission/{}'.format(api.uri, api.community),
        'files': {'file': (artifact_name, artifact)},
        # very oddly, when included in files parameter this errors out
        'data': form,
    }
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
def rescan(cls, api, hash_value, hash_type, scan_config=None):
    """POST a rescan request addressed by hash type and hash value.

    ``community`` is always sent as form data; ``scan-config`` is added
    only when ``scan_config`` is truthy.
    """
    url = '{}/consumer/submission/{}/rescan/{}/{}'.format(
        api.uri, api.community, hash_type, hash_value)
    form = {'community': api.community}
    if scan_config:
        form['scan-config'] = scan_config
    request = {'method': 'POST', 'url': url, 'data': form}
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
def rescan_id(cls, api, submission_id, scan_config=None):
    """POST a rescan request addressed by submission id.

    ``submission_id`` is coerced with ``int()``. Form data is sent only
    when ``scan_config`` is truthy.
    """
    url = '{}/consumer/submission/{}/rescan/{}'.format(
        api.uri, api.community, int(submission_id))
    request = {'method': 'POST', 'url': url}
    if scan_config:
        request['data'] = {'scan-config': scan_config}
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
def lookup_uuid(cls, api, submission_id):
    """GET ``<api.uri>/consumer/submission/<api.community>/<id>``.

    ``submission_id`` is coerced with ``int()`` before it is placed in the URL.
    """
    url = '{}/consumer/submission/{}/{}'.format(
        api.uri, api.community, int(submission_id))
    request = {'method': 'GET', 'url': url}
    return core.PolyswarmRequest(api, request, result_parser=cls).execute()
@classmethod
).execute()
def __str__(self):
    """Return ``ArtifactInstance-<hash>`` for this instance."""
    digest = self.hash
    return "ArtifactInstance-<%s>" % digest
@property
def malicious_assertions(self):
    """Assertions with a truthy ``mask`` and a truthy ``verdict``.

    The list is computed on first access and cached on the instance.
    """
    # Compare against None so that an empty result is cached as well,
    # instead of being rebuilt on every access.
    if self._malicious_assertions is None:
        self._malicious_assertions = [a for a in self.assertions if a.mask and a.verdict]
    return self._malicious_assertions
@property
def benign_assertions(self):
    """Assertions with a truthy ``mask`` and a falsy ``verdict``.

    The list is computed on first access and cached on the instance.
    """
    # Compare against None so that an empty result is cached as well,
    # instead of being rebuilt on every access.
    if self._benign_assertions is None:
        self._benign_assertions = [a for a in self.assertions if a.mask and not a.verdict]
    return self._benign_assertions
@property
def valid_assertions(self):
    """Assertions with a truthy ``mask``, whatever their verdict.

    The list is computed on first access and cached on the instance.
    """
    # Compare against None so that an empty result is cached as well,
    # instead of being rebuilt on every access.
    if self._valid_assertions is None:
        self._valid_assertions = [a for a in self.assertions if a.mask]
    return self._valid_assertions
class ArtifactArchive(core.BaseJsonResource):
    """Resource parsed from the ``/consumer/download/stream`` endpoint."""
    RESOURCE_ENDPOINT = '/consumer/download/stream'

    def __init__(self, content, api=None):
        """Copy ``id``, ``community``, ``created`` and ``uri`` from ``content``.

        All four keys are required; ``created`` goes through
        ``core.parse_isoformat``.
        """
        super(ArtifactArchive, self).__init__(content=content, api=api)
        created = content['created']
        self.id = content['id']
        self.community = content['community']
        self.created = core.parse_isoformat(created)
        self.uri = content['uri']
class AssertionsJob(core.BaseJsonResource):
    """Resource parsed from the ``/consumer/assertions-job`` endpoint."""
    RESOURCE_ENDPOINT = '/consumer/assertions-job'

    def __init__(self, content, api=None):
        """Populate the job from ``content``; every key read here is required."""
        super(AssertionsJob, self).__init__(content=content, api=api)
        to_datetime = core.parse_isoformat

        self.id = content['id']
        self.engine_id = content['engine_id']
        # The three date fields go through core.parse_isoformat.
        self.created = to_datetime(content['created'])
        self.date_start = to_datetime(content['date_start'])
        self.date_end = to_datetime(content['date_end'])
        # Everything else is copied verbatim under the same attribute name.
        for key in ('storage_path', 'true_positive', 'true_negative',
                    'false_positive', 'false_negative', 'suspicious',
                    'unknown', 'total'):
            setattr(self, key, content[key])
class VotesJob(core.BaseJsonResource):
    """Resource parsed from the ``/consumer/votes-job`` endpoint."""
    RESOURCE_ENDPOINT = '/consumer/votes-job'

    def __init__(self, content, api=None):
        """Populate the job from ``content``; every key read here is required."""
        super(VotesJob, self).__init__(content=content, api=api)
        to_datetime = core.parse_isoformat

        self.id = content['id']
        self.engine_id = content['engine_id']
        # The three date fields go through core.parse_isoformat.
        self.created = to_datetime(content['created'])
        self.date_start = to_datetime(content['date_start'])
        self.date_end = to_datetime(content['date_end'])
        # Everything else is copied verbatim under the same attribute name.
        for key in ('storage_path', 'true_positive', 'true_negative',
                    'false_positive', 'false_negative', 'suspicious',
                    'unknown', 'total'):
            setattr(self, key, content[key])
def _read_chunks(file_handle):
    """Yield successive reads of ``settings.FILE_CHUNK_SIZE`` from ``file_handle``.

    Iteration stops at the first falsy read (end of file).
    """
    chunk = file_handle.read(settings.FILE_CHUNK_SIZE)
    while chunk:
        yield chunk
        chunk = file_handle.read(settings.FILE_CHUNK_SIZE)
def all_hashes(file_handle, algorithms=(_sha256, _sha1, _md5)):
    """Compute several digests of ``file_handle`` in a single pass.

    :param file_handle: Readable object, consumed from its current position
        via ``_read_chunks``.
    :param algorithms: Hash constructors to run; defaults to SHA-256, SHA-1
        and MD5.
    :return: List of ``Hash`` objects, one per algorithm, in the same order
        as ``algorithms``.
    """
    hashers = [alg() for alg in algorithms]
    for data in _read_chunks(file_handle):
        # Plain loop: the update is a side effect, there is nothing to collect.
        for hasher in hashers:
            hasher.update(data)
    return [Hash(hasher.hexdigest()) for hasher in hashers]
class LocalArtifact(core.BaseResource, core.Hashable):
    """ Artifact for which we have local content """

    def __init__(self, response, api=None, handle=None, folder=None,
                 artifact_name=None, artifact_type=None, analyze=False, **kwargs):
        """
        A representation of an artifact we have locally

        :param response: Response whose content is written to the handle; any falsy value skips that step
        :param api: PolyswarmAPI instance
        :param handle: Existing file-like object to use (mutually exclusive with ``folder``)
        :param folder: Folder in which a file named after the artifact is created (mutually exclusive with ``handle``)
        :param artifact_name: Name of the artifact
        :param artifact_type: Type of artifact, defaults to ``ArtifactType.FILE``
        :param analyze: Boolean, if True will run analyses on artifact on startup (Note: this may still run later if False)
        :param kwargs: Extra keyword arguments forwarded to ``open()`` when ``folder`` is used
        :raises InvalidValueException: if both, or neither, of ``folder`` and ``handle`` are provided
        """
        # check if we have a destination to store the file
        # raise an error if we don't have exactly one
        if folder and handle:
            raise exceptions.InvalidValueException('Only one of path or handle should be defined.')
        if not (folder or handle):
            raise exceptions.InvalidValueException('At least one of path or handle must be defined.')

        # initialize super classes and default values
        super(LocalArtifact, self).__init__(response, api=api, hash_type='sha256')
        # Define the handle up front so the error handling below can always
        # inspect it, even when opening the target file fails.
        self.handle = None
        self.sha256 = None
        self.sha1 = None
        self.md5 = None
        self.analyzed = False
        self.artifact_type = artifact_type or ArtifactType.FILE

        # resolve the file name
        if artifact_name:
            # prioritize explicitly provided name
            self.artifact_name = artifact_name
        elif response:
            # respect content-disposition if there is a response
            filename = response.headers.get('content-disposition', '').partition('filename=')[2]
            if filename:
                self.artifact_name = filename
            elif os.path.basename(getattr(handle, 'name', '')):
                self.artifact_name = os.path.basename(getattr(handle, 'name', ''))
            else:
                self.artifact_name = os.path.basename(urlparse(response.url).path)
        elif os.path.basename(getattr(handle, 'name', '')):
            # if there is no response and no artifact_name, try to get from the handle
            self.artifact_name = os.path.basename(getattr(handle, 'name', ''))

        # resolve the handle to be used
        # only one of handle or folder can be provided (we checked for this above)
        # if one was explicitly provided, use it
        # if we have a folder, use a file named after file_name in that folder
        # otherwise use an in-memory handle
        remove_on_error = False
        try:
            if folder:
                # TODO: this should be replaced with os.makedirs(path, exist_ok=True)
                # once we drop support to python 2.7
                if not os.path.exists(folder):
                    try:
                        os.makedirs(folder)
                    except OSError:
                        # FileExistsError is not defined on python 2.7, so
                        # catch its OSError base and only ignore the error
                        # when the folder now exists (created concurrently).
                        if not os.path.isdir(folder):
                            raise
                remove_on_error = True
                self.handle = open(os.path.join(folder, self.artifact_name), mode='wb+', **kwargs)
            else:
                self.handle = handle or io.BytesIO()

            if response:
                # process the content in the response if available, write to handle
                for chunk in response.iter_content(settings.DOWNLOAD_CHUNK_SIZE):
                    self.handle.write(chunk)
                if hasattr(self.handle, 'flush'):
                    self.handle.flush()

            if analyze:
                # analyze the artifact in case it is needed
                self.analyze_artifact()
        except Exception:
            try:
                if remove_on_error and self.handle:
                    # make sure we cleanup the handle
                    # if an exception happened and this is a file we created
                    self.handle.close()
                    os.remove(self.handle.name)
            except Exception:
                logger.exception('Failed to cleanup the target file.')
            raise

    @classmethod
    def download(cls, api, hash_value, hash_type, handle=None, folder=None, artifact_name=None):
        """Stream ``<api.uri>/consumer/download/<hash_type>/<hash_value>`` into a LocalArtifact."""
        return core.PolyswarmRequest(
            api,
            {
                'method': 'GET',
                'url': '{}/consumer/download/{}/{}'.format(api.uri, hash_type, hash_value),
                'stream': True,
                'params': {'community': api.community},
            },
            result_parser=cls,
            handle=handle,
            folder=folder,
            artifact_name=artifact_name,
        ).execute()

    @classmethod
    def download_id(cls, api, instance_id, handle=None, folder=None, artifact_name=None):
        """Stream ``<api.uri>/instance/download`` for ``instance_id`` into a LocalArtifact."""
        return core.PolyswarmRequest(
            api,
            {
                'method': 'GET',
                'url': '{}/instance/download'.format(api.uri),
                'stream': True,
                'params': {'instance_id': instance_id},
            },
            result_parser=cls,
            handle=handle,
            folder=folder,
            artifact_name=artifact_name,
        ).execute()

    @classmethod
    def download_archive(cls, api, u, handle=None, folder=None, artifact_name=None):
        """ This method is special, in that it is simply for downloading from S3 """
        return core.PolyswarmRequest(
            api,
            {
                'method': 'GET',
                'url': u,
                'stream': True,
                # The Authorization header is explicitly set to None for this request.
                'headers': {'Authorization': None},
            },
            result_parser=cls,
            handle=handle,
            folder=folder,
            artifact_name=artifact_name,
        ).execute()

    # Inspired by
    # https://github.com/python/cpython/blob/29500737d45cbca9604d9ce845fb2acc3f531401/Lib/tempfile.py#L461
    def __getattr__(self, name):
        """Delegate unknown attribute lookups to the underlying handle."""
        if name == 'handle':
            # Only reached when `handle` was never assigned. Resolving it
            # through self.handle below would recurse without bound.
            raise AttributeError(name)
        # Attribute lookups are delegated to the underlying file
        # and cached for non-numeric results
        # (i.e. methods are cached, closed and friends are not)
        a = getattr(self.handle, name)
        if hasattr(a, '__call__'):
            func = a

            @functools.wraps(func)
            def func_wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            a = func_wrapper
        if not isinstance(a, int):
            setattr(self, name, a)
        return a

    # https://github.com/python/cpython/blob/29500737d45cbca9604d9ce845fb2acc3f531401/Lib/tempfile.py#L499
    # iter() doesn't use __getattr__ to find the __iter__ method
    def __iter__(self):
        """Iterate over the lines of the underlying handle."""
        for line in self.handle:
            yield line

    @classmethod
    def from_handle(cls, api, handle, artifact_type=None, analyze=False, artifact_name=None, **kwargs):
        """Wrap an existing handle; ``kwargs`` is accepted but not used."""
        # create the LocalHandle with the given handle and don't write anything to it
        return cls(b'', handle=handle, artifact_name=artifact_name,
                   artifact_type=artifact_type, analyze=analyze, api=api)

    @classmethod
    def from_path(cls, api, path, artifact_type=None, analyze=False, artifact_name=None, **kwargs):
        """Open ``path`` for binary reading and wrap the resulting handle.

        :param kwargs: Extra keyword arguments forwarded to ``open()``
        :raises InvalidValueException: if ``path`` is not a string
        """
        if not isinstance(path, string_types):
            raise exceptions.InvalidValueException('Path should be a string')
        artifact_name = artifact_name or os.path.basename(path)
        handle = open(path, mode='rb', **kwargs)
        try:
            # create the LocalHandle with the given handle and don't write anything to it
            return cls(b'', handle=handle, artifact_name=artifact_name,
                       artifact_type=artifact_type, analyze=analyze, api=api)
        except Exception:
            # We opened this file ourselves, so don't leak it when the
            # artifact cannot be built (e.g. the analysis fails).
            handle.close()
            raise

    @classmethod
    def from_content(cls, api, content, artifact_name=None, artifact_type=None, analyze=False):
        """Wrap in-memory content; text is encoded as UTF-8 first."""
        if isinstance(content, string_types):
            content = content.encode("utf8")
        handle = io.BytesIO(content)
        # create the LocalHandle with the given handle and don't write anything to it
        return cls(b'', handle=handle, artifact_name=artifact_name,
                   artifact_type=artifact_type, analyze=analyze, api=api)

    @property
    def hash(self):
        """Hash of the artifact; triggers the analysis if it has not run yet."""
        self.analyze_artifact()
        return super(LocalArtifact, self).hash

    def analyze_artifact(self, force=False):
        """Compute the hashes and run the analyzers once, or again when ``force`` is True."""
        if not self.analyzed or force:
            self.handle.seek(0)
            self._calc_hashes(self.handle)
            self.handle.seek(0)
            self._run_analyzers(self.handle)
            self.analyzed = True
            # define the hash value only when analyzed
            self._hash = self.sha256

    def _calc_hashes(self, fh):
        """Store the SHA-256, SHA-1 and MD5 digests of ``fh`` on the instance."""
        self.sha256, self.sha1, self.md5 = all_hashes(fh)

    def _run_analyzers(self, fh):
        """Placeholder for analyzers; currently returns an empty dict."""
        # TODO implement custom analyzer support, so users can implement plugins here.
        return {}

    def __str__(self):
        """Return ``Artifact <hash>``; reading the hash may trigger the analysis."""
        return "Artifact <%s>" % self.hash
class YaraRuleset(core.BaseJsonResource):
    """Resource parsed from the ``/hunt/rule`` endpoint."""
    RESOURCE_ENDPOINT = '/hunt/rule'

    def __init__(self, content, api=None):
        """Populate the ruleset from ``content``; every field is optional."""
        super(YaraRuleset, self).__init__(content, api=api)
        field = content.get
        self.id = field('id')
        self.livescan_id = field('livescan_id')
        self.livescan_created = field('livescan_created')
        self.name = field('name')
        self.description = field('description')
        # Only these two timestamps go through core.parse_isoformat.
        self.created = core.parse_isoformat(field('created'))
        self.modified = core.parse_isoformat(field('modified'))
        self.deleted = field('deleted')
        self.yara = field('yara')
class LiveYaraRuleset(YaraRuleset):
    """Same fields as ``YaraRuleset``, served from ``/hunt/rule/live``."""
    RESOURCE_ENDPOINT = '/hunt/rule/live'
class LiveHuntResult(core.BaseJsonResource):
    """Resource parsed from the ``/hunt/live`` endpoint."""
    RESOURCE_ENDPOINT = '/hunt/live'

    def __init__(self, content, api=None):
        """Populate the result from ``content``.

        ``yara``, ``download_url`` and ``community`` are optional; every
        other key read here is required.
        """
        super(LiveHuntResult, self).__init__(content=content, api=api)
        self.id = content['id']
        self.livescan_id = content['livescan_id']
        self.instance_id = content['instance_id']
        self.created = core.parse_isoformat(content['created'])
        # Required fields copied verbatim.
        for key in ('sha256', 'rule_name', 'tags', 'polyscore',
                    'malware_family', 'detections'):
            setattr(self, key, content[key])
        # Optional fields default to None.
        for key in ('yara', 'download_url', 'community'):
            setattr(self, key, content.get(key))
class LiveHuntResultList(LiveHuntResult):
    """Same fields as ``LiveHuntResult``, served from ``/hunt/live/list``."""
    RESOURCE_ENDPOINT = '/hunt/live/list'
class HistoricalHunt(core.BaseJsonResource):
    """Resource parsed from the ``/hunt/historical`` endpoint."""
    RESOURCE_ENDPOINT = '/hunt/historical'

    def __init__(self, content, api=None):
        """Populate the hunt from ``content``.

        ``id``, ``created``, ``status``, ``progress`` and
        ``results_csv_uri`` are required; the remaining fields are optional.
        """
        super(HistoricalHunt, self).__init__(content=content, api=api)
        optional = content.get
        self.id = content['id']
        self.created = core.parse_isoformat(content['created'])
        self.status = content['status']
        # active only present for live hunts
        self.active = optional('active')
        self.ruleset_name = optional('ruleset_name')
        self.yara = optional('yara')
        self.summary = optional('summary')
        self.progress = content['progress']
        self.results_csv_uri = content['results_csv_uri']
        self.communities = optional('communities')
class HistoricalHuntList(HistoricalHunt):
    """Same fields as ``HistoricalHunt``, served from ``/hunt/historical/list``."""
    RESOURCE_ENDPOINT = '/hunt/historical/list'
class HistoricalHuntResult(core.BaseJsonResource):
    """Resource parsed from the ``/hunt/historical/results`` endpoint."""
    RESOURCE_ENDPOINT = '/hunt/historical/results'

    def __init__(self, content, api=None):
        """Populate the result from ``content``.

        ``download_url`` and ``community`` are optional; every other key
        read here is required.
        """
        super(HistoricalHuntResult, self).__init__(content=content, api=api)
        self.id = content['id']
        self.historicalscan_id = content['historicalscan_id']
        self.instance_id = content['instance_id']
        self.sha256 = content['sha256']
        self.created = core.parse_isoformat(content['created'])
        # Required fields copied verbatim.
        for key in ('rule_name', 'tags', 'polyscore', 'malware_family', 'detections'):
            setattr(self, key, content[key])
        # Optional fields default to None.
        for key in ('download_url', 'community'):
            setattr(self, key, content.get(key))
class HistoricalHuntResultList(HistoricalHuntResult):
    """Same fields as ``HistoricalHuntResult``, served from ``/hunt/historical/results/list``."""
    RESOURCE_ENDPOINT = '/hunt/historical/results/list'
class TagLink(core.BaseJsonResource):
    """Resource parsed from the ``/tags/link`` endpoint, identified by ``hash``."""
    RESOURCE_ENDPOINT = '/tags/link'
    RESOURCE_ID_KEYS = ['hash']

    def __init__(self, content, api=None):
        """Populate the link from ``content``; every field is optional."""
        super(TagLink, self).__init__(content, api=api)
        field = content.get
        to_datetime = core.parse_isoformat
        self.id = field('id')
        self.sha256 = field('sha256')
        self.created = to_datetime(field('created'))
        self.updated = to_datetime(field('updated'))
        self.first_seen = to_datetime(field('first_seen'))
        self.tags = field('tags')
        self.families = field('families')
        self.emerging = to_datetime(field('emerging'))

    @classmethod
    def _list_params(cls, **kwargs):
        """Build the query parameters for a list request.

        Each value in ``tags``, ``families``, ``or_tags`` and
        ``or_families`` becomes its own ``(name, value)`` pair. ``emerging``
        is always appended, with an empty tuple when it was not provided.

        :return: Tuple of ``(params, None)``.
        """
        no_values = tuple()
        # (query parameter name, keyword argument holding its values)
        repeated = (
            ('tag', 'tags'),
            ('family', 'families'),
            ('or_tag', 'or_tags'),
            ('or_family', 'or_families'),
        )
        params = [
            (query_key, value)
            for query_key, kwarg in repeated
            for value in kwargs.get(kwarg, no_values)
        ]
        params.append(('emerging', kwargs.get('emerging', no_values)))
        return params, None
class MalwareFamily(core.BaseJsonResource):
    """Resource parsed from the ``/tags/family`` endpoint, identified by ``name``."""
    RESOURCE_ENDPOINT = '/tags/family'
    RESOURCE_ID_KEYS = ['name']

    def __init__(self, content, api=None):
        """Populate the family from ``content``; every field is optional."""
        super(MalwareFamily, self).__init__(content, api=api)
        field = content.get
        to_datetime = core.parse_isoformat
        self.id = field('id')
        self.created = to_datetime(field('created'))
        self.updated = to_datetime(field('updated'))
        self.name = field('name')
        self.emerging = to_datetime(field('emerging'))
class Tag(core.BaseJsonResource):
    """Resource parsed from the ``/tags/tag`` endpoint, identified by ``name``."""
    RESOURCE_ENDPOINT = '/tags/tag'
    RESOURCE_ID_KEYS = ['name']

    def __init__(self, content, api=None):
        """Populate the tag from ``content``; every field is optional."""
        super(Tag, self).__init__(content, api=api)
        field = content.get
        self.id = field('id')
        self.created = core.parse_isoformat(field('created'))
        self.updated = core.parse_isoformat(field('updated'))
        self.name = field('name')
#####################################################################
# Nested Resources
#####################################################################
class Assertion(core.BaseJsonResource):
    """An assertion nested inside a scanned artifact instance."""

    def __init__(self, content, api=None, scanfile=None):
        """Populate the assertion from ``content``.

        :param content: Mapping with the assertion fields; all keys read
            here are required.
        :param api: Forwarded to the base resource.
        :param scanfile: Object this assertion belongs to, stored as-is.
        """
        super(Assertion, self).__init__(content, api=api)
        self.scanfile = scanfile
        self.author = content['author']
        self.author_name = content['author_name']
        engine = content['engine']
        self.engine_name = engine.get('name')
        self.bid = int(content['bid'])
        self.mask = content['mask']
        # deal with metadata being a string instead of null
        self.metadata = content['metadata'] or {}
        self.verdict = content['verdict']

    def __str__(self):
        """Return ``Assertion-<engine name>: <verdict>``."""
        described = (self.engine_name, self.verdict)
        return "Assertion-%s: %s" % described
class Vote(core.BaseJsonResource):
    """A vote nested inside a scanned artifact instance."""

    def __init__(self, content, api=None, scanfile=None):
        """Store ``arbiter`` and ``vote`` from ``content``, plus the owning ``scanfile``."""
        super(Vote, self).__init__(content, api=api)
        self.scanfile = scanfile
        for key in ('arbiter', 'vote'):
            setattr(self, key, content[key])

    def __str__(self):
        """Return ``Vote-<arbiter>: <vote>``."""
        described = (self.arbiter, self.vote)
        return "Vote-%s: %s" % described
#####################################################################
# Resources Used as input parameters in PolyswarmAPI
#####################################################################
[docs]class ArtifactType(Enum):
@staticmethod
def parse(value):
    """Coerce ``value`` into an ``ArtifactType`` member.

    :param value: Either an ``ArtifactType`` (returned unchanged) or a
        member name, matched case-insensitively.
    :return: The matching ``ArtifactType`` member.
    :raises InvalidValueException: if ``value`` cannot be mapped to a
        member; the original error is chained as its cause.
    """
    if isinstance(value, ArtifactType):
        return value
    try:
        return ArtifactType[value.upper()]
    except Exception as e:
        # raise_from raises by itself and never returns, so no outer
        # `raise` statement is needed around it.
        raise_from(
            exceptions.InvalidValueException(
                'Unable to get the artifact type from the provided value {}'.format(value)), e)
@staticmethod
[docs] def to_string(artifact_type):
return artifact_type.name.lower()
def decode_content(self, content):
    """Decode ``content`` according to this artifact type.

    ``None`` is returned unchanged. For ``ArtifactType.URL`` the content is
    decoded as UTF-8; for any other type it is returned as-is.

    :raises DecodeErrorException: if URL content is not valid UTF-8.
    """
    if content is None:
        return None
    if not self == ArtifactType.URL:
        return content
    try:
        return content.decode('utf-8')
    except UnicodeDecodeError:
        raise exceptions.DecodeErrorException('Error decoding URL')
class Hash(core.Hashable):
    """A bare hash value, with no other resource data attached."""

    def __init__(self, hash_, hash_type=None, validate_hash=True):
        """Forward the hash value, type and validation flag to ``core.Hashable``.

        :param hash_: Hash value
        :param hash_type: Hash type
        :param validate_hash: Passed through to the base class unchanged
        """
        super(Hash, self).__init__(hash_value=hash_, hash_type=hash_type, validate_hash=validate_hash)

    @classmethod
    def from_hashable(cls, hash_, hash_type=None):
        """
        Coerce to Hashable object

        :param hash_: Hashable object, or a raw hash value
        :param hash_type: Hash type
        :return: Hash
        :raises InvalidValueException: if ``hash_`` is a Hashable whose type
            differs from the requested ``hash_type``
        """
        if isinstance(hash_, core.Hashable):
            if hash_type and hash_.hash_type != hash_type:
                raise exceptions.InvalidValueException('Detected hash type {}, got {} for hashable {}'
                                                       .format(hash_.hash_type, hash_type, hash_.hash))
            return Hash(hash_.hash, hash_type=hash_type)
        return Hash(hash_, hash_type=hash_type)

    def __hash__(self):
        """Hash on the hash value itself."""
        return hash(self.hash)

    def __str__(self):
        """Return the hash value."""
        return self.hash

    def __repr__(self):
        """Return ``<hash_type>=<hash>``."""
        return "{}={}".format(self.hash_type, self.hash)
class SandboxTask(core.BaseJsonResource):
    """Resource parsed from the ``/sandbox/sandboxtask`` endpoint."""
    RESOURCE_ENDPOINT = '/sandbox/sandboxtask'

    def __init__(self, content, api=None):
        """Populate the task from ``content``.

        ``sandbox_artifacts`` is optional; every other key read here is
        required.
        """
        super(SandboxTask, self).__init__(content, api=api)
        self.id = content['id']
        self.community = content['community']
        self.sandbox = content['sandbox']
        self.created = content['created']
        self.expiration = content['expiration']
        self.status = content['status']
        self.account_number = content['account_number']
        self.team_account_number = content['team_account_number']
        self.instance_id = content['instance_id']
        self.sha256 = content['sha256']
        self.report = content['report']
        self.upload_url = content['upload_url']
        self.sandbox_artifacts = [SandboxArtifact(a, api=api) for a in content.get('sandbox_artifacts', [])]

    def upload_file(self, artifact, attempts=3, **kwargs):
        """PUT the artifact content to ``self.upload_url``, retrying on failure.

        :param artifact: Seekable file-like object holding the content to upload.
        :param attempts: Maximum number of upload attempts. A value of zero
            or less performs no request at all and returns ``None``.
        :param kwargs: Extra keyword arguments forwarded to ``requests.put``.
        :return: The successful ``requests`` response.
        :raises InvalidValueException: if ``upload_url`` or ``artifact`` is not set.
        :raises requests.exceptions.RequestException: the error of the last
            attempt, once every attempt has failed.
        """
        if not self.upload_url:
            raise exceptions.InvalidValueException('upload_url must be set to upload a file')
        if not artifact:
            raise exceptions.InvalidValueException('A LocalArtifact must be provided in order to upload')

        while attempts > 0:
            attempts -= 1
            # Measure and rewind on every attempt: a previous attempt may
            # have consumed part of the stream.
            artifact.seek(0, io.SEEK_END)
            length = artifact.tell()
            artifact.seek(0)
            # https://github.com/psf/requests/issues/4215#issuecomment-319521235
            # We have to manually handle the case when the file is empty
            # in a way that requests won't set Transfer-Encoding: chunked.
            # A separate variable keeps `artifact` seekable for the next attempt.
            payload = artifact if length else ''
            try:
                r = requests.put(self.upload_url, data=payload, **kwargs)
                r.raise_for_status()
            except requests.exceptions.RequestException:
                if attempts <= 0:
                    # Out of attempts: surface the last failure to the caller.
                    raise
                # The URL is deliberately not logged, it may embed credentials.
                logger.warning('Upload attempt failed, %s attempt(s) left.', attempts, exc_info=True)
            else:
                return r
        # Only reached when the caller asked for zero (or fewer) attempts.
        return None

    @classmethod
    def get(cls, api, **kwargs):
        """Call the base ``get`` with the API's community added to the arguments."""
        # Explicit two-argument super(): the zero-argument form does not
        # exist on python 2.7, and the rest of this module uses this form.
        return super(SandboxTask, cls).get(api, community=api.community, **kwargs)

    @classmethod
    def latest(cls, api, **kwargs):
        """GET the ``/latest`` sub-path of this resource's endpoint."""
        params, _ = cls._get_params(community=api.community, **kwargs)
        url = cls._endpoint(api) + '/latest'
        parameters = {'method': 'GET', 'url': url, 'params': params}
        return core.PolyswarmRequest(api, parameters, result_parser=cls).execute()

    @classmethod
    def my_tasks(cls, api, **kwargs):
        """GET the ``/my-tasks`` sub-path of this resource's endpoint."""
        params, _ = cls._get_params(community=api.community, **kwargs)
        url = cls._endpoint(api) + '/my-tasks'
        parameters = {'method': 'GET', 'url': url, 'params': params}
        return core.PolyswarmRequest(api, parameters, result_parser=cls).execute()

    @classmethod
    def create_file(cls, api, **kwargs):
        """POST to the ``/instance`` sub-path of the create endpoint."""
        return cls._build_request(api, 'POST', cls._create_endpoint(api, **kwargs) + '/instance',
                                  cls._create_headers(api), *cls._create_params(**kwargs)).execute()

    @classmethod
    def update_file(cls, api, **kwargs):
        """PUT to the ``/instance`` sub-path of the update endpoint."""
        return cls._build_request(api, 'PUT', cls._update_endpoint(api, **kwargs) + '/instance',
                                  cls._update_headers(api), *cls._update_params(**kwargs)).execute()
class SandboxArtifact(core.BaseJsonResource):
    """An artifact entry nested inside a ``SandboxTask``."""

    def __init__(self, content, api=None):
        """Copy the artifact fields from ``content``; all of them are required."""
        super(SandboxArtifact, self).__init__(content, api=api)
        # Every field is copied verbatim under the same attribute name.
        for key in ('created', 'id', 'instance_id', 'name',
                    'mimetype', 'extended_type', 'type'):
            setattr(self, key, content[key])
class SandboxProvider(core.BaseJsonResource):
    """Resource parsed from the ``/sandbox/provider`` endpoint."""
    RESOURCE_ENDPOINT = "/sandbox/provider"

    def __init__(self, content, api=None):
        """Copy ``slug``, ``name``, ``tool`` and ``vms`` from ``content``; all are required."""
        super(SandboxProvider, self).__init__(content, api=api)
        for key in ('slug', 'name', 'tool', 'vms'):
            setattr(self, key, content[key])

    @classmethod
    def parse_result(cls, api, content, **kwargs):
        """Parse a mapping of providers into a list.

        Each value of ``content`` is handed to the base ``parse_result``;
        the keys themselves are not used.
        """
        logger.debug('Parsing resource %s', cls.__name__)
        parent = super(SandboxProvider, cls)
        return [parent.parse_result(api, entry, **kwargs) for entry in content.values()]