Phew, it's been a few days. I've done yet more refactoring, and am finally happy with how the quest definitions work. The code for this post is at commit 921090e.
Stage execute loop
Stages now have pre-defined functions to execute. The execution looks like:
- Fetch quest stages that are ready to go from the quest graph
- Skip if already complete, and mark done
- Instantiate the stage
- Run the stage's prepare() method. This is intended in case the stage needs to do any tasks to fetch the data it needs
- Run the stage's condition() to see whether the stage should execute
- If the condition returned True, run execute() to do the task
- Finally, check is_done() to decide whether the stage counts as completed
The actual code, which additionally handles a few edge cases:
    while self.graph.is_active():
        ready_nodes = self.graph.get_ready()

        if not ready_nodes:
            log.info("No more ready nodes, stopping execution")
            break

        log.info("Got Ready nodes", ready_nodes=ready_nodes)

        for node in ready_nodes:
            # skip if completed, avoids triggering two final stages
            if self.complete:
                log.info("Done flag set, skipping the rest")
                return

            # completed node: TODO: just not put completed nodes into the graph?
            if node in self.completed_stages:
                self.graph.done(node)
                log.info(
                    "Node is already complete, skipping",
                    node=node,
                    complete=self.completed_stages,
                )
                continue

            log_node = log.bind(node=node)
            log_node.info("Begin processing stage")

            # instantiate stage and execute
            StageClass = self.stages[node]
            stage = StageClass(self)
            stage.prepare()

            if stage.condition():
                log_node.info("Condition check passed, executing")
                stage.execute()

                if stage.is_done():
                    log_node.info("Stage reports done")
                    self.completed_stages.append(node)
                    self.graph.done(node)

            log.info("Done processing node")
Running this loop will cycle through all of the processable parts of the quest tree, including nodes that become available only after the previous node is complete.
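To see how nodes become available only once their parent is marked done, here is a minimal sketch. It assumes the quest graph behaves like Python's standard-library graphlib.TopologicalSorter (Python 3.9+), which has the same is_active(), get_ready() and done() methods as the loop above. The children mapping, build_graph() and run() are made up for illustration and are not the project's code.

```python
from graphlib import TopologicalSorter

# Hypothetical quest tree: each stage name maps to its children.
children = {
    "Start": ["BranchA", "BranchB"],
    "BranchA": ["EndingA"],
    "BranchB": ["EndingB"],
    "EndingA": [],
    "EndingB": [],
}


def build_graph(children):
    """Build a sorter in which every child depends on its parent."""
    graph = TopologicalSorter()
    for parent, kids in children.items():
        graph.add(parent)
        for kid in kids:
            graph.add(kid, parent)  # kid becomes ready once parent is done
    graph.prepare()
    return graph


def run(children, blocked):
    """Walk the graph; stages named in `blocked` never report done."""
    graph = build_graph(children)
    visited = []
    while graph.is_active():
        ready = graph.get_ready()
        if not ready:
            break  # nothing else can be processed right now
        for node in ready:
            visited.append(node)
            if node not in blocked:
                graph.done(node)  # unlocks this node's children
    return visited


print(run(children, blocked={"BranchB"}))
```

Because BranchB is never marked done in this run, EndingB never shows up as ready, and the loop stops once get_ready() comes back empty.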
The Stage Abstract Base Class now looks like:
    class Stage(ABC):
        @property
        @abstractmethod
        def children(cls) -> List[str]:
            """ List of children nodes of this stage """
            return NotImplemented

        def prepare(self) -> None:
            """ Any preparation for the stage """
            return

        def condition(self) -> bool:
            """ Function that will decide whether to execute """
            return True

        def execute(self) -> None:
            """ Run the stage """
            return

        def is_done(self) -> bool:
            """ Returns whether quest was completed """
            return True

        def __init__(self, quest: Quest):
            self.quest = quest

        def __repr__(self):
            return f"{self.__class__.__name__}(quest={repr(self.quest)})"
As can be seen, only the children property must be implemented; condition() and is_done() both return True by default unless overridden.
The reason there's both a condition() and an is_done() is easiest to see with an example. Take a quest that looks like this:
- Quest asks player to find some information and reply in a comment
- condition() decides whether this quest stage should fetch the comment (from some notification trigger?)
- execute() fetches the data, checks the value, and then sends a reply saying "Yes that was it" or "No, that's not right, try again"
- In the latter case, is_done() returns False, and the player doesn't progress and can make another attempt; in the former case, is_done() returns True, and the player progresses to the next stage
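As a rough sketch of that retry behaviour: the Stage class below is a simplified stand-in for the base class above, and FakeQuest, AnswerStage, the expected answer and process() are all hypothetical names invented for this example rather than code from the repository.

```python
class Stage:
    """Simplified stand-in for the Stage base class."""

    def __init__(self, quest):
        self.quest = quest

    def prepare(self):
        return

    def condition(self):
        return True

    def execute(self):
        return

    def is_done(self):
        return True


class FakeQuest:
    """Hypothetical quest object: holds the latest comment and any replies sent."""

    def __init__(self, comment=None):
        self.latest_comment = comment
        self.replies = []


class AnswerStage(Stage):
    expected = "42"  # hypothetical correct answer

    def prepare(self):
        self.correct = False

    def condition(self):
        # only execute when there is a comment to look at
        return self.quest.latest_comment is not None

    def execute(self):
        self.correct = self.quest.latest_comment.strip() == self.expected
        if self.correct:
            self.quest.replies.append("Yes that was it")
        else:
            self.quest.replies.append("No, that's not right, try again")

    def is_done(self):
        # a wrong answer leaves the stage open for another attempt
        return self.correct


def process(stage):
    """Same call order as the execute loop; returns True if the stage completed."""
    stage.prepare()
    if stage.condition():
        stage.execute()
        return stage.is_done()
    return False


wrong = FakeQuest("41")
print(process(AnswerStage(wrong)), wrong.replies)
right = FakeQuest("42")
print(process(AnswerStage(right)), right.replies)
```

Here a wrong answer still executes the stage (a reply is sent), but the stage is not marked complete, so the next run of the loop would process it again.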
Conditions
I've added a new kind of stage called a ConditionStage which lets us handle conditional execution of a stage. Without it, we can branch quests, but we can't control whether a branch will execute. It overrides the base class's condition() method, and allows a quest definition to specify which data from the quest data structure to check:
    class ConditionStage(Stage):
        """ For conditional branch execution """

        @property
        @abstractmethod
        def variable(cls) -> str:
            """ Name of the variable to check """
            return NotImplemented

        # the variable to check against
        compare_variable: ClassVar[Optional[str]] = None

        # the value to check against, if compare_variable is None
        compare_value: ClassVar[Any] = None

        # the operator to use comparison on
        operator: ClassVar[Callable[..., bool]] = operator.eq

        def condition(self) -> bool:
            value_left = getattr(self.quest.quest_data, self.variable)

            if self.compare_variable is not None:
                value_right = getattr(self.quest.quest_data, self.compare_variable)
            else:
                value_right = self.compare_value

            return self.operator(value_left, value_right)
A concrete implementation of the class looks like:
    class TestQuestBranching(Quest):
        class QuestDataModel(QuestBaseModel):
            value_a: int = 1
            value_b: int = 2

        version = VersionInfo.parse("1.0.0")
        difficulty = Difficulty.RESERVED
        description = "This is a quest to test branching"

        class Start(DebugStage):
            children = ["BranchA", "BranchB"]

        class BranchA(ConditionStage):
            children = ["EndingA"]
            variable = "value_a"
            compare_variable = "value_b"

        class BranchB(ConditionStage):
            children = ["EndingB"]
            variable = "value_a"
            operator = operator.gt
            compare_value = 10

        class EndingA(FinalStage):
            children = []

        class EndingB(FinalStage):
            children = []
Here, we have a quest that branches off into two, with two separate endings. The condition of the first branch is value_a == value_b, while the condition of the second is value_a > 10, making use of Python's built-in operator module to provide comparison functions like operator.gt(a, b).
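To make the comparison logic concrete, the sketch below evaluates both branch conditions outside of any quest machinery. The check() helper is hypothetical and only mirrors the body of condition(); SimpleNamespace stands in for the quest data model.

```python
import operator
from types import SimpleNamespace


def check(quest_data, variable, compare_variable=None, compare_value=None, op=operator.eq):
    """Mirror ConditionStage.condition() for a plain data object."""
    left = getattr(quest_data, variable)
    if compare_variable is not None:
        right = getattr(quest_data, compare_variable)
    else:
        right = compare_value
    return op(left, right)


# default values from the QuestDataModel above
quest_data = SimpleNamespace(value_a=1, value_b=2)

branch_a = check(quest_data, "value_a", compare_variable="value_b")        # 1 == 2
branch_b = check(quest_data, "value_a", compare_value=10, op=operator.gt)  # 1 > 10
print(branch_a, branch_b)  # False False
```

With the default data neither branch condition holds; changing value_a to 2 would satisfy the first branch, and any value_a above 10 would satisfy the second.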
FinalStage
Finally, we need a way to mark quests as complete. Nodes with no children will simply not trigger a next stage, but might not signify the end of a quest (they may be a dead-end branch). So the FinalStage concrete implementation of a Stage simply sets the quest's complete boolean to True (I've refactored quest storage to include this as part of the quest data model):
    class FinalStage(Stage):
        """ For ending the quest """

        def __init_subclass__(cls):
            cls.children = []

        def execute(self) -> None:
            self.quest.complete = True
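One side effect of the __init_subclass__ hook is that every subclass of FinalStage ends up with an empty children list, whatever its class body declares. The sketch below uses a stripped-down Stage and hypothetical Ending, Oops and FakeQuest classes to illustrate this; it is not the project's code.

```python
from abc import ABC, abstractmethod


class Stage(ABC):
    """Stripped-down stand-in for the Stage base class."""

    @property
    @abstractmethod
    def children(cls):
        return NotImplemented

    def __init__(self, quest):
        self.quest = quest


class FinalStage(Stage):
    def __init_subclass__(cls):
        # runs once for each subclass, after its class body has been evaluated
        cls.children = []

    def execute(self):
        self.quest.complete = True


class Ending(FinalStage):
    pass  # no children declared, the hook supplies them


class Oops(FinalStage):
    children = ["SomewhereElse"]  # overwritten by the hook


class FakeQuest:
    """Hypothetical quest object with just the complete flag."""
    complete = False


quest = FakeQuest()
Ending(quest).execute()
print(Ending.children, Oops.children, quest.complete)
```

This suggests the explicit children = [] lines on EndingA and EndingB in the test quest are optional, though keeping them makes the quest definition easier to read.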
With these concrete implementations of different stages, it's now possible to have quests that require conditions to be fulfilled, and that can be marked as done.