加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
linebuffer.py 25.96 KB
一键复制 编辑 原始数据 按行查看 历史
云金杞 提交于 2021-12-18 20:38 . update backtrader files
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2020 Daniel Rodriguez
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
'''
.. module:: linebuffer
Classes that hold the buffer for a *line* and can operate on it
with appends, forwarding, rewinding, resetting and other operations
.. moduleauthor:: Daniel Rodriguez
'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import array
import collections
import datetime
from itertools import islice
import math
from .utils.py3 import range, with_metaclass, string_types
from .lineroot import LineRoot, LineSingle, LineMultiple
from . import metabase
from .utils import num2date, time2num
# Module-level "not a number" float, used below as the default fill value
# when a buffer is enlarged (see the ``value=NAN`` defaults)
NAN = float('NaN')
class LineBuffer(LineSingle):
    '''
    LineBuffer defines an interface to an "array.array" (or a
    "collections.deque" in QBuffer mode) in which index 0 points to the item
    which is active for input and output.

    Indexing is relative to the logical index: ``self[ago]`` accesses
    ``self.array[self.idx + ago]``. Because ``forward`` appends on the right
    hand side, negative indices address earlier positions (the past) and
    positive indices address positions to the right of the current one (for
    example those added with ``extend``)

    With this behavior no index has to be passed around to entities which have
    to work with the current value produced by other entities: the value is
    always reachable at "0".

    Likewise storing the current value produced by "self" is done at 0.

    Additional operations to move the pointer (home, forward, extend, rewind,
    advance, getzero) are provided

    The class can also hold "bindings" to other LineBuffers. When a value
    is set in this class it will also be set in the binding.
    '''

    # Buffer modes: unlimited array.array vs. length-limited deque
    UnBounded, QBuffer = (0, 1)

    def __init__(self):
        self.lines = [self]
        self.mode = self.UnBounded
        self.bindings = list()
        self.reset()
        self._tz = None

    def get_idx(self):
        ''' Returns the logical index (position of "0" in the buffer) '''
        return self._idx

    def set_idx(self, idx, force=False):
        ''' Sets the logical index

        Keyword Args:
            idx (int): new value of the logical index
            force (bool): in QBuffer mode, set the index even if the
            position ``lenmark`` has already been reached
        '''
        # if QBuffer and the last position of the buffer was reached, keep
        # it (unless force) as index 0. This allows resampling
        #  - forward adds a position, but the 1st one is discarded, the 0 is
        #  invariant
        # force supports replaying, which needs the extra bar to float
        # forward/backwards, because the last input is read, and after a
        # "backwards" is used to update the previous data. Unless the position
        # 0 was moved to the previous index, it would fail
        if self.mode == self.QBuffer:
            if force or self._idx < self.lenmark:
                self._idx = idx
        else:  # default: UnBounded
            self._idx = idx

    idx = property(get_idx, set_idx)

    def reset(self):
        ''' Resets the internal buffer structure and the indices
        '''
        if self.mode == self.QBuffer:
            # add extrasize to ensure resample/replay work because they will
            # use backwards to erase the last bar/tick before delivering a new
            # bar The previous forward would have discarded the bar "period"
            # times ago and it will not come back. Having + 1 in the size
            # allows the forward without removing that bar
            self.array = collections.deque(maxlen=self.maxlen + self.extrasize)
            self.useislice = True
        else:
            self.array = array.array(str('d'))
            self.useislice = False

        self.lencount = 0
        # Force the index back to -1. A plain "self.idx = -1" is ignored by
        # the setter in QBuffer mode once the index has reached "lenmark",
        # which would leave the index pointing past the just emptied buffer
        self.set_idx(-1, force=True)
        self.extension = 0

    def qbuffer(self, savemem=0, extrasize=0):
        ''' Switches the buffer to QBuffer (length-limited deque) mode

        Keyword Args:
            savemem (int): not used by this implementation
            extrasize (int): extra positions added to the deque length on
            top of the minimum period

        The buffer is reset as part of the switch
        '''
        self.mode = self.QBuffer
        self.maxlen = self._minperiod
        self.extrasize = extrasize
        # position at which the logical index stops moving (see set_idx)
        self.lenmark = self.maxlen - (not self.extrasize)
        self.reset()

    def getindicators(self):
        return []

    def minbuffer(self, size):
        '''The linebuffer must guarantee the minimum requested size to be
        available.

        In non-dqbuffer mode, this is always true (of course until data is
        filled at the beginning, there are less values, but minperiod in the
        framework should account for this.

        In dqbuffer mode the buffer has to be adjusted for this if currently
        less than requested. The adjustment resets the buffer
        '''
        if self.mode != self.QBuffer or self.maxlen >= size:
            return

        self.maxlen = size
        self.lenmark = self.maxlen - (not self.extrasize)
        self.reset()

    def __len__(self):
        return self.lencount

    def buflen(self):
        ''' Real data that can be currently held in the internal buffer

        The internal buffer can be longer than the actual stored data to
        allow for "lookahead" operations. The real amount of data that is
        held/can be held in the buffer is returned (length of the array
        minus the positions added with ``extend``)
        '''
        return len(self.array) - self.extension

    def __getitem__(self, ago):
        return self.array[self.idx + ago]

    def get(self, ago=0, size=1):
        ''' Returns a slice of the array relative to *ago*

        Keyword Args:
            ago (int): position (relative to the logical index) of the last
            item of the slice
            size (int): number of items of the slice

        Returns:
            A slice of the underlying buffer (a list in QBuffer mode) with
            the ``size`` items ending at (and including) position *ago*
        '''
        if self.useislice:
            start = self.idx + ago - size + 1
            end = self.idx + ago + 1
            return list(islice(self.array, start, end))

        return self.array[self.idx + ago - size + 1:self.idx + ago + 1]

    def getzeroval(self, idx=0):
        ''' Returns a single value of the array relative to the real zero
        of the buffer

        Keyword Args:
            idx (int): position relative to the real start of the buffer

        Returns:
            The value stored at that position of the underlying buffer
        '''
        return self.array[idx]

    def getzero(self, idx=0, size=1):
        ''' Returns a slice of the array relative to the real zero of the buffer

        Keyword Args:
            idx (int): Where to start relative to the real start of the buffer
            size(int): size of the slice to return

        Returns:
            A slice of the underlying buffer
        '''
        if self.useislice:
            return list(islice(self.array, idx, idx + size))

        return self.array[idx:idx + size]

    def __setitem__(self, ago, value):
        ''' Sets a value at position "ago" and executes any associated bindings

        Keyword Args:
            ago (int): position relative to the logical index
            value (variable): value to be set
        '''
        self.array[self.idx + ago] = value
        for binding in self.bindings:
            binding[ago] = value

    def set(self, value, ago=0):
        ''' Sets a value at position "ago" and executes any associated bindings

        Keyword Args:
            value (variable): value to be set
            ago (int): position relative to the logical index
        '''
        self.array[self.idx + ago] = value
        for binding in self.bindings:
            binding[ago] = value

    def home(self):
        ''' Rewinds the logical index to the beginning

        The underlying buffer remains untouched and the actual len can be found
        out with buflen

        The index is set through the ``idx`` property, so in QBuffer mode
        the change is ignored once the index has reached ``lenmark``
        '''
        self.idx = -1
        self.lencount = 0

    def forward(self, value=NAN, size=1):
        ''' Moves the logical index forward and enlarges the buffer as much as needed

        Keyword Args:
            value (variable): value to be set in new positions
            size (int): How many extra positions to enlarge the buffer
        '''
        self.idx += size
        self.lencount += size

        for _ in range(size):
            self.array.append(value)

    def backwards(self, size=1, force=False):
        ''' Moves the logical index backwards and reduces the buffer as much as needed

        Keyword Args:
            size (int): How many extra positions to rewind and reduce the
            buffer
            force (bool): passed to ``set_idx`` to move the index even in
            QBuffer mode
        '''
        # Go directly to property setter to support force
        self.set_idx(self._idx - size, force=force)
        self.lencount -= size
        for _ in range(size):
            self.array.pop()

    def rewind(self, size=1):
        ''' Moves the logical index backwards without touching the underlying
        buffer

        Keyword Args:
            size (int): How many positions to move backwards
        '''
        self.idx -= size
        self.lencount -= size

    def advance(self, size=1):
        ''' Advances the logical index without touching the underlying buffer

        Keyword Args:
            size (int): How many extra positions to move forward
        '''
        self.idx += size
        self.lencount += size

    def extend(self, value=NAN, size=0):
        ''' Extends the underlying array with positions that the index will not reach

        Keyword Args:
            value (variable): value to be set in new positions
            size (int): How many extra positions to enlarge the buffer

        The purpose is to allow for lookahead operations or to be able to
        set values in the buffer "future"
        '''
        self.extension += size
        for _ in range(size):
            self.array.append(value)

    def addbinding(self, binding):
        ''' Adds another line binding

        Keyword Args:
            binding (LineBuffer): another line that must be set when this line
            becomes a value
        '''
        self.bindings.append(binding)
        # record in the binding when the period is starting (never sooner
        # than self)
        binding.updateminperiod(self._minperiod)

    def plot(self, idx=0, size=None):
        ''' Returns a slice of the array relative to the real zero of the buffer

        Keyword Args:
            idx (int): Where to start relative to the real start of the buffer
            size(int): size of the slice to return

        This is a variant of getzero which unless told otherwise returns the
        entire buffer, which is usually the idea behind plotting (all must
        be plotted). A falsy ``size`` is replaced with ``len(self)``

        Returns:
            A slice of the underlying buffer
        '''
        return self.getzero(idx, size or len(self))

    def plotrange(self, start, end):
        ''' Returns the items of the underlying buffer between the absolute
        positions ``start`` (included) and ``end`` (excluded)
        '''
        if self.useislice:
            return list(islice(self.array, start, end))

        return self.array[start:end]

    def oncebinding(self):
        '''
        Executes the bindings when running in "once" mode: the first
        ``buflen()`` values are copied to the array of each binding
        '''
        larray = self.array
        blen = self.buflen()
        for binding in self.bindings:
            binding.array[0:blen] = larray[0:blen]

    def bind2lines(self, binding=0):
        '''
        Stores a binding to another line. "binding" can be an index or a name

        Returns ``self`` to allow chaining
        '''
        if isinstance(binding, string_types):
            line = getattr(self._owner.lines, binding)
        else:
            line = self._owner.lines[binding]

        self.addbinding(line)

        return self

    bind2line = bind2lines

    def __call__(self, ago=None):
        '''Returns either a delayed version of itself in the form of a
        LineDelay object or a timeframe adapting version with regards to a ago

        Param: ago (default: None)

          If ago is None or an instance of LineRoot (a lines object) the
          returned valued is a LineCoupler instance

          If ago is anything else, it is assumed to be an int and a LineDelay
          object will be returned
        '''
        # imported here and not at module level
        from .lineiterator import LineCoupler
        if ago is None or isinstance(ago, LineRoot):
            return LineCoupler(self, ago)

        return LineDelay(self, ago)

    def _makeoperation(self, other, operation, r=False, _ownerskip=None):
        return LinesOperation(self, other, operation, r=r,
                              _ownerskip=_ownerskip)

    def _makeoperationown(self, operation, _ownerskip=None):
        return LineOwnOperation(self, operation, _ownerskip=_ownerskip)

    def _settz(self, tz):
        self._tz = tz

    def datetime(self, ago=0, tz=None, naive=True):
        ''' Returns the value at ``ago`` converted with ``num2date``

        ``tz`` falls back to the timezone stored with ``_settz`` if falsy
        '''
        return num2date(self.array[self.idx + ago],
                        tz=tz or self._tz, naive=naive)

    def date(self, ago=0, tz=None, naive=True):
        ''' Like ``datetime`` but returns only the ``date()`` part '''
        return num2date(self.array[self.idx + ago],
                        tz=tz or self._tz, naive=naive).date()

    def time(self, ago=0, tz=None, naive=True):
        ''' Like ``datetime`` but returns only the ``time()`` part '''
        return num2date(self.array[self.idx + ago],
                        tz=tz or self._tz, naive=naive).time()

    def dt(self, ago=0):
        '''
        return numeric date part of datetimefloat
        '''
        return math.trunc(self.array[self.idx + ago])

    def tm_raw(self, ago=0):
        '''
        return raw numeric time part of datetimefloat
        '''
        # This function is named raw because it retrieves the fractional part
        # without transforming it to time to avoid the influence of the day
        # count (integer part of coding)
        return math.modf(self.array[self.idx + ago])[0]

    def tm(self, ago=0):
        '''
        return numeric time part of datetimefloat
        '''
        # To avoid precision errors, this returns the fractional part after
        # having converted it to a datetime.time object to avoid precision
        # errors in comparisons
        return time2num(num2date(self.array[self.idx + ago]).time())

    def tm_lt(self, other, ago=0):
        '''
        return True if the datetimefloat at ``ago`` is lower than ``other``
        (a numeric time part) placed on the same day
        '''
        # To compare a raw "tm" part (fractional part of coded datetime)
        # with the tm of the current datetime, the raw "tm" has to be
        # brought in sync with the current "day" count (integer part)
        dtime = self.array[self.idx + ago]
        _, dt = math.modf(dtime)

        return dtime < (dt + other)

    def tm_le(self, other, ago=0):
        '''
        return True if the datetimefloat at ``ago`` is lower than or equal to
        ``other`` (a numeric time part) placed on the same day
        '''
        # To compare a raw "tm" part (fractional part of coded datetime)
        # with the tm of the current datetime, the raw "tm" has to be
        # brought in sync with the current "day" count (integer part)
        dtime = self.array[self.idx + ago]
        _, dt = math.modf(dtime)

        return dtime <= (dt + other)

    def tm_eq(self, other, ago=0):
        '''
        return True if the datetimefloat at ``ago`` is equal to ``other``
        (a numeric time part) placed on the same day
        '''
        # To compare a raw "tm" part (fractional part of coded datetime)
        # with the tm of the current datetime, the raw "tm" has to be
        # brought in sync with the current "day" count (integer part)
        dtime = self.array[self.idx + ago]
        _, dt = math.modf(dtime)

        return dtime == (dt + other)

    def tm_gt(self, other, ago=0):
        '''
        return True if the datetimefloat at ``ago`` is greater than ``other``
        (a numeric time part) placed on the same day
        '''
        # To compare a raw "tm" part (fractional part of coded datetime)
        # with the tm of the current datetime, the raw "tm" has to be
        # brought in sync with the current "day" count (integer part)
        dtime = self.array[self.idx + ago]
        _, dt = math.modf(dtime)

        return dtime > (dt + other)

    def tm_ge(self, other, ago=0):
        '''
        return True if the datetimefloat at ``ago`` is greater than or equal
        to ``other`` (a numeric time part) placed on the same day
        '''
        # To compare a raw "tm" part (fractional part of coded datetime)
        # with the tm of the current datetime, the raw "tm" has to be
        # brought in sync with the current "day" count (integer part)
        dtime = self.array[self.idx + ago]
        _, dt = math.modf(dtime)

        return dtime >= (dt + other)

    def tm2dtime(self, tm, ago=0):
        '''
        Returns the given ``tm`` in the frame of the (ago bars) datetime:
        the integer part of the stored value plus ``tm``

        Useful for external comparisons to avoid precision errors
        '''
        return int(self.array[self.idx + ago]) + tm

    def tm2datetime(self, tm, ago=0):
        '''
        Returns the given ``tm`` in the frame of the (ago bars) datetime,
        converted with ``num2date``

        Useful for external comparisons to avoid precision errors
        '''
        return num2date(int(self.array[self.idx + ago]) + tm)
class MetaLineActions(LineBuffer.__class__):
    '''
    Metaclass for Lineactions

    Scans the instance before init for LineBuffer (or parentclass LineSingle)
    instances to calculate the minperiod for this instance

    postinit it registers the instance to the owner (remember that owner has
    been found in the base Metaclass for LineRoot)

    If enabled with ``usecache`` instances are cached, keyed by the class and
    the arguments used to create them
    '''
    _acache = dict()
    _acacheuse = False

    @classmethod
    def cleancache(cls):
        ''' Discards all cached instances '''
        cls._acache = dict()

    @classmethod
    def usecache(cls, onoff):
        ''' Enables (truthy ``onoff``) or disables the instance cache '''
        cls._acacheuse = onoff

    def __call__(cls, *args, **kwargs):
        if not cls._acacheuse:
            return super(MetaLineActions, cls).__call__(*args, **kwargs)

        # implement a cache to avoid duplicating lines actions
        ckey = (cls, tuple(args), tuple(kwargs.items()))  # tuples hashable
        try:
            return cls._acache[ckey]
        except TypeError:  # something not hashable
            return super(MetaLineActions, cls).__call__(*args, **kwargs)
        except KeyError:
            pass  # hashable but not in the cache

        _obj = super(MetaLineActions, cls).__call__(*args, **kwargs)
        return cls._acache.setdefault(ckey, _obj)

    def dopreinit(cls, _obj, *args, **kwargs):
        _obj, args, kwargs = \
            super(MetaLineActions, cls).dopreinit(_obj, *args, **kwargs)

        _obj._clock = _obj._owner  # default setting

        # The 1st positional argument (if any) becomes the clock when it is
        # a lines object. Checking "args" first avoids an IndexError when no
        # positional arguments are left (for example keyword-only creation)
        if args and isinstance(args[0], LineRoot):
            _obj._clock = args[0]

        # Keep a reference to the datas for buffer adjustment purposes
        _obj._datas = [x for x in args if isinstance(x, LineRoot)]

        # Do not produce anything until the operation lines produce something
        _minperiods = [x._minperiod for x in args if isinstance(x, LineSingle)]

        mlines = [x.lines[0] for x in args if isinstance(x, LineMultiple)]
        _minperiods += [x._minperiod for x in mlines]

        _minperiod = max(_minperiods or [1])

        # update own minperiod if needed
        _obj.updateminperiod(_minperiod)

        return _obj, args, kwargs

    def dopostinit(cls, _obj, *args, **kwargs):
        _obj, args, kwargs = \
            super(MetaLineActions, cls).dopostinit(_obj, *args, **kwargs)

        # register with _owner to be kicked later
        _obj._owner.addindicator(_obj)

        return _obj, args, kwargs
class PseudoArray(object):
    '''
    Wraps a single value and delivers it for any index, mimicking the
    ``[]`` and ``.array`` access offered by a line
    '''

    def __init__(self, wrapped):
        self.wrapped = wrapped

    @property
    def array(self):
        # the object acts as its own array: indexing it yields the value
        return self

    def __getitem__(self, key):
        # the key plays no role, the wrapped value is always delivered
        constant = self.wrapped
        return constant
class LineActions(with_metaclass(MetaLineActions, LineBuffer)):
    '''
    Base class derived from LineBuffer intended to define the
    minimum interface to make it compatible with a LineIterator by
    providing operational _next and _once interfaces.

    The metaclass does the dirty job of calculating minperiods and registering
    '''

    _ltype = LineBuffer.IndType

    def getindicators(self):
        return []

    def qbuffer(self, savemem=0, extrasize=0):
        ''' Switches to QBuffer mode and asks the datas this object operates
        on to guarantee a buffer of at least the own minimum period

        Keyword Args:
            savemem (int): passed to the base class
            extrasize (int): passed to the base class. Accepted here to keep
            the signature in sync with ``LineBuffer.qbuffer``
        '''
        super(LineActions, self).qbuffer(savemem=savemem,
                                         extrasize=extrasize)
        for data in self._datas:
            data.minbuffer(size=self._minperiod)

    @staticmethod
    def arrayize(obj):
        ''' Returns something which can be indexed like a line

        A multiline object delivers its 1st line, a single line is returned
        as is and anything else is wrapped in a PseudoArray
        '''
        if isinstance(obj, LineRoot):
            if not isinstance(obj, LineSingle):
                obj = obj.lines[0]  # get 1st line from multiline
        else:
            obj = PseudoArray(obj)

        return obj

    def _next(self):
        ''' Moves forward if the clock is ahead and dispatches to
        prenext / nextstart / next according to the length of the clock
        compared to the minimum period
        '''
        clock_len = len(self._clock)
        if clock_len > len(self):
            self.forward()

        if clock_len > self._minperiod:
            self.next()
        elif clock_len == self._minperiod:
            # only called for the 1st value
            self.nextstart()
        else:
            self.prenext()

    def _once(self):
        ''' Sizes the buffer to the buffer length of the clock, rewinds the
        index and runs preonce / oncestart / once over the corresponding
        ranges before executing the bindings
        '''
        self.forward(size=self._clock.buflen())
        self.home()

        self.preonce(0, self._minperiod - 1)
        self.oncestart(self._minperiod - 1, self._minperiod)
        self.once(self._minperiod, self.buflen())

        self.oncebinding()
def LineDelay(a, ago=0, **kwargs):
    ''' Creates the object which delivers ``a`` shifted by ``ago``

    A zero or negative ``ago`` is served by ``_LineDelay``, a positive one
    by ``_LineForward``. Any extra keyword arguments are passed along
    '''
    maker = _LineDelay if ago <= 0 else _LineForward
    return maker(a, ago, **kwargs)
def LineNum(num):
    ''' Wraps ``num`` in a PseudoArray and returns the result of calling
    ``LineDelay`` (with the default ``ago``) on it
    '''
    constant = PseudoArray(num)
    return LineDelay(constant)
class _LineDelay(LineActions):
    '''
    Stores in each position the value that the source ``a`` holds ``ago``
    positions away (``self.a[self.ago]``), effectively delaying the delivery
    of data
    '''

    def __init__(self, a, ago):
        super(_LineDelay, self).__init__()
        self.a, self.ago = a, ago

        # "ago" is 0 based: the delay is added to the period together with
        # an extra 1, which is the minimum defined period for any data (it
        # will be subtracted inside addminperiod)
        self.addminperiod(1 + abs(ago))

    def next(self):
        self[0] = self.a[self.ago]

    def once(self, start, end):
        # local names to avoid repeated attribute lookups in the loop
        target, source, shift = self.array, self.a.array, self.ago

        for pos in range(start, end):
            target[pos] = source[pos + shift]
class _LineForward(LineActions):
    '''
    Stores the current value of the source ``a`` at the position ``ago``
    bars back (``self[-self.ago] = self.a[0]``), so that each position holds
    the value the source has ``ago`` periods later
    '''

    def __init__(self, a, ago):
        super(_LineForward, self).__init__()
        self.a, self.ago = a, ago

        # The period is only enlarged if "ago" goes beyond the minimum
        # period of the source. The extra 1 is the minimum defined period
        # for any data (it will be subtracted inside addminperiod)
        source_period = self.a._minperiod
        if ago > source_period:
            self.addminperiod(ago - source_period + 1)

    def next(self):
        self[-self.ago] = self.a[0]

    def once(self, start, end):
        # local names to avoid repeated attribute lookups in the loop
        target, source, shift = self.array, self.a.array, self.ago

        for pos in range(start, end):
            target[pos - shift] = source[pos]
class LinesOperation(LineActions):
    '''
    Applies a binary operation (example: mul) to two operands and stores
    the result in self, either bar by bar ("next") or over a range of the
    arrays ("once").

    The variant to execute is selected with the flags calculated during
    init: whether the second operand is a LineBuffer (``bline``) or a
    ``datetime.time`` (``btime``) and whether the operation is reversed
    (``r``), in which case the operands are swapped.

    In the "once" operations "map" could be used as in:

        operated = map(self.operation, srca[start:end], srcb[start:end])
        self.array[start:end] = array.array(str(self.typecode), operated)

    No real execution time benefits were appreciated and therefore the loops
    have been kept in place for clarity

    NOTE(review): for time operands ``next`` uses ``self.a.time()`` (which
    falls back to the timezone of ``a``) whereas ``_once_time_op`` uses the
    timezone of this object. Confirm both modes are meant to agree
    '''

    def __init__(self, a, b, operation, r=False):
        super(LinesOperation, self).__init__()

        self.operation = operation
        self.r = r

        # the nature of the operand is checked on "b" as received, before
        # any swapping takes place
        self.bline = isinstance(b, LineBuffer)
        self.btime = isinstance(b, datetime.time)
        self.bfloat = not (self.bline or self.btime)

        # "a" (as received) is always a linebuffer. Reversed operations
        # store the operands swapped
        self.a, self.b = (b, a) if r else (a, b)

    def next(self):
        if self.bline:  # line vs line
            self[0] = self.operation(self.a[0], self.b[0])
            return

        if self.r:  # reversed: value vs line
            self[0] = self.operation(self.a, self.b[0])
            return

        if self.btime:  # time of the line vs datetime.time
            self[0] = self.operation(self.a.time(), self.b)
            return

        # line vs plain value
        self[0] = self.operation(self.a[0], self.b)

    def once(self, start, end):
        if self.bline:
            runner = self._once_op
        elif self.r:
            runner = self._once_val_op_r
        elif self.btime:
            runner = self._once_time_op
        else:
            runner = self._once_val_op

        runner(start, end)

    def _once_op(self, start, end):
        # line vs line. Local names avoid attribute lookups in the loop
        out = self.array
        left = self.a.array
        right = self.b.array
        func = self.operation

        for pos in range(start, end):
            out[pos] = func(left[pos], right[pos])

    def _once_time_op(self, start, end):
        # time part of the line values vs a datetime.time
        out = self.array
        left = self.a.array
        moment = self.b
        func = self.operation
        zone = self._tz

        for pos in range(start, end):
            out[pos] = func(num2date(left[pos], tz=zone).time(), moment)

    def _once_val_op(self, start, end):
        # line vs plain value
        out = self.array
        left = self.a.array
        fixed = self.b
        func = self.operation

        for pos in range(start, end):
            out[pos] = func(left[pos], fixed)

    def _once_val_op_r(self, start, end):
        # reversed: plain value vs line
        out = self.array
        fixed = self.a
        right = self.b.array
        func = self.operation

        for pos in range(start, end):
            out[pos] = func(fixed, right[pos])
class LineOwnOperation(LineActions):
    '''
    Applies an operation which takes a single operand (example: abs) to the
    values of ``a`` and stores the result in self, either bar by bar
    ("next") or over a range of the array ("once")
    '''

    def __init__(self, a, operation):
        super(LineOwnOperation, self).__init__()

        self.operation = operation
        self.a = a

    def next(self):
        current = self.a[0]
        self[0] = self.operation(current)

    def once(self, start, end):
        # local names to avoid repeated attribute lookups in the loop
        out = self.array
        values = self.a.array
        func = self.operation

        for pos in range(start, end):
            out[pos] = func(values[pos])
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化