from io import BytesIO
from SpiffWorkflow.bpmn.BpmnWorkflow import BpmnWorkflow
from SpiffWorkflow.bpmn.storage.BpmnSerializer import BpmnSerializer
from SpiffWorkflow.bpmn.storage.CompactWorkflowSerializer import CompactWorkflowSerializer
from SpiffWorkflow import Task
from SpiffWorkflow.specs import WorkflowSpec
from SpiffWorkflow.bpmn.storage.Packager import Packager
from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnParser
from SpiffWorkflow.bpmn.specs.BpmnSpecMixin import BpmnSpecMixin
from SpiffWorkflow.specs.Simple import Simple
from SpiffWorkflow.bpmn.parser.TaskParser import TaskParser
from SpiffWorkflow.bpmn.parser.util import *
from SpiffWorkflow.bpmn.specs.UserTask import UserTask
class ServiceTask(Simple, BpmnSpecMixin):
#class ServiceTask(UserTask):
"""
Task Spec for a bpmn:serviceTask node.
"""
def is_engine_task(self):
return False
def entering_complete_state(self, task):
print "Do command : %s" % task.get_description()
class ServiceTaskParser(TaskParser):
pass
class CloudBpmnParser(BpmnParser):
OVERRIDE_PARSER_CLASSES = {
full_tag('serviceTask') : (ServiceTaskParser, ServiceTask),
}
class InMemoryPackager(Packager):
"""
Creates spiff's wf packages on the fly.
"""
PARSER_CLASS = CloudBpmnParser
@classmethod
def package_in_memory(cls, workflow_name, workflow_files, editor='signavio'):
"""
Generates wf packages from workflow diagrams.
Args:
workflow_name: Name of wf
workflow_files: Diagram file.
Returns:
Workflow package (file like) object
"""
s = BytesIO()
p = cls(s, workflow_name, meta_data=[], editor=editor)
p.add_bpmn_files_by_glob(workflow_files)
p.create_package()
return s.getvalue()
class Node(object):
"""
Keep the Task information
"""
def __init__(self, task):
self.input = {}
self.output = {}
self.task = None
self.task_type = None
self.task_name = None
self.description = None
self.activity = None
self.init_task(task)
def init_task(self, task):
self.task = task
self.task_type = task.task_spec.__class__.__name__
self.task_name = task.get_name()
self.description = task.get_description()
self.activity = getattr(task.task_spec, 'service_class', '')
def show(self):
print "task type:%s" % self.task_type
print "task name:%s" % self.task_name
print "description:%s" % self.description
print "activity :%s" % self.activity
print "state name:%s" % self.task.get_state_name()
print "\n"
class BpmnEngine:
def __init__(self, path, name):
self.spec = self.load_spec(path, name)
self.workflow = BpmnWorkflow(self.spec)
self.run_engine()
def create_workflow(self):
self.workflow_spec = self.load_workflow_spec()
def load_spec(self, content_path, workflow_name):
return self.load_workflow_spec(content_path, workflow_name)
def load_workflow_spec(self, content_path, workflow_name):
package = InMemoryPackager.package_in_memory(workflow_name, content_path)
return BpmnSerializer().deserialize_workflow_spec(package)
def start_engine(self, **kwargs):
self.setUp()
def run_engine(self):
for task in self.workflow.get_tasks():
task_name = task.get_name()
print task_name
def run_engine(self):
while 1:
self.workflow.do_engine_steps()
tasks = self.workflow.get_tasks(Task.READY)
if len(tasks) == 0:
break
for task in tasks:
current_node = Node(task)
current_node.show()
self.workflow.complete_task_from_id(task.id)
BpmnEngine('./test.bpmn', 'Approvals')
GitHub BPMN: https://github.com/knipknap/SpiffWorkflow
Zengine: https://github.com/zetaops/zengine